Repository: sqoop Updated Branches: refs/heads/sqoop2 a70975c66 -> bf09850c3
SQOOP-2524. Sqoop2: Add S3 support to HDFS Connector (Jarcec via Hari) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/bf09850c Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/bf09850c Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/bf09850c Branch: refs/heads/sqoop2 Commit: bf09850c33721f8d7629a121b15c7f57da9e295f Parents: a70975c Author: Hari Shreedharan <[email protected]> Authored: Thu Sep 24 21:22:17 2015 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Thu Sep 24 21:22:17 2015 -0700 ---------------------------------------------------------------------- .../org/apache/sqoop/common/MapContext.java | 2 +- .../org/apache/sqoop/common/TestMapContext.java | 4 + .../sqoop/common/TestMutableMapContext.java | 4 + connector/connector-hdfs/pom.xml | 9 +- .../connector/hdfs/HdfsFromInitializer.java | 3 +- .../sqoop/connector/hdfs/HdfsToInitializer.java | 3 +- pom.xml | 6 + test/pom.xml | 6 + .../minicluster/TomcatSqoopMiniCluster.java | 4 +- .../integration/connector/hdfs/S3Test.java | 132 +++++++++++++++++++ 10 files changed, 168 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf09850c/common/src/main/java/org/apache/sqoop/common/MapContext.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/common/MapContext.java b/common/src/main/java/org/apache/sqoop/common/MapContext.java index fc722c0..3167530 100644 --- a/common/src/main/java/org/apache/sqoop/common/MapContext.java +++ b/common/src/main/java/org/apache/sqoop/common/MapContext.java @@ -36,7 +36,7 @@ public class MapContext implements ImmutableContext { private final Map<String, String> options; public MapContext(Map<String, String> options) { - this.options = options; + this.options = options == null ? new HashMap<String, String>() : options; } protected Map<String, String> getOptions() { http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf09850c/common/src/test/java/org/apache/sqoop/common/TestMapContext.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/sqoop/common/TestMapContext.java b/common/src/test/java/org/apache/sqoop/common/TestMapContext.java index 2a27c0c..22f42a4 100644 --- a/common/src/test/java/org/apache/sqoop/common/TestMapContext.java +++ b/common/src/test/java/org/apache/sqoop/common/TestMapContext.java @@ -23,6 +23,7 @@ import java.util.Map; import org.testng.Assert; import org.testng.annotations.Test; +import static junit.framework.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -40,6 +41,9 @@ public class TestMapContext { options.put("testkey", "testvalue"); MapContext mc = new MapContext(options); Assert.assertEquals("testvalue", mc.getString("testkey")); + + MapContext nullMc = new MapContext(null); + assertNull(nullMc.getString("random.key.property")); } /** http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf09850c/common/src/test/java/org/apache/sqoop/common/TestMutableMapContext.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/sqoop/common/TestMutableMapContext.java b/common/src/test/java/org/apache/sqoop/common/TestMutableMapContext.java index db7fa34..3cbe7be 100644 --- a/common/src/test/java/org/apache/sqoop/common/TestMutableMapContext.java +++ b/common/src/test/java/org/apache/sqoop/common/TestMutableMapContext.java @@ -50,6 +50,10 @@ public class TestMutableMapContext { assertEquals(context.getLong("long", -1), 1L); assertEquals(context.getInt("integer", -1), 13); assertEquals(context.getBoolean("boolean", false), true); + + context = new MutableMapContext(null); + context.setString("key", "value"); + assertEquals(context.getString("key"), "value"); } @Test http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf09850c/connector/connector-hdfs/pom.xml ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/pom.xml b/connector/connector-hdfs/pom.xml index 512b54c..a28989c 100644 --- a/connector/connector-hdfs/pom.xml +++ b/connector/connector-hdfs/pom.xml @@ -59,6 +59,13 @@ limitations under the License. <artifactId>hadoop-mapreduce-client-jobclient</artifactId> <scope>provided</scope> </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-aws</artifactId> + <scope>provided</scope> + </dependency> + </dependencies> <build> @@ -77,4 +84,4 @@ limitations under the License. </plugin> </plugins> </build> -</project> \ No newline at end of file +</project> http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf09850c/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java index 6c943a8..e98e02b 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.sqoop.common.MapContext; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration; import org.apache.sqoop.connector.hdfs.configuration.IncrementalType; @@ -51,8 +52,8 @@ public class HdfsFromInitializer extends Initializer<LinkConfiguration, FromJobC assert jobConfig.incremental != null; Configuration configuration = HdfsUtils.createConfiguration(linkConfig); + HdfsUtils.contextToConfiguration(new MapContext(linkConfig.linkConfig.configOverrides), configuration); HdfsUtils.configurationToContext(configuration, context.getContext()); - context.getContext().setAll(linkConfig.linkConfig.configOverrides); boolean incremental = jobConfig.incremental.incrementalType != null && jobConfig.incremental.incrementalType == IncrementalType.NEW_FILES; http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf09850c/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java index 5bb0928..29cf3b9 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java @@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.log4j.Logger; +import org.apache.sqoop.common.MapContext; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration; @@ -47,8 +48,8 @@ public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfi assert jobConfig.toJobConfig.outputDirectory != null; Configuration configuration = HdfsUtils.createConfiguration(linkConfig); + HdfsUtils.contextToConfiguration(new MapContext(linkConfig.linkConfig.configOverrides), configuration); HdfsUtils.configurationToContext(configuration, context.getContext()); - context.getContext().setAll(linkConfig.linkConfig.configOverrides); boolean appendMode = Boolean.TRUE.equals(jobConfig.toJobConfig.appendMode); http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf09850c/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 6e334a7..ef3f5f4 100644 --- a/pom.xml +++ b/pom.xml @@ -426,6 +426,12 @@ limitations under the License. <version>${hadoop.2.version}</version> </dependency> <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-aws</artifactId> + <version>${hadoop.2.version}</version> + <scope>provided</scope> + </dependency> + <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-jdbc</artifactId> <version>${hive.version}</version> http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf09850c/test/pom.xml ---------------------------------------------------------------------- diff --git a/test/pom.xml b/test/pom.xml index 3e11f59..8218477 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -113,6 +113,12 @@ limitations under the License. </dependency> <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-aws</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-jdbc</artifactId> <scope>provided</scope> http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf09850c/test/src/main/java/org/apache/sqoop/test/minicluster/TomcatSqoopMiniCluster.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/sqoop/test/minicluster/TomcatSqoopMiniCluster.java b/test/src/main/java/org/apache/sqoop/test/minicluster/TomcatSqoopMiniCluster.java index 83f42b6..a0ef78a 100644 --- a/test/src/main/java/org/apache/sqoop/test/minicluster/TomcatSqoopMiniCluster.java +++ b/test/src/main/java/org/apache/sqoop/test/minicluster/TomcatSqoopMiniCluster.java @@ -131,7 +131,9 @@ public class TomcatSqoopMiniCluster extends SqoopMiniCluster { jar.contains("sqljdbc") || // Microsoft SQL Server driver jar.contains("libfb303") || // Facebook thrift lib jar.contains("datanucleus-") || // Data nucleus libs - jar.contains("google") // Google libraries (guava, ...) + jar.contains("google") || // Google libraries (guava, ...) + jar.contains("joda-time") || // Joda time + jar.contains("aws-java-sdk") // Amazon AWS SDK (S3, ...) ) { extraClassPath.add(jar); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf09850c/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java new file mode 100644 index 0000000..73a9acf --- /dev/null +++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.integration.connector.hdfs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.sqoop.connector.hdfs.configuration.ToFormat; +import org.apache.sqoop.model.MJob; +import org.apache.sqoop.model.MLink; +import org.apache.sqoop.test.asserts.HdfsAsserts; +import org.apache.sqoop.test.testcases.ConnectorTestCase; +import org.testng.SkipException; +import org.testng.annotations.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.testng.Assert.assertEquals; + +/** + * Run with something like: + * + * mvn clean test -pl test -Dtest=S3Test + * -Dorg.apache.sqoop.integration.connector.hdfs.s3.bucket=test-bucket + * -Dorg.apache.sqoop.integration.connector.hdfs.s3.access=AKI... + * -Dorg.apache.sqoop.integration.connector.hdfs.s3.secret=93JKx... + */ +public class S3Test extends ConnectorTestCase { + + public static final String PROPERTY_BUCKET = "org.apache.sqoop.integration.connector.hdfs.s3.bucket"; + public static final String PROPERTY_ACCESS = "org.apache.sqoop.integration.connector.hdfs.s3.access"; + public static final String PROPERTY_SECRET = "org.apache.sqoop.integration.connector.hdfs.s3.secret"; + + public static final String BUCKET = System.getProperty(PROPERTY_BUCKET); + public static final String ACCESS = System.getProperty(PROPERTY_ACCESS); + public static final String SECRET = System.getProperty(PROPERTY_SECRET); + + public static final String BUCKET_URL = "s3a://" + BUCKET; + + @Test + public void test() throws Exception { + // Verify presence external configuration + assumeNotNull(BUCKET, PROPERTY_BUCKET); + assumeNotNull(ACCESS, PROPERTY_ACCESS); + assumeNotNull(SECRET, PROPERTY_SECRET); + + createAndLoadTableCities(); + + Configuration hadoopConf = new Configuration(); + hadoopConf.set("fs.defaultFS", BUCKET_URL); + hadoopConf.set("fs.s3a.access.key", ACCESS); + hadoopConf.set("fs.s3a.secret.key", SECRET); + FileSystem s3Client = FileSystem.get(hadoopConf); + + // RDBMS link + MLink rdbmsLink = getClient().createLink("generic-jdbc-connector"); + fillRdbmsLinkConfig(rdbmsLink); + saveLink(rdbmsLink); + + // HDFS link + MLink hdfsLink = getClient().createLink("hdfs-connector"); + hdfsLink.getConnectorLinkConfig().getStringInput("linkConfig.uri").setValue(BUCKET_URL); + Map<String, String> configOverrides = new HashMap<>(); + configOverrides.put("fs.s3a.access.key", ACCESS); + configOverrides.put("fs.s3a.secret.key", SECRET); + hdfsLink.getConnectorLinkConfig().getMapInput("linkConfig.configOverrides").setValue(configOverrides); + saveLink(hdfsLink); + + s3Client.delete(new Path(getMapreduceDirectory()), true); + + // DB -> S3 + MJob db2aws = getClient().createJob(rdbmsLink.getPersistenceId(), hdfsLink.getPersistenceId()); + fillRdbmsFromConfig(db2aws, "id"); + fillHdfsToConfig(db2aws, ToFormat.TEXT_FILE); + + saveJob(db2aws); + executeJob(db2aws); + + // Verifying locally imported data + HdfsAsserts.assertMapreduceOutput(s3Client, getMapreduceDirectory(), + "1,'USA','2004-10-23','San Francisco'", + "2,'USA','2004-10-24','Sunnyvale'", + "3,'Czech Republic','2004-10-25','Brno'", + "4,'USA','2004-10-26','Palo Alto'" + ); + + // This re-creates the table completely + createTableCities(); + assertEquals(provider.rowCount(getTableName()), 0); + + // S3 -> DB + MJob aws2db = getClient().createJob(hdfsLink.getPersistenceId(), rdbmsLink.getPersistenceId()); + fillHdfsFromConfig(aws2db); + fillRdbmsToConfig(aws2db); + + saveJob(aws2db); + executeJob(aws2db); + + // Final verification + assertEquals(4L, provider.rowCount(getTableName())); + assertRowInCities(1, "USA", "2004-10-23", "San Francisco"); + assertRowInCities(2, "USA", "2004-10-24", "Sunnyvale"); + assertRowInCities(3, "Czech Republic", "2004-10-25", "Brno"); + assertRowInCities(4, "USA", "2004-10-26", "Palo Alto"); + } + + /** + * Skip this test if given value is null + */ + void assumeNotNull(String value, String key) { + if(value == null) { + throw new SkipException("Missing value for " + key); + } + } + +}
