SQOOP-1690: Implement doAs for Sqoop2 (Abraham Fine via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/bc0de7c1 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/bc0de7c1 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/bc0de7c1 Branch: refs/heads/sqoop2 Commit: bc0de7c199a74bdc0b804696159c31e13bdd5c3b Parents: 0773c10 Author: Jarek Jarcec Cecho <[email protected]> Authored: Tue Oct 27 09:26:59 2015 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Tue Oct 27 09:26:59 2015 -0700 ---------------------------------------------------------------------- .../apache/sqoop/job/etl/DestroyerContext.java | 14 +- .../apache/sqoop/job/etl/ExtractorContext.java | 13 +- .../sqoop/job/etl/InitializerContext.java | 14 +- .../org/apache/sqoop/job/etl/LoaderContext.java | 15 +- .../sqoop/job/etl/PartitionerContext.java | 14 +- .../sqoop/connector/ftp/TestFtpLoader.java | 2 +- .../sqoop/connector/jdbc/TestExtractor.java | 8 +- .../connector/jdbc/TestFromInitializer.java | 26 +-- .../apache/sqoop/connector/jdbc/TestLoader.java | 2 +- .../sqoop/connector/jdbc/TestPartitioner.java | 32 ++-- .../sqoop/connector/jdbc/TestToInitializer.java | 18 +- .../sqoop/connector/hdfs/HdfsExtractor.java | 36 ++-- .../connector/hdfs/HdfsFromInitializer.java | 56 +++--- .../apache/sqoop/connector/hdfs/HdfsLoader.java | 69 ++++---- .../sqoop/connector/hdfs/HdfsPartitioner.java | 170 ++++++++++--------- .../sqoop/connector/hdfs/HdfsToDestroyer.java | 44 +++-- .../sqoop/connector/hdfs/HdfsToInitializer.java | 39 +++-- .../sqoop/connector/hdfs/TestExtractor.java | 7 +- .../sqoop/connector/hdfs/TestFromDestroyer.java | 4 +- .../connector/hdfs/TestFromInitializer.java | 2 +- .../sqoop/connector/hdfs/TestHdfsBase.java | 11 ++ .../apache/sqoop/connector/hdfs/TestLoader.java | 10 +- .../sqoop/connector/hdfs/TestPartitioner.java | 2 +- .../sqoop/connector/hdfs/TestToDestroyer.java | 4 +- .../sqoop/connector/hdfs/TestToInitializer.java | 8 +- .../sqoop/connector/kafka/TestKafkaLoader.java | 2 +- .../sqoop/connector/kite/TestKiteExtractor.java | 2 +- .../sqoop/connector/kite/TestKiteLoader.java | 2 +- .../connector/kite/TestKiteToDestroyer.java | 7 +- .../org/apache/sqoop/driver/JobManager.java | 15 +- .../mapreduce/MapreduceExecutionEngine.java | 1 + .../org/apache/sqoop/job/MRJobConstants.java | 4 + .../sqoop/job/mr/SqoopDestroyerExecutor.java | 4 +- .../apache/sqoop/job/mr/SqoopInputFormat.java | 2 +- .../org/apache/sqoop/job/mr/SqoopMapper.java | 3 +- .../sqoop/job/mr/SqoopNullOutputFormat.java | 5 +- .../job/mr/SqoopOutputFormatLoadExecutor.java | 2 +- .../mr/TestSqoopOutputFormatLoadExecutor.java | 2 +- .../sqoop/filter/SqoopAuthenticationFilter.java | 2 +- .../test/minicluster/SqoopMiniCluster.java | 13 +- .../connector/hdfs/OutputDirectoryTest.java | 3 +- 41 files changed, 423 insertions(+), 266 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/common/src/main/java/org/apache/sqoop/job/etl/DestroyerContext.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/job/etl/DestroyerContext.java b/common/src/main/java/org/apache/sqoop/job/etl/DestroyerContext.java index f4f6d1d..38d94db 100644 --- a/common/src/main/java/org/apache/sqoop/job/etl/DestroyerContext.java +++ b/common/src/main/java/org/apache/sqoop/job/etl/DestroyerContext.java @@ -35,10 +35,13 @@ public class DestroyerContext extends TransferableContext { private Schema schema; - public DestroyerContext(ImmutableContext context, boolean success, Schema schema) { + private String user; + + public DestroyerContext(ImmutableContext context, boolean success, Schema schema, String user) { super(context); this.success = success; this.schema = schema; + this.user = user; } /** @@ -58,4 +61,13 @@ public class DestroyerContext extends TransferableContext { public Schema getSchema() { return schema; } + + /** + * Return user associated with this step. + * + * @return + */ + public String getUser() { + return user; + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java b/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java index 43fcaa2..748bdfb 100644 --- a/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java +++ b/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java @@ -36,10 +36,13 @@ public class ExtractorContext extends TransferableContext { private final Schema schema; - public ExtractorContext(ImmutableContext context, DataWriter writer, Schema schema) { + private final String user; + + public ExtractorContext(ImmutableContext context, DataWriter writer, Schema schema, String user) { super(context); this.writer = writer; this.schema = schema; + this.user = user; } /** @@ -58,5 +61,13 @@ public class ExtractorContext extends TransferableContext { public Schema getSchema() { return schema; } + /** + * Return the user + * + * @return + */ + public String getUser() { + return user; + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/common/src/main/java/org/apache/sqoop/job/etl/InitializerContext.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/job/etl/InitializerContext.java b/common/src/main/java/org/apache/sqoop/job/etl/InitializerContext.java index 469132b..7ad0d70 100644 --- a/common/src/main/java/org/apache/sqoop/job/etl/InitializerContext.java +++ b/common/src/main/java/org/apache/sqoop/job/etl/InitializerContext.java @@ -31,8 +31,11 @@ import org.apache.sqoop.common.MutableContext; @InterfaceStability.Unstable public class InitializerContext extends TransferableContext { - public InitializerContext(MutableContext context) { + private String user; + + public InitializerContext(MutableContext context, String user) { super(context); + this.user = user; } /** @@ -47,4 +50,13 @@ public class InitializerContext extends TransferableContext { public MutableContext getContext() { return (MutableContext)super.getContext(); } + + /** + * Return the user + * + * @return + */ + public String getUser() { + return user; + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/common/src/main/java/org/apache/sqoop/job/etl/LoaderContext.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/job/etl/LoaderContext.java b/common/src/main/java/org/apache/sqoop/job/etl/LoaderContext.java index f9ea9ad..f0f2e38 100644 --- a/common/src/main/java/org/apache/sqoop/job/etl/LoaderContext.java +++ b/common/src/main/java/org/apache/sqoop/job/etl/LoaderContext.java @@ -36,10 +36,13 @@ public class LoaderContext extends TransferableContext { private final Schema schema; - public LoaderContext(ImmutableContext context, DataReader reader, Schema schema) { + private final String user; + + public LoaderContext(ImmutableContext context, DataReader reader, Schema schema, String user) { super(context); this.reader = reader; this.schema = schema; + this.user = user; } /** @@ -59,4 +62,14 @@ public class LoaderContext extends TransferableContext { public Schema getSchema() { return schema; } + + /** + * Return the String representing the user. + * + * @return + */ + public String getUser() { + return user; + } + } http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/common/src/main/java/org/apache/sqoop/job/etl/PartitionerContext.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/job/etl/PartitionerContext.java b/common/src/main/java/org/apache/sqoop/job/etl/PartitionerContext.java index bb52bb2..b39497b 100644 --- a/common/src/main/java/org/apache/sqoop/job/etl/PartitionerContext.java +++ b/common/src/main/java/org/apache/sqoop/job/etl/PartitionerContext.java @@ -37,10 +37,13 @@ public class PartitionerContext extends TransferableContext { private boolean skipMaxPartitionCheck = false; - public PartitionerContext(ImmutableContext context, long maxPartitions, Schema schema) { + private String user; + + public PartitionerContext(ImmutableContext context, long maxPartitions, Schema schema, String user) { super(context); this.maxPartitions = maxPartitions; this.schema = schema; + this.user = user; } /** @@ -89,4 +92,13 @@ public class PartitionerContext extends TransferableContext { public Schema getSchema() { return schema; } + + /** + * Return user that submitted job + * + * @return + */ + public String getUser() { + return user; + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/TestFtpLoader.java ---------------------------------------------------------------------- diff --git a/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/TestFtpLoader.java b/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/TestFtpLoader.java index 33c808a..e1255ff 100644 --- a/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/TestFtpLoader.java +++ b/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/TestFtpLoader.java @@ -75,7 +75,7 @@ public class TestFtpLoader { }; try { - LoaderContext context = new LoaderContext(null, reader, null); + LoaderContext context = new LoaderContext(null, reader, null, "test_user"); LinkConfiguration linkConfig = new LinkConfiguration(); linkConfig.linkConfig.username = username; linkConfig.linkConfig.password = password; http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java index 264cadf..3b52128 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java @@ -104,7 +104,7 @@ public class TestExtractor { // result set schema.addColumn(new FixedPoint("c1",2L, true)).addColumn(new Decimal("c2", 5, 2)).addColumn(new Text("c3")).addColumn(new Date("c4")); - ExtractorContext extractorContext = new ExtractorContext(context, writer, schema); + ExtractorContext extractorContext = new ExtractorContext(context, writer, schema, "test_user"); partition = new GenericJdbcPartition(); partition.setConditions("-50.0 <= DCOL AND DCOL < -16.6666666666666665"); @@ -144,7 +144,7 @@ public class TestExtractor { // result set schema.addColumn(new FixedPoint("c1", 2L, true)).addColumn(new Text("c2")).addColumn(new Date("c3")); - ExtractorContext extractorContext = new ExtractorContext(context, writer, schema); + ExtractorContext extractorContext = new ExtractorContext(context, writer, schema, "test_user"); partition = new GenericJdbcPartition(); partition.setConditions("-50 <= ICOL AND ICOL < -16"); @@ -181,7 +181,7 @@ public class TestExtractor { Extractor extractor = new GenericJdbcExtractor(); DummyWriter writer = new DummyWriter(); Schema schema = new Schema("TestIncorrectColumns"); - ExtractorContext extractorContext = new ExtractorContext(context, writer, schema); + ExtractorContext extractorContext = new ExtractorContext(context, writer, schema, "test_user"); partition.setConditions("-50 <= ICOL AND ICOL < -16"); extractor.extract(extractorContext, linkConfig, jobConfig, partition); @@ -217,7 +217,7 @@ public class TestExtractor { Schema schema = new Schema("TestExtractor"); schema.addColumn(new FixedPoint("c1",2L, true)).addColumn(new Decimal("c2", 5, 2)).addColumn(new Text("c3")).addColumn(new Date("c4")); - ExtractorContext extractorContext = new ExtractorContext(context, writer, schema); + ExtractorContext extractorContext = new ExtractorContext(context, writer, schema, "test_user"); GenericJdbcPartition partition = new GenericJdbcPartition(); partition.setConditions("-50 <= ICOL AND ICOL < -16"); http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java index ab31932..1c8379d 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java @@ -44,6 +44,7 @@ public class TestFromInitializer { private final String tableSql; private final String schemalessTableSql; private final String tableColumns; + private final String testUser; private GenericJdbcExecutor executor; @@ -57,6 +58,7 @@ public class TestFromInitializer { tableSql = "SELECT * FROM " + schemaName + "." + tableName + " WHERE ${CONDITIONS}"; schemalessTableSql = "SELECT * FROM " + schemalessTableName + " WHERE ${CONDITIONS}"; tableColumns = "ICOL,VCOL"; + testUser = "test_user"; } @BeforeMethod(alwaysRun = true) @@ -123,7 +125,7 @@ public class TestFromInitializer { jobConfig.fromJobConfig.tableName = schemalessTableName; MutableContext context = new MutableMapContext(); - InitializerContext initializerContext = new InitializerContext(context); + InitializerContext initializerContext = new InitializerContext(context, testUser); @SuppressWarnings("rawtypes") Initializer initializer = new GenericJdbcFromInitializer(); @@ -151,7 +153,7 @@ public class TestFromInitializer { jobConfig.incrementalRead.lastValue = "-51"; MutableContext context = new MutableMapContext(); - InitializerContext initializerContext = new InitializerContext(context); + InitializerContext initializerContext = new InitializerContext(context, testUser); @SuppressWarnings("rawtypes") Initializer initializer = new GenericJdbcFromInitializer(); @@ -181,7 +183,7 @@ public class TestFromInitializer { jobConfig.incrementalRead.lastValue = "0"; MutableContext context = new MutableMapContext(); - InitializerContext initializerContext = new InitializerContext(context); + InitializerContext initializerContext = new InitializerContext(context, testUser); @SuppressWarnings("rawtypes") Initializer initializer = new GenericJdbcFromInitializer(); @@ -210,7 +212,7 @@ public class TestFromInitializer { jobConfig.fromJobConfig.columns = tableColumns; MutableContext context = new MutableMapContext(); - InitializerContext initializerContext = new InitializerContext(context); + InitializerContext initializerContext = new InitializerContext(context, testUser); @SuppressWarnings("rawtypes") Initializer initializer = new GenericJdbcFromInitializer(); @@ -237,7 +239,7 @@ public class TestFromInitializer { jobConfig.fromJobConfig.partitionColumn = "DCOL"; MutableContext context = new MutableMapContext(); - InitializerContext initializerContext = new InitializerContext(context); + InitializerContext initializerContext = new InitializerContext(context, testUser); @SuppressWarnings("rawtypes") Initializer initializer = new GenericJdbcFromInitializer(); @@ -266,7 +268,7 @@ public class TestFromInitializer { jobConfig.incrementalRead.lastValue = "-51"; MutableContext context = new MutableMapContext(); - InitializerContext initializerContext = new InitializerContext(context); + InitializerContext initializerContext = new InitializerContext(context, testUser); @SuppressWarnings("rawtypes") Initializer initializer = new GenericJdbcFromInitializer(); @@ -296,7 +298,7 @@ public class TestFromInitializer { jobConfig.incrementalRead.lastValue = "0"; MutableContext context = new MutableMapContext(); - InitializerContext initializerContext = new InitializerContext(context); + InitializerContext initializerContext = new InitializerContext(context, testUser); @SuppressWarnings("rawtypes") Initializer initializer = new GenericJdbcFromInitializer(); @@ -326,7 +328,7 @@ public class TestFromInitializer { jobConfig.fromJobConfig.tableName = tableName; MutableContext context = new MutableMapContext(); - InitializerContext initializerContext = new InitializerContext(context); + InitializerContext initializerContext = new InitializerContext(context, testUser); @SuppressWarnings("rawtypes") Initializer initializer = new GenericJdbcFromInitializer(); @@ -356,7 +358,7 @@ public class TestFromInitializer { jobConfig.fromJobConfig.columns = tableColumns; MutableContext context = new MutableMapContext(); - InitializerContext initializerContext = new InitializerContext(context); + InitializerContext initializerContext = new InitializerContext(context, testUser); @SuppressWarnings("rawtypes") Initializer initializer = new GenericJdbcFromInitializer(); @@ -384,7 +386,7 @@ public class TestFromInitializer { jobConfig.fromJobConfig.partitionColumn = "DCOL"; MutableContext context = new MutableMapContext(); - InitializerContext initializerContext = new InitializerContext(context); + InitializerContext initializerContext = new InitializerContext(context, testUser); @SuppressWarnings("rawtypes") Initializer initializer = new GenericJdbcFromInitializer(); @@ -414,7 +416,7 @@ public class TestFromInitializer { jobConfig.fromJobConfig.partitionColumn = "DCOL"; MutableContext context = new MutableMapContext(); - InitializerContext initializerContext = new InitializerContext(context); + InitializerContext initializerContext = new InitializerContext(context, testUser); @SuppressWarnings("rawtypes") Initializer initializer = new GenericJdbcFromInitializer(); @@ -436,7 +438,7 @@ public class TestFromInitializer { jobConfig.fromJobConfig.partitionColumn = "DCOL"; MutableContext context = new MutableMapContext(); - InitializerContext initializerContext = new InitializerContext(context); + InitializerContext initializerContext = new InitializerContext(context, testUser); @SuppressWarnings("rawtypes") Initializer initializer = new GenericJdbcFromInitializer(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java index 83411fb..6f7612c 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java @@ -107,7 +107,7 @@ public class TestLoader { schema.addColumn(new FixedPoint("c1", 2L, true)).addColumn(new Decimal("c2", 5, 2)) .addColumn(new Text("c3")).addColumn(new Date("c4")) .addColumn(new DateTime("c5", false, false)).addColumn(new Time("c6", false)).addColumn(new DateTime("c7", false, false)); - LoaderContext loaderContext = new LoaderContext(context, reader, schema); + LoaderContext loaderContext = new LoaderContext(context, reader, schema, "test_user"); loader.load(loaderContext, linkConfig, jobConfig); int index = START; http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java index bec6478..3a767ab 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java @@ -62,7 +62,7 @@ public class TestPartitioner { FromJobConfiguration jobConfig = new FromJobConfiguration(); Partitioner partitioner = new GenericJdbcPartitioner(); - PartitionerContext partitionerContext = new PartitionerContext(context, 5, null); + PartitionerContext partitionerContext = new PartitionerContext(context, 5, null, "test_user"); List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig); verifyResult(partitions, new String[] { @@ -94,7 +94,7 @@ public class TestPartitioner { FromJobConfiguration jobConfig = new FromJobConfiguration(); Partitioner partitioner = new GenericJdbcPartitioner(); - PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); + PartitionerContext partitionerContext = new PartitionerContext(context, 3, null, "test_user"); List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig); verifyResult(partitions, new String[] { @@ -124,7 +124,7 @@ public class TestPartitioner { FromJobConfiguration jobConfig = new FromJobConfiguration(); Partitioner partitioner = new GenericJdbcPartitioner(); - PartitionerContext partitionerContext = new PartitionerContext(context, 13, null); + PartitionerContext partitionerContext = new PartitionerContext(context, 13, null, "test_user"); List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig); verifyResult(partitions, new String[] { @@ -161,7 +161,7 @@ public class TestPartitioner { FromJobConfiguration jobConfig = new FromJobConfiguration(); Partitioner partitioner = new GenericJdbcPartitioner(); - PartitionerContext partitionerContext = new PartitionerContext(context, 5, null); + PartitionerContext partitionerContext = new PartitionerContext(context, 5, null, "test_user"); List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig); verifyResult(partitions, new String[] { @@ -193,7 +193,7 @@ public class TestPartitioner { FromJobConfiguration jobConfig = new FromJobConfiguration(); Partitioner partitioner = new GenericJdbcPartitioner(); - PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); + PartitionerContext partitionerContext = new PartitionerContext(context, 3, null, "test_user"); List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig); verifyResult(partitions, new String[] { @@ -215,7 +215,7 @@ public class TestPartitioner { FromJobConfiguration jobConfig = new FromJobConfiguration(); Partitioner partitioner = new GenericJdbcPartitioner(); - PartitionerContext partitionerContext = new PartitionerContext(context, 5, null); + PartitionerContext partitionerContext = new PartitionerContext(context, 5, null, "test_user"); List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig); verifyResult(partitions, new String[] { @@ -239,7 +239,7 @@ public class TestPartitioner { FromJobConfiguration jobConfig = new FromJobConfiguration(); Partitioner partitioner = new GenericJdbcPartitioner(); - PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); + PartitionerContext partitionerContext = new PartitionerContext(context, 3, null, "test_user"); List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig); verifyResult(partitions, new String[]{ @@ -261,7 +261,7 @@ public class TestPartitioner { FromJobConfiguration jobConfig = new FromJobConfiguration(); Partitioner partitioner = new GenericJdbcPartitioner(); - PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); + PartitionerContext partitionerContext = new PartitionerContext(context, 3, null, "test_user"); List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig); verifyResult(partitions, new String[]{ @@ -286,7 +286,7 @@ public class TestPartitioner { FromJobConfiguration jobConfig = new FromJobConfiguration(); Partitioner partitioner = new GenericJdbcPartitioner(); - PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); + PartitionerContext partitionerContext = new PartitionerContext(context, 3, null, "test_user"); List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig); @@ -315,7 +315,7 @@ public class TestPartitioner { FromJobConfiguration jobConfig = new FromJobConfiguration(); Partitioner partitioner = new GenericJdbcPartitioner(); - PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); + PartitionerContext partitionerContext = new PartitionerContext(context, 3, null, "test_user"); List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig); verifyResult(partitions, new String[]{ @@ -341,7 +341,7 @@ public class TestPartitioner { FromJobConfiguration jobConfig = new FromJobConfiguration(); Partitioner partitioner = new GenericJdbcPartitioner(); - PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); + PartitionerContext partitionerContext = new PartitionerContext(context, 3, null, "test_user"); List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig); verifyResult(partitions, new String[]{ "'2013-01-01 01:01:01.123' <= TSCOL AND TSCOL < '2013-05-02 12:14:17.634'", @@ -366,7 +366,7 @@ public class TestPartitioner { FromJobConfiguration jobConfig = new FromJobConfiguration(); Partitioner partitioner = new GenericJdbcPartitioner(); - PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); + PartitionerContext partitionerContext = new PartitionerContext(context, 3, null, "test_user"); List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig); verifyResult(partitions, new String[]{ "BCOL = TRUE", @@ -390,7 +390,7 @@ public class TestPartitioner { FromJobConfiguration jobConfig = new FromJobConfiguration(); Partitioner partitioner = new GenericJdbcPartitioner(); - PartitionerContext partitionerContext = new PartitionerContext(context, 25, null); + PartitionerContext partitionerContext = new PartitionerContext(context, 25, null, "test_user"); List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig); verifyResult(partitions, new String[] { @@ -437,7 +437,7 @@ public class TestPartitioner { LinkConfiguration linkConfig = new LinkConfiguration(); FromJobConfiguration jobConfig = new FromJobConfiguration(); Partitioner partitioner = new GenericJdbcPartitioner(); - PartitionerContext partitionerContext = new PartitionerContext(context, 5, null); + PartitionerContext partitionerContext = new PartitionerContext(context, 5, null, "test_user"); List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig); assertEquals(partitions.size(), 5); // First partition needs to contain entire upper bound @@ -462,7 +462,7 @@ public class TestPartitioner { FromJobConfiguration jobConfig = new FromJobConfiguration(); Partitioner partitioner = new GenericJdbcPartitioner(); - PartitionerContext partitionerContext = new PartitionerContext(context, 5, null); + PartitionerContext partitionerContext = new PartitionerContext(context, 5, null, "test_user"); List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig); @@ -493,7 +493,7 @@ public class TestPartitioner { jobConfig.fromJobConfig.allowNullValueInPartitionColumn = true; Partitioner partitioner = new GenericJdbcPartitioner(); - PartitionerContext partitionerContext = new PartitionerContext(context, 5, null); + PartitionerContext partitionerContext = new PartitionerContext(context, 5, null, "test_user"); List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig); http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java index df405c8..40278b6 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java @@ -41,6 +41,7 @@ public class TestToInitializer { private final String schemalessTableName; private final String stageTableName; private final String tableColumns; + private final String testUser; private GenericJdbcExecutor executor; @@ -50,6 +51,7 @@ public class TestToInitializer { schemalessTableName = getClass().getSimpleName().toUpperCase() + "TABLE"; stageTableName = getClass().getSimpleName().toUpperCase() + "_STAGE_TABLE"; tableColumns = "ICOL,VCOL"; + testUser = "test_user"; } @BeforeMethod(alwaysRun = true) @@ -86,7 +88,7 @@ public class TestToInitializer { jobConfig.toJobConfig.tableName = schemalessTableName; MutableContext context = new MutableMapContext(); - InitializerContext initializerContext = new InitializerContext(context); + InitializerContext initializerContext = new InitializerContext(context, testUser); @SuppressWarnings("rawtypes") Initializer initializer = new GenericJdbcToInitializer(); @@ -109,7 +111,7 @@ public class TestToInitializer { jobConfig.toJobConfig.columns = tableColumns; MutableContext context = new MutableMapContext(); - InitializerContext initializerContext = new InitializerContext(context); + InitializerContext initializerContext = new InitializerContext(context, testUser); @SuppressWarnings("rawtypes") Initializer initializer = new GenericJdbcToInitializer(); @@ -132,7 +134,7 @@ public class TestToInitializer { jobConfig.toJobConfig.tableName = tableName; MutableContext context = new MutableMapContext(); - InitializerContext initializerContext = new InitializerContext(context); + InitializerContext initializerContext = new InitializerContext(context, testUser); @SuppressWarnings("rawtypes") Initializer initializer = new GenericJdbcToInitializer(); @@ -156,7 +158,7 @@ public class TestToInitializer { jobConfig.toJobConfig.columns = tableColumns; MutableContext context = new MutableMapContext(); - InitializerContext initializerContext = new InitializerContext(context); + InitializerContext initializerContext = new InitializerContext(context, testUser); @SuppressWarnings("rawtypes") Initializer initializer = new GenericJdbcToInitializer(); @@ -191,7 +193,7 @@ public class TestToInitializer { jobConfig.toJobConfig.stageTableName = stageTableName; MutableContext context = new MutableMapContext(); - InitializerContext initializerContext = new InitializerContext(context); + InitializerContext initializerContext = new InitializerContext(context, testUser); @SuppressWarnings("rawtypes") Initializer initializer = new GenericJdbcToInitializer(); @@ -219,7 +221,7 @@ public class TestToInitializer { executor.executeUpdate("INSERT INTO " + fullStageTableName + " VALUES(1, 1.1, 'one')"); MutableContext context = new MutableMapContext(); - InitializerContext initializerContext = new InitializerContext(context); + InitializerContext initializerContext = new InitializerContext(context, testUser); @SuppressWarnings("rawtypes") Initializer initializer = new GenericJdbcToInitializer(); @@ -278,7 +280,7 @@ public class TestToInitializer { executor.executeUpdate("INSERT INTO " + fullStageTableName + " VALUES(1, 1.1, 'one')"); MutableContext context = new MutableMapContext(); - InitializerContext initializerContext = new InitializerContext(context); + InitializerContext initializerContext = new InitializerContext(context, testUser); @SuppressWarnings("rawtypes") Initializer initializer = new GenericJdbcToInitializer(); @@ -301,7 +303,7 @@ public class TestToInitializer { jobConfig.toJobConfig.stageTableName = stageTableName; createTable(fullStageTableName); MutableContext context = new MutableMapContext(); - InitializerContext initializerContext = new InitializerContext(context); + InitializerContext initializerContext = new InitializerContext(context, testUser); @SuppressWarnings("rawtypes") Initializer initializer = new GenericJdbcToInitializer(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java index 23bbcc0..583acdd 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java @@ -19,6 +19,7 @@ package org.apache.sqoop.connector.hdfs; import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.security.PrivilegedExceptionAction; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; @@ -29,6 +30,7 @@ import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.LineReader; import org.apache.log4j.Logger; import org.apache.sqoop.common.SqoopException; @@ -56,25 +58,29 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura private long rowsRead = 0; @Override - public void extract(ExtractorContext context, LinkConfiguration linkConfiguration, FromJobConfiguration jobConfiguration, HdfsPartition partition) { - HdfsUtils.contextToConfiguration(context.getContext(), conf); - dataWriter = context.getDataWriter(); - schema = context.getSchema(); - + public void extract(final ExtractorContext context, final LinkConfiguration linkConfiguration, final FromJobConfiguration jobConfiguration, final HdfsPartition partition) { try { - HdfsPartition p = partition; - LOG.info("Working on partition: " + p); - int numFiles = p.getNumberOfFiles(); - for (int i = 0; i < numFiles; i++) { - extractFile(linkConfiguration, jobConfiguration, p.getFile(i), p.getOffset(i), p.getLength(i)); - } - } catch (IOException e) { + UserGroupInformation.createProxyUser(context.getUser(), UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() { + public Void run() throws Exception { + HdfsUtils.contextToConfiguration(context.getContext(), conf); + dataWriter = context.getDataWriter(); + schema = context.getSchema(); + HdfsPartition p = partition; + LOG.info("Working on partition: " + p); + int numFiles = p.getNumberOfFiles(); + for (int i = 0; i < numFiles; i++) { + extractFile(linkConfiguration, jobConfiguration, p.getFile(i), p.getOffset(i), p.getLength(i)); + } + return null; + } + }); + } catch (Exception e) { throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0001, e); } } private void extractFile(LinkConfiguration linkConfiguration, - FromJobConfiguration fromJobCOnfiguration, + FromJobConfiguration fromJobConfiguration, Path file, long start, long length) throws IOException { long end = start + length; @@ -83,9 +89,9 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura LOG.info("\t to offset " + end); LOG.info("\t of length " + length); if(isSequenceFile(file)) { - extractSequenceFile(linkConfiguration, fromJobCOnfiguration, file, start, length); + extractSequenceFile(linkConfiguration, fromJobConfiguration, file, start, length); } else { - extractTextFile(linkConfiguration, fromJobCOnfiguration, file, start, length); + extractTextFile(linkConfiguration, fromJobConfiguration, file, start, length); } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java index e98e02b..be837ca 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.sqoop.common.MapContext; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration; @@ -32,6 +33,7 @@ import org.apache.sqoop.job.etl.InitializerContext; import org.apache.log4j.Logger; import java.io.IOException; +import java.security.PrivilegedExceptionAction; public class HdfsFromInitializer extends Initializer<LinkConfiguration, FromJobConfiguration> { @@ -48,43 +50,49 @@ public class HdfsFromInitializer extends Initializer<LinkConfiguration, FromJobC * @param jobConfig FROM job configuration object */ @Override - public void initialize(InitializerContext context, LinkConfiguration linkConfig, FromJobConfiguration jobConfig) { + @edu.umd.cs.findbugs.annotations.SuppressWarnings({"SIC_INNER_SHOULD_BE_STATIC_ANON"}) + public void initialize(final InitializerContext context, final LinkConfiguration linkConfig, final FromJobConfiguration jobConfig) { assert jobConfig.incremental != null; - Configuration configuration = HdfsUtils.createConfiguration(linkConfig); + final Configuration configuration = HdfsUtils.createConfiguration(linkConfig); HdfsUtils.contextToConfiguration(new MapContext(linkConfig.linkConfig.configOverrides), configuration); HdfsUtils.configurationToContext(configuration, context.getContext()); - boolean incremental = jobConfig.incremental.incrementalType != null && jobConfig.incremental.incrementalType == IncrementalType.NEW_FILES; + final boolean incremental = jobConfig.incremental.incrementalType != null && jobConfig.incremental.incrementalType == IncrementalType.NEW_FILES; // In case of incremental import, we need to persist the highest last modified try { - FileSystem fs = FileSystem.get(configuration); - Path path = new Path(jobConfig.fromJobConfig.inputDirectory); - LOG.info("Input directory: " + path.toString()); + UserGroupInformation.createProxyUser(context.getUser(), UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() { + public Void run() throws Exception { + FileSystem fs = FileSystem.get(configuration); + Path path = new Path(jobConfig.fromJobConfig.inputDirectory); + LOG.info("Input directory: " + path.toString()); - if(!fs.exists(path)) { - throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Input directory doesn't exists"); - } + if(!fs.exists(path)) { + throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Input directory doesn't exists"); + } + + if(fs.isFile(path)) { + throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Input directory is a file"); + } - if(fs.isFile(path)) { - throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Input directory is a file"); - } + if(incremental) { + LOG.info("Detected incremental import"); + long maxModifiedTime = -1; + FileStatus[] fileStatuses = fs.listStatus(path); + for(FileStatus status : fileStatuses) { + if(maxModifiedTime < status.getModificationTime()) { + maxModifiedTime = status.getModificationTime(); + } + } - if(incremental) { - LOG.info("Detected incremental import"); - long maxModifiedTime = -1; - FileStatus[] fileStatuses = fs.listStatus(path); - for(FileStatus status : fileStatuses) { - if(maxModifiedTime < status.getModificationTime()) { - maxModifiedTime = status.getModificationTime(); + LOG.info("Maximal age of file is: " + maxModifiedTime); + context.getContext().setLong(HdfsConstants.MAX_IMPORT_DATE, maxModifiedTime); } + return null; } - - LOG.info("Maximal age of file is: " + maxModifiedTime); - context.getContext().setLong(HdfsConstants.MAX_IMPORT_DATE, maxModifiedTime); - } - } catch (IOException e) { + }); + } catch (Exception e) { throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Unexpected exception", e); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java index 798e552..04acd18 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java @@ -18,12 +18,14 @@ package org.apache.sqoop.connector.hdfs; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.UUID; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.common.SqoopIDFUtils; import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; @@ -52,40 +54,43 @@ public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> { * @throws Exception */ @Override - public void load(LoaderContext context, LinkConfiguration linkConfiguration, - ToJobConfiguration toJobConfig) throws Exception { - Configuration conf = new Configuration(); - HdfsUtils.contextToConfiguration(context.getContext(), conf); - - DataReader reader = context.getDataReader(); - String directoryName = context.getString(HdfsConstants.WORK_DIRECTORY); - String codecname = getCompressionCodecName(toJobConfig); - - CompressionCodec codec = null; - if (codecname != null) { - Class<?> clz = ClassUtils.loadClass(codecname); - if (clz == null) { - throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0003, codecname); - } + public void load(final LoaderContext context, final LinkConfiguration linkConfiguration, + final ToJobConfiguration toJobConfig) throws Exception { + UserGroupInformation.createProxyUser(context.getUser(), + UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() { + public Void run() throws Exception { + Configuration conf = new Configuration(); + HdfsUtils.contextToConfiguration(context.getContext(), conf); + + DataReader reader = context.getDataReader(); + String directoryName = context.getString(HdfsConstants.WORK_DIRECTORY); + String codecname = getCompressionCodecName(toJobConfig); + + CompressionCodec codec = null; + if (codecname != null) { + Class<?> clz = ClassUtils.loadClass(codecname); + if (clz == null) { + throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0003, codecname); + } - try { - codec = (CompressionCodec) clz.newInstance(); - if (codec instanceof Configurable) { - ((Configurable) codec).setConf(conf); + try { + codec = (CompressionCodec) clz.newInstance(); + if (codec instanceof Configurable) { + ((Configurable) codec).setConf(conf); + } + } catch (RuntimeException|InstantiationException|IllegalAccessException e) { + throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0004, codecname, e); + } } - } catch (RuntimeException|InstantiationException|IllegalAccessException e) { - throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0004, codecname, e); - } - } - String filename = directoryName + "/" + UUID.randomUUID() + getExtension(toJobConfig,codec); + String filename = directoryName + "/" + UUID.randomUUID() + getExtension(toJobConfig,codec); - try { - Path filepath = new Path(filename); + try { + Path filepath = new Path(filename); - GenericHdfsWriter filewriter = getWriter(toJobConfig); + GenericHdfsWriter filewriter = getWriter(toJobConfig); - filewriter.initialize(filepath, conf, codec); + filewriter.initialize(filepath, conf, codec); if (!HdfsUtils.hasCustomFormat(linkConfiguration, toJobConfig) || (context.getSchema() instanceof ByteArraySchema)) { String record; @@ -110,10 +115,12 @@ public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> { } filewriter.destroy(); - } catch (IOException e) { - throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0005, e); + } catch (IOException e) { + throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0005, e); + } + return null; } - + }); } private GenericHdfsWriter getWriter(ToJobConfiguration toJobConf) { http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java index ff16ad7..998b903 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java @@ -19,6 +19,7 @@ package org.apache.sqoop.connector.hdfs; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -38,6 +39,7 @@ import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NodeBase; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.log4j.Logger; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration; @@ -71,104 +73,108 @@ public class HdfsPartitioner extends Partitioner<LinkConfiguration, FromJobConfi new HashMap<String, Set<String>>(); @Override - public List<Partition> getPartitions(PartitionerContext context, - LinkConfiguration linkConfiguration, - FromJobConfiguration fromJobConfig) { + public List<Partition> getPartitions(final PartitionerContext context, + final LinkConfiguration linkConfiguration, + final FromJobConfiguration fromJobConfig) { assert fromJobConfig.incremental != null; - Configuration conf = new Configuration(); + final Configuration conf = new Configuration(); HdfsUtils.contextToConfiguration(context.getContext(), conf); + final List<Partition> partitions = new ArrayList<>(); try { - long numInputBytes = getInputSize(conf, fromJobConfig.fromJobConfig.inputDirectory); - maxSplitSize = numInputBytes / context.getMaxPartitions(); - - if(numInputBytes % context.getMaxPartitions() != 0 ) { - maxSplitSize += 1; - } - - long minSizeNode = 0; - long minSizeRack = 0; - long maxSize = 0; + UserGroupInformation.createProxyUser(context.getUser(), + UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() { + public Void run() throws Exception { + long numInputBytes = getInputSize(conf, fromJobConfig.fromJobConfig.inputDirectory); + maxSplitSize = numInputBytes / context.getMaxPartitions(); + + if (numInputBytes % context.getMaxPartitions() != 0) { + maxSplitSize += 1; + } - // the values specified by setxxxSplitSize() takes precedence over the - // values that might have been specified in the config - if (minSplitSizeNode != 0) { - minSizeNode = minSplitSizeNode; - } else { - minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0); - } - if (minSplitSizeRack != 0) { - minSizeRack = minSplitSizeRack; - } else { - minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0); - } - if (maxSplitSize != 0) { - maxSize = maxSplitSize; - } else { - maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0); - } - if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) { - throw new IOException("Minimum split size pernode " + minSizeNode + - " cannot be larger than maximum split size " + - maxSize); - } - if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) { - throw new IOException("Minimum split size per rack" + minSizeRack + - " cannot be larger than maximum split size " + - maxSize); - } - if (minSizeRack != 0 && minSizeNode > minSizeRack) { - throw new IOException("Minimum split size per node" + minSizeNode + - " cannot be smaller than minimum split " + - "size per rack " + minSizeRack); - } + long minSizeNode = 0; + long minSizeRack = 0; + long maxSize = 0; - // Incremental import related options - boolean incremental = fromJobConfig.incremental.incrementalType != null && fromJobConfig.incremental.incrementalType == IncrementalType.NEW_FILES; - long lastImportedDate = fromJobConfig.incremental.lastImportedDate != null ? fromJobConfig.incremental.lastImportedDate.getMillis() : -1; - long maxImportDate = context.getLong(HdfsConstants.MAX_IMPORT_DATE, -1); - - // all the files in input set - String indir = fromJobConfig.fromJobConfig.inputDirectory; - FileSystem fs = FileSystem.get(conf); - - List<Path> paths = new LinkedList<Path>(); - for(FileStatus status : fs.listStatus(new Path(indir))) { - if(!status.isDir()) { - if(incremental) { - long modifiedDate = status.getModificationTime(); - if(lastImportedDate < modifiedDate && modifiedDate <= maxImportDate) { - LOG.info("Will process input file: " + status.getPath() + " with modification date " + modifiedDate); - paths.add(status.getPath()); - } else { - LOG.info("Skipping input file: " + status.getPath() + " with modification date " + modifiedDate); - } + // the values specified by setxxxSplitSize() takes precedence over the + // values that might have been specified in the config + if (minSplitSizeNode != 0) { + minSizeNode = minSplitSizeNode; } else { - // Without incremental mode, we're processing all files - LOG.info("Will process input file: " + status.getPath()); - paths.add(status.getPath()); + minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0); + } + if (minSplitSizeRack != 0) { + minSizeRack = minSplitSizeRack; + } else { + minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0); + } + if (maxSplitSize != 0) { + maxSize = maxSplitSize; + } else { + maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0); + } + if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) { + throw new IOException("Minimum split size pernode " + minSizeNode + + " cannot be larger than maximum split size " + maxSize); + } + if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) { + throw new IOException("Minimum split size per rack" + minSizeRack + + " cannot be larger than maximum split size " + maxSize); + } + if (minSizeRack != 0 && minSizeNode > minSizeRack) { + throw new IOException("Minimum split size per node" + minSizeNode + + " cannot be smaller than minimum split " + "size per rack " + minSizeRack); } - } - } - List<Partition> partitions = new ArrayList<Partition>(); - if (paths.size() == 0) { - return partitions; - } + // Incremental import related options + boolean incremental = fromJobConfig.incremental.incrementalType != null + && fromJobConfig.incremental.incrementalType == IncrementalType.NEW_FILES; + long lastImportedDate = fromJobConfig.incremental.lastImportedDate != null + ? fromJobConfig.incremental.lastImportedDate.getMillis() : -1; + long maxImportDate = context.getLong(HdfsConstants.MAX_IMPORT_DATE, -1); + + // all the files in input set + String indir = fromJobConfig.fromJobConfig.inputDirectory; + FileSystem fs = FileSystem.get(conf); + + List<Path> paths = new LinkedList<Path>(); + for (FileStatus status : fs.listStatus(new Path(indir))) { + if (!status.isDir()) { + if (incremental) { + long modifiedDate = status.getModificationTime(); + if (lastImportedDate < modifiedDate && modifiedDate <= maxImportDate) { + LOG.info("Will process input file: " + status.getPath() + " with modification date " + modifiedDate); + paths.add(status.getPath()); + } else { + LOG.info("Skipping input file: " + status.getPath() + " with modification date " + modifiedDate); + } + } else { + // Without incremental mode, we're processing all files + LOG.info("Will process input file: " + status.getPath()); + paths.add(status.getPath()); + } + } + } - // create splits for all files that are not in any pool. - getMoreSplits(conf, paths, - maxSize, minSizeNode, minSizeRack, partitions); + if (paths.size() == 0) { + return null; + } - // free up rackToNodes map - rackToNodes.clear(); + // create splits for all files that are not in any pool. + getMoreSplits(conf, paths, maxSize, minSizeNode, minSizeRack, partitions); - return partitions; + // free up rackToNodes map + rackToNodes.clear(); - } catch (IOException e) { + return null; + } + }); + } catch (Exception e) { throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0000, e); } + + return partitions; } //TODO: Perhaps get the FS from link configuration so we can support remote HDFS http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java index 11b2ae3..2bad23a 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.log4j.Logger; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; @@ -30,6 +31,7 @@ import org.apache.sqoop.job.etl.Destroyer; import org.apache.sqoop.job.etl.DestroyerContext; import java.io.IOException; +import java.security.PrivilegedExceptionAction; public class HdfsToDestroyer extends Destroyer<LinkConfiguration, ToJobConfiguration> { @@ -39,28 +41,38 @@ public class HdfsToDestroyer extends Destroyer<LinkConfiguration, ToJobConfigura * {@inheritDoc} */ @Override - public void destroy(DestroyerContext context, LinkConfiguration linkConfig, ToJobConfiguration jobConfig) { - Configuration configuration = new Configuration(); + @edu.umd.cs.findbugs.annotations.SuppressWarnings({"SIC_INNER_SHOULD_BE_STATIC_ANON"}) + public void destroy(final DestroyerContext context, final LinkConfiguration linkConfig, final ToJobConfiguration jobConfig) { + final Configuration configuration = new Configuration(); HdfsUtils.contextToConfiguration(context.getContext(), configuration); - String workingDirectory = context.getString(HdfsConstants.WORK_DIRECTORY); - Path targetDirectory = new Path(jobConfig.toJobConfig.outputDirectory); + final String workingDirectory = context.getString(HdfsConstants.WORK_DIRECTORY); + final Path targetDirectory = new Path(jobConfig.toJobConfig.outputDirectory); try { - FileSystem fs = FileSystem.get(configuration); + UserGroupInformation.createProxyUser(context.getUser(), + UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() { + public Void run() throws Exception { + FileSystem fs = FileSystem.get(configuration); - // If we succeeded, we need to move all files from working directory - if(context.isSuccess()) { - FileStatus[] fileStatuses = fs.listStatus(new Path(workingDirectory)); - for (FileStatus status : fileStatuses) { - LOG.info("Committing file: " + status.getPath().toString() + " of size " + status.getLen()); - fs.rename(status.getPath(), new Path(targetDirectory, status.getPath().getName())); - } - } + // If we succeeded, we need to move all files from working directory + if (context.isSuccess()) { + FileStatus[] fileStatuses = fs.listStatus(new Path + (workingDirectory)); + for (FileStatus status : fileStatuses) { + LOG.info("Committing file: " + status.getPath().toString() + " " + + "of size " + status.getLen()); + fs.rename(status.getPath(), new Path(targetDirectory, status + .getPath().getName())); + } + } - // Clean up working directory - fs.delete(new Path(workingDirectory), true); - } catch (IOException e) { + // Clean up working directory + fs.delete(new Path(workingDirectory), true); + return null; + } + }); + } catch (Exception e) { throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0008, e); } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java index 29cf3b9..5856371 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.log4j.Logger; import org.apache.sqoop.common.MapContext; import org.apache.sqoop.common.SqoopException; @@ -31,6 +32,7 @@ import org.apache.sqoop.job.etl.Initializer; import org.apache.sqoop.job.etl.InitializerContext; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.UUID; public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfiguration> { @@ -41,36 +43,43 @@ public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfi * {@inheritDoc} */ @Override - public void initialize(InitializerContext context, LinkConfiguration linkConfig, ToJobConfiguration jobConfig) { + @edu.umd.cs.findbugs.annotations.SuppressWarnings({"SIC_INNER_SHOULD_BE_STATIC_ANON"}) + public void initialize(final InitializerContext context, final LinkConfiguration linkConfig, final ToJobConfiguration jobConfig) { assert jobConfig != null; assert linkConfig != null; assert jobConfig.toJobConfig != null; assert jobConfig.toJobConfig.outputDirectory != null; - Configuration configuration = HdfsUtils.createConfiguration(linkConfig); + final Configuration configuration = HdfsUtils.createConfiguration(linkConfig); HdfsUtils.contextToConfiguration(new MapContext(linkConfig.linkConfig.configOverrides), configuration); HdfsUtils.configurationToContext(configuration, context.getContext()); - boolean appendMode = Boolean.TRUE.equals(jobConfig.toJobConfig.appendMode); + final boolean appendMode = Boolean.TRUE.equals(jobConfig.toJobConfig.appendMode); // Verification that given HDFS directory either don't exists or is empty try { - FileSystem fs = FileSystem.get(configuration); - Path path = new Path(jobConfig.toJobConfig.outputDirectory); + UserGroupInformation.createProxyUser(context.getUser(), + UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() { + public Void run() throws Exception { + FileSystem fs = FileSystem.get(configuration); + Path path = new Path(jobConfig.toJobConfig.outputDirectory); - if(fs.exists(path)) { - if(fs.isFile(path)) { - throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Output directory already exists and is a file"); - } + if (fs.exists(path)) { + if (fs.isFile(path)) { + throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Output directory already exists and is a file"); + } - if(fs.isDirectory(path) && !appendMode) { - FileStatus[] fileStatuses = fs.listStatus(path); - if(fileStatuses.length != 0) { - throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Output directory is not empty"); + if (fs.isDirectory(path) && !appendMode) { + FileStatus[] fileStatuses = fs.listStatus(path); + if (fileStatuses.length != 0) { + throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Output directory is not empty"); + } + } } + return null; } - } - } catch (IOException e) { + }); + } catch (Exception e) { throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Unexpected exception", e); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java index 6f9986d..7d2177f 100644 --- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java +++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java @@ -51,6 +51,7 @@ public class TestExtractor extends TestHdfsBase { private static final String INPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/"; private static final int NUMBER_OF_FILES = 5; private static final int NUMBER_OF_ROWS_PER_FILE = 1000; + private static final String TEST_USER = "test_user"; private ToFormat outputFileType; private Class<? extends CompressionCodec> compressionClass; @@ -132,6 +133,8 @@ public class TestExtractor extends TestHdfsBase { Assert.assertEquals("'" + index + "'", components[3]); Assert.assertEquals("\\\\N", components[4]); + assertTestUser(TEST_USER); + visited[index - 1] = true; } @@ -139,7 +142,7 @@ public class TestExtractor extends TestHdfsBase { public void writeRecord(Object obj) { throw new AssertionError("Should not be writing object."); } - }, schema); + }, schema, TEST_USER); LinkConfiguration emptyLinkConfig = new LinkConfiguration(); FromJobConfiguration emptyJobConfig = new FromJobConfiguration(); @@ -193,7 +196,7 @@ public class TestExtractor extends TestHdfsBase { public void writeRecord(Object obj) { throw new AssertionError("Should not be writing object."); } - }, schema); + }, schema, TEST_USER); LinkConfiguration emptyLinkConfig = new LinkConfiguration(); FromJobConfiguration fromJobConfiguration = new FromJobConfiguration(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromDestroyer.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromDestroyer.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromDestroyer.java index 569c60b..f388040 100644 --- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromDestroyer.java +++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromDestroyer.java @@ -34,19 +34,21 @@ public class TestFromDestroyer { LinkConfiguration linkConfig; FromJobConfiguration jobConfig; MutableContext context; + String user; public TestFromDestroyer() { linkConfig = new LinkConfiguration(); jobConfig = new FromJobConfiguration(); context = new MutableMapContext(); destroyer = new HdfsFromDestroyer(); + user = "test_user"; } @Test public void testUpdateConfiguration() { DateTime dt = new DateTime(); context.setLong(HdfsConstants.MAX_IMPORT_DATE, dt.getMillis()); - destroyer.updateConfiguration(new DestroyerContext(context, true, null), linkConfig, jobConfig); + destroyer.updateConfiguration(new DestroyerContext(context, true, null, user), linkConfig, jobConfig); assertEquals(jobConfig.incremental.lastImportedDate, dt); } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromInitializer.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromInitializer.java index 52c174e..119dbdb 100644 --- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromInitializer.java +++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromInitializer.java @@ -48,7 +48,7 @@ public class TestFromInitializer { jobConfig = new FromJobConfiguration(); context = new MutableMapContext(); initializer = new HdfsFromInitializer(); - initializerContext = new InitializerContext(context); + initializerContext = new InitializerContext(context, "test_user"); } @Test http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsBase.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsBase.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsBase.java index ac44595..6bacfa2 100644 --- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsBase.java +++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsBase.java @@ -25,6 +25,8 @@ import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.security.UserGroupInformation; +import org.testng.Assert; import java.io.BufferedWriter; import java.io.IOException; @@ -150,4 +152,13 @@ public class TestHdfsBase { throws IOException, InstantiationException, IllegalAccessException { createSequenceInput(indir, clz, numberOfFiles, numberOfRows, "%d,%f,%s"); } + + protected void assertTestUser(String testUser) { + // Ensure that we are impersonating correctly + try{ + Assert.assertEquals(UserGroupInformation.getCurrentUser().getUserName(), testUser); + } catch (Exception e) { + Assert.fail(); + } + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java index 688067b..11fcef2 100644 --- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java +++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java @@ -62,6 +62,7 @@ public class TestLoader extends TestHdfsBase { private ToCompression compression; private final String outputDirectory; private Loader loader; + private String user = "test_user"; @Factory(dataProvider="test-hdfs-loader") public TestLoader(ToFormat outputFormat, @@ -110,11 +111,13 @@ public class TestLoader extends TestHdfsBase { @Override public Object[] readArrayRecord() { + assertTestUser(user); return null; } @Override public String readTextRecord() { + assertTestUser(user); if (index++ < NUMBER_OF_ROWS_PER_FILE) { return index + "," + (double)index + ",'" + index + "'"; } else { @@ -124,9 +127,10 @@ public class TestLoader extends TestHdfsBase { @Override public Object readContent() { + assertTestUser(user); return null; } - }, null); + }, null, user); LinkConfiguration linkConf = new LinkConfiguration(); ToJobConfiguration jobConf = new ToJobConfiguration(); jobConf.toJobConfig.compression = compression; @@ -163,6 +167,8 @@ public class TestLoader extends TestHdfsBase { @Override public Object[] readArrayRecord() { + assertTestUser(user); + if (index++ < NUMBER_OF_ROWS_PER_FILE) { return new Object[]{ index, @@ -184,7 +190,7 @@ public class TestLoader extends TestHdfsBase { public Object readContent() { throw new AssertionError("should not be at readContent"); } - }, schema); + }, schema, "test_user"); LinkConfiguration linkConf = new LinkConfiguration(); ToJobConfiguration jobConf = new ToJobConfiguration(); jobConf.toJobConfig.compression = compression; http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java index 9a6bfff..7627e98 100644 --- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java +++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java @@ -94,7 +94,7 @@ public class TestPartitioner extends TestHdfsBase { @Test public void testPartitioner() { - PartitionerContext context = new PartitionerContext(new MapContext(new HashMap<String, String>()), 5, null); + PartitionerContext context = new PartitionerContext(new MapContext(new HashMap<String, String>()), 5, null, "test_user"); LinkConfiguration linkConf = new LinkConfiguration(); FromJobConfiguration jobConf = new FromJobConfiguration(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToDestroyer.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToDestroyer.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToDestroyer.java index e1f416e..687a9b3 100644 --- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToDestroyer.java +++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToDestroyer.java @@ -54,7 +54,7 @@ public class TestToDestroyer { context.setString(HdfsConstants.WORK_DIRECTORY, workDir.getAbsolutePath()); Destroyer destroyer = new HdfsToDestroyer(); - destroyer.destroy(new DestroyerContext(context, true, null), linkConfig, jobConfig); + destroyer.destroy(new DestroyerContext(context, true, null, "test_user"), linkConfig, jobConfig); File[] files = targetDir.listFiles(); @@ -99,7 +99,7 @@ public class TestToDestroyer { context.setString(HdfsConstants.WORK_DIRECTORY, workDir.getAbsolutePath()); Destroyer destroyer = new HdfsToDestroyer(); - destroyer.destroy(new DestroyerContext(context, false, null), linkConfig, jobConfig); + destroyer.destroy(new DestroyerContext(context, false, null, "test_user"), linkConfig, jobConfig); File[] files = targetDir.listFiles(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java index a98a46a..5441702 100644 --- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java +++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java @@ -46,7 +46,7 @@ public class TestToInitializer extends TestHdfsBase { linkConfig.linkConfig.uri = "file:///"; jobConfig.toJobConfig.outputDirectory = TARGET_DIR; - InitializerContext initializerContext = new InitializerContext(new MutableMapContext()); + InitializerContext initializerContext = new InitializerContext(new MutableMapContext(), "test_user"); Initializer initializer = new HdfsToInitializer(); initializer.initialize(initializerContext, linkConfig, jobConfig); @@ -66,7 +66,7 @@ public class TestToInitializer extends TestHdfsBase { linkConfig.linkConfig.uri = "file:///"; jobConfig.toJobConfig.outputDirectory = file.getAbsolutePath(); - InitializerContext initializerContext = new InitializerContext(new MutableMapContext()); + InitializerContext initializerContext = new InitializerContext(new MutableMapContext(), "test_user"); Initializer initializer = new HdfsToInitializer(); initializer.initialize(initializerContext, linkConfig, jobConfig); @@ -83,7 +83,7 @@ public class TestToInitializer extends TestHdfsBase { linkConfig.linkConfig.uri = "file:///"; jobConfig.toJobConfig.outputDirectory = dir.getAbsolutePath(); - InitializerContext initializerContext = new InitializerContext(new MutableMapContext()); + InitializerContext initializerContext = new InitializerContext(new MutableMapContext(), "test_user"); Initializer initializer = new HdfsToInitializer(); initializer.initialize(initializerContext, linkConfig, jobConfig); @@ -101,7 +101,7 @@ public class TestToInitializer extends TestHdfsBase { jobConfig.toJobConfig.outputDirectory = dir.getAbsolutePath(); jobConfig.toJobConfig.appendMode = true; - InitializerContext initializerContext = new InitializerContext(new MutableMapContext()); + InitializerContext initializerContext = new InitializerContext(new MutableMapContext(), "test_user"); Initializer initializer = new HdfsToInitializer(); initializer.initialize(initializerContext, linkConfig, jobConfig); http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestKafkaLoader.java ---------------------------------------------------------------------- diff --git a/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestKafkaLoader.java b/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestKafkaLoader.java index da2a708..0dd00a7 100644 --- a/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestKafkaLoader.java +++ b/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestKafkaLoader.java @@ -77,7 +77,7 @@ public class TestKafkaLoader { public Object readContent() { return null; } - }, null); + }, null, "test_user"); LinkConfiguration linkConf = new LinkConfiguration(); ToJobConfiguration jobConf = new ToJobConfiguration(); linkConf.linkConfig.brokerList = testUtil.getKafkaServerUrl(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExtractor.java ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExtractor.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExtractor.java index 08d2cb3..c49be92 100644 --- a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExtractor.java +++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExtractor.java @@ -73,7 +73,7 @@ public class TestKiteExtractor { // setup Schema schema = new Schema("testExtractor"); schema.addColumn(new Text("TextCol")); - ExtractorContext context = new ExtractorContext(null, writerMock, schema); + ExtractorContext context = new ExtractorContext(null, writerMock, schema, "test_user"); LinkConfiguration linkConfig = new LinkConfiguration(); FromJobConfiguration jobConfig = new FromJobConfiguration(); KiteDatasetPartition partition = new KiteDatasetPartition(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java index 533b8c3..c5aa1bd 100644 --- a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java +++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java @@ -81,7 +81,7 @@ public class TestKiteLoader { return null; } }; - LoaderContext context = new LoaderContext(null, reader, schema); + LoaderContext context = new LoaderContext(null, reader, schema, "test_user"); LinkConfiguration linkConfig = new LinkConfiguration(); ToJobConfiguration toJobConfig = new ToJobConfiguration(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java index 3fcc339..00b8871 100644 --- a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java +++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java @@ -49,6 +49,8 @@ public class TestKiteToDestroyer extends PowerMockTestCase { private final String[] expectedUris = new String[]{"a", "b"}; + private String user; + @org.mockito.Mock private KiteDatasetExecutor executorMock; @@ -70,12 +72,13 @@ public class TestKiteToDestroyer extends PowerMockTestCase { toJobConfig = new ToJobConfiguration(); toJobConfig.toJobConfig.uri = "dataset:file:/foo/bar"; toJobConfig.toJobConfig.fileFormat = FileFormat.AVRO; + user = "test_user"; } @Test public void testDestroyForSuccessfulJob() { // setup - DestroyerContext context = new DestroyerContext(null, true, null); + DestroyerContext context = new DestroyerContext(null, true, null, user); when(KiteDatasetExecutor.listTemporaryDatasetUris(toJobConfig.toJobConfig.uri)) .thenReturn(expectedUris); @@ -91,7 +94,7 @@ public class TestKiteToDestroyer extends PowerMockTestCase { @Test public void testDestroyForFailedJob() { // setup - DestroyerContext context = new DestroyerContext(null, false, null); + DestroyerContext context = new DestroyerContext(null, false, null, user); when(KiteDatasetExecutor.listTemporaryDatasetUris(toJobConfig.toJobConfig.uri)) .thenReturn(expectedUris); for (String uri : expectedUris) { http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/core/src/main/java/org/apache/sqoop/driver/JobManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java index 0d230f9..15ca796 100644 --- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java +++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java @@ -531,7 +531,8 @@ public class JobManager implements Reconfigurable { } private InitializerContext getConnectorInitializerContext(JobRequest jobRequest, Direction direction) { - return new InitializerContext(jobRequest.getConnectorContext(direction)); + return new InitializerContext(jobRequest.getConnectorContext(direction), + jobRequest.getJobSubmission().getCreationUser()); } void prepareJob(JobRequest request) { @@ -571,8 +572,8 @@ public class JobManager implements Reconfigurable { Destroyer fromDestroyer = (Destroyer) ClassUtils.instantiate(fromConnector.getFrom().getDestroyer()); Destroyer toDestroyer = (Destroyer) ClassUtils.instantiate(toConnector.getTo().getDestroyer()); - DestroyerContext fromDestroyerContext = new DestroyerContext(submission.getFromConnectorContext(), true, submission.getFromSchema()); - DestroyerContext toDestroyerContext = new DestroyerContext(submission.getToConnectorContext(), false, submission.getToSchema()); + DestroyerContext fromDestroyerContext = new DestroyerContext(submission.getFromConnectorContext(), true, submission.getFromSchema(), submission.getCreationUser()); + DestroyerContext toDestroyerContext = new DestroyerContext(submission.getToConnectorContext(), false, submission.getToSchema(), submission.getCreationUser()); fromDestroyer.updateConfiguration(fromDestroyerContext, fromLinkConfig, fromJob); toDestroyer.updateConfiguration(toDestroyerContext, toLinkConfig, toJob); @@ -626,11 +627,11 @@ public class JobManager implements Reconfigurable { } DestroyerContext fromDestroyerContext = new DestroyerContext( - request.getConnectorContext(Direction.FROM), false, request.getJobSubmission() - .getFromSchema()); + request.getConnectorContext(Direction.FROM), false, request.getJobSubmission().getFromSchema(), + request.getJobSubmission().getCreationUser()); DestroyerContext toDestroyerContext = new DestroyerContext( - request.getConnectorContext(Direction.TO), false, request.getJobSubmission() - .getToSchema()); + request.getConnectorContext(Direction.TO), false, request.getJobSubmission().getToSchema(), + request.getJobSubmission().getCreationUser()); fromDestroyer.destroy(fromDestroyerContext, request.getConnectorLinkConfig(Direction.FROM), request.getJobConfig(Direction.FROM));
