Repository: sqoop Updated Branches: refs/heads/sqoop2 0773c10f8 -> bc0de7c19
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java index 403f213..3acd4a1 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java @@ -66,6 +66,7 @@ public class MapreduceExecutionEngine extends ExecutionEngine { From from = (From) mrJobRequest.getFrom(); To to = (To) mrJobRequest.getTo(); MutableMapContext context = mrJobRequest.getDriverContext(); + context.setString(MRJobConstants.SUBMITTING_USER, jobRequest.getJobSubmission().getCreationUser()); context.setString(MRJobConstants.JOB_ETL_PARTITIONER, from.getPartitioner().getName()); context.setString(MRJobConstants.JOB_ETL_PARTITION, from.getPartition().getName()); context.setString(MRJobConstants.JOB_ETL_EXTRACTOR, from.getExtractor().getName()); http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java index df767e6..737ceda 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java @@ -56,6 +56,10 @@ public final class MRJobConstants extends Constants { public static final String JOB_ETL_EXTRACTOR_NUM = PREFIX_JOB_CONFIG + "etl.extractor.count"; + public static final String SUBMITTING_USER = PREFIX_JOB_CONFIG + + "submission.user"; + + public static final String PREFIX_CONNECTOR_FROM_CONTEXT = PREFIX_JOB_CONFIG + "connector.from.context."; http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java index b3c1ce8..2a97878 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java @@ -44,7 +44,7 @@ public class SqoopDestroyerExecutor { * and configuration objects. * @param direction The direction of the Destroyer to execute. */ - public static void executeDestroyer(boolean success, Configuration configuration, Direction direction) { + public static void executeDestroyer(boolean success, Configuration configuration, Direction direction, String user) { String destroyerPropertyName, prefixPropertyName; switch (direction) { default: @@ -78,7 +78,7 @@ public class SqoopDestroyerExecutor { Schema schema = direction == Direction.FROM ? matcher.getFromSchema() : matcher.getToSchema(); - DestroyerContext destroyerContext = new DestroyerContext(subContext, success, schema); + DestroyerContext destroyerContext = new DestroyerContext(subContext, success, schema, user); LOG.info("Executing destroyer class " + destroyer.getClass()); destroyer.destroy(destroyerContext, configConnection, configJob); http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java index 67189a1..0623f7b 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java @@ -69,7 +69,7 @@ public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> { Schema fromSchema = MRConfigurationUtils.getConnectorSchema(Direction.FROM, conf); long maxPartitions = conf.getLong(MRJobConstants.JOB_ETL_EXTRACTOR_NUM, 10); - PartitionerContext partitionerContext = new PartitionerContext(connectorContext, maxPartitions, fromSchema); + PartitionerContext partitionerContext = new PartitionerContext(connectorContext, maxPartitions, fromSchema, conf.get(MRJobConstants.SUBMITTING_USER)); List<Partition> partitions = partitioner.getPartitions(partitionerContext, connectorLinkConfig, connectorFromJobConfig); List<InputSplit> splits = new LinkedList<InputSplit>(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java index c93813b..7d20992 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.MRConstants; import org.apache.hadoop.mapreduce.Mapper; import org.apache.log4j.Logger; import org.apache.sqoop.common.Direction; @@ -85,7 +86,7 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable, Object fromJob = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf); SqoopSplit split = context.getCurrentKey(); - ExtractorContext extractorContext = new ExtractorContext(subContext, new SqoopMapDataWriter(context), matcher.getFromSchema()); + ExtractorContext extractorContext = new ExtractorContext(subContext, new SqoopMapDataWriter(context), matcher.getFromSchema(), conf.get(MRJobConstants.SUBMITTING_USER)); try { LOG.info("Starting progress service"); http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java index 88ab98e..8c8526b 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java @@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.log4j.Logger; import org.apache.sqoop.common.Direction; +import org.apache.sqoop.job.MRJobConstants; import org.apache.sqoop.job.io.SqoopWritable; /** @@ -74,8 +75,8 @@ public class SqoopNullOutputFormat extends OutputFormat<SqoopWritable, NullWrita private void invokeDestroyerExecutor(JobContext jobContext, boolean success) { Configuration config = jobContext.getConfiguration(); - SqoopDestroyerExecutor.executeDestroyer(success, config, Direction.FROM); - SqoopDestroyerExecutor.executeDestroyer(success, config, Direction.TO); + SqoopDestroyerExecutor.executeDestroyer(success, config, Direction.FROM, jobContext.getConfiguration().get(MRJobConstants.SUBMITTING_USER)); + SqoopDestroyerExecutor.executeDestroyer(success, config, Direction.TO, jobContext.getConfiguration().get(MRJobConstants.SUBMITTING_USER)); } @Override http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java index d94b658..623d1f4 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java @@ -261,7 +261,7 @@ public class SqoopOutputFormatLoadExecutor { // encapsulates the toDataFormat // Create loader context - LoaderContext loaderContext = new LoaderContext(subContext, reader, matcher.getToSchema()); + LoaderContext loaderContext = new LoaderContext(subContext, reader, matcher.getToSchema(), context.getConfiguration().get(MRJobConstants.SUBMITTING_USER)); LOG.info("Running loader class " + loaderName); loader.load(loaderContext, connectorLinkConfig, connectorToJobConfig); http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java index 3208e8a..3dee8f6 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java @@ -215,7 +215,7 @@ public class TestSqoopOutputFormatLoadExecutor { exceptionThrown = true; } writer.close(null); - verify(jobContextMock, times(1)).getConfiguration(); + verify(jobContextMock, times(2)).getConfiguration(); verify(jobContextMock, times(1)).getCounter(SqoopCounters.ROWS_WRITTEN); Assert.assertFalse(exceptionThrown, "Exception Thrown during writing"); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/server/src/main/java/org/apache/sqoop/filter/SqoopAuthenticationFilter.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/sqoop/filter/SqoopAuthenticationFilter.java b/server/src/main/java/org/apache/sqoop/filter/SqoopAuthenticationFilter.java index ddca9d4..19e2b29 100644 --- a/server/src/main/java/org/apache/sqoop/filter/SqoopAuthenticationFilter.java +++ b/server/src/main/java/org/apache/sqoop/filter/SqoopAuthenticationFilter.java @@ -94,7 +94,7 @@ public class SqoopAuthenticationFilter extends DelegationTokenAuthenticationFilt Map<String, String> proxyuserConf = mapContext.getValByRegex("org\\.apache\\.sqoop\\.authentication\\.proxyuser"); Configuration conf = new Configuration(false); for (Map.Entry<String, String> entry : proxyuserConf.entrySet()) { - conf.set(entry.getKey().substring("org.apache.sqoop.authentication.proxyuser.".length()), entry.getValue()); + conf.set(entry.getKey().substring("org.apache.sqoop.authentication.".length()), entry.getValue()); } return conf; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/test/src/main/java/org/apache/sqoop/test/minicluster/SqoopMiniCluster.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/sqoop/test/minicluster/SqoopMiniCluster.java b/test/src/main/java/org/apache/sqoop/test/minicluster/SqoopMiniCluster.java index 8a0faf8..0f04bdf 100644 --- a/test/src/main/java/org/apache/sqoop/test/minicluster/SqoopMiniCluster.java +++ b/test/src/main/java/org/apache/sqoop/test/minicluster/SqoopMiniCluster.java @@ -154,7 +154,6 @@ public abstract class SqoopMiniCluster { output.add(entry.getKey() + "=" + entry.getValue()); } } - /** * Return properties for logger configuration. * @@ -201,6 +200,18 @@ public abstract class SqoopMiniCluster { properties.put("org.apache.sqoop.authentication.type", "SIMPLE"); properties.put("org.apache.sqoop.authentication.handler", "org.apache.sqoop.security.SimpleAuthenticationHandler"); + /** + * Due to the fact that we share a JVM with hadoop during unit testing, + * proxy user configuration is also shared with hadoop. + * + * We need to enable impersonation on hadoop for our map reduce jobs + * (normally this would be accomplished with "hadoop.proxyuser"), so we + * pass it through sqoop configuration + */ + String user = System.getProperty("user.name"); + properties.put("org.apache.sqoop.authentication.proxyuser." + user + ".groups", "*"); + properties.put("org.apache.sqoop.authentication.proxyuser." + user + ".hosts", "*"); + return properties; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java index 9b77d98..d712e46 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java @@ -160,7 +160,8 @@ public class OutputDirectoryTest extends ConnectorTestCase { // We can directly verify the ErrorCode from SqoopException as client side // is not rebuilding SqoopExceptions per missing ErrorCodes. E.g. the cause // will be generic Throwable and not SqoopException instance. - Throwable cause = ex.getCause(); + // We need to 'getCause' twice because of the layer from impersonation + Throwable cause = ex.getCause().getCause(); assertNotNull(cause); for(String fragment : fragments) {
