Repository: sqoop Updated Branches: refs/heads/sqoop2 24feea185 -> a63da71ad
SQOOP-1949: Sqoop2: HDFS append only support (Jarek Jarcec Cecho via Abraham Elmahrek) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/a63da71a Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/a63da71a Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/a63da71a Branch: refs/heads/sqoop2 Commit: a63da71adfe3e26b0d3a0c2d162782f835edf0da Parents: 24feea1 Author: Abraham Elmahrek <[email protected]> Authored: Wed Mar 18 23:43:00 2015 -0700 Committer: Abraham Elmahrek <[email protected]> Committed: Wed Mar 18 23:43:00 2015 -0700 ---------------------------------------------------------------------- .../sqoop/connector/hdfs/HdfsToInitializer.java | 3 +- .../hdfs/configuration/ToJobConfig.java | 2 + .../resources/hdfs-connector-config.properties | 4 +- .../sqoop/connector/hdfs/TestToInitializer.java | 21 +++++ .../apache/sqoop/test/asserts/HdfsAsserts.java | 4 +- .../connector/hdfs/AppendModeTest.java | 87 ++++++++++++++++++++ 6 files changed, 117 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/a63da71a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java index 05ceb23..234bb71 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java @@ -48,6 +48,7 @@ public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfi Configuration configuration = HdfsUtils.createConfiguration(linkConfig); HdfsUtils.configurationToContext(configuration, context.getContext()); + boolean appendMode = Boolean.TRUE.equals(jobConfig.toJobConfig.appendMode); // Verification that given HDFS directory either don't exists or is empty try { @@ -59,7 +60,7 @@ public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfi throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Output directory already exists and is a file"); } - if(fs.isDirectory(path)) { + if(fs.isDirectory(path) && !appendMode) { FileStatus[] fileStatuses = fs.listStatus(path); if(fileStatuses.length != 0) { throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Output directory is not empty"); http://git-wip-us.apache.org/repos/asf/sqoop/blob/a63da71a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java index 6fc894b..d76ba5f 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java @@ -41,6 +41,8 @@ public class ToJobConfig { @Input(size = 255, validators = { @Validator(NotEmpty.class)}) public String outputDirectory; + @Input public Boolean appendMode; + public static class ToJobConfigValidator extends AbstractValidator<ToJobConfig> { @Override public void validate(ToJobConfig conf) { http://git-wip-us.apache.org/repos/asf/sqoop/blob/a63da71a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties index 8d5a562..eb9c000 100644 --- a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties +++ b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties @@ -48,8 +48,8 @@ toJobConfig.customCompression.help = Full class name of the custom compression toJobConfig.outputDirectory.label = Output directory toJobConfig.outputDirectory.help = Output directory for final data -toJobConfig.ignored.label = Ignored -toJobConfig.ignored.help = This value is ignored +toJobConfig.appendMode.label = Append mode +toJobConfig.appendMode.help = Append new files to existing directory if the output directory already exists toJobConfig.overrideNullValue.label = Override null value toJobConfig.overrideNullValue.help = If set to true, then the null value will \ http://git-wip-us.apache.org/repos/asf/sqoop/blob/a63da71a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java index aa267a7..a98a46a 100644 --- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java +++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java @@ -88,4 +88,25 @@ public class TestToInitializer extends TestHdfsBase { Initializer initializer = new HdfsToInitializer(); initializer.initialize(initializerContext, linkConfig, jobConfig); } + + @Test + public void testOutputDirectoryIsNotEmptyWithIncremental() throws Exception { + File dir = Files.createTempDir(); + File file = File.createTempFile("MastersOfOrion", ".txt", dir); + + LinkConfiguration linkConfig = new LinkConfiguration(); + ToJobConfiguration jobConfig = new ToJobConfiguration(); + + linkConfig.linkConfig.uri = "file:///"; + jobConfig.toJobConfig.outputDirectory = dir.getAbsolutePath(); + jobConfig.toJobConfig.appendMode = true; + + InitializerContext initializerContext = new InitializerContext(new MutableMapContext()); + + Initializer initializer = new HdfsToInitializer(); + initializer.initialize(initializerContext, linkConfig, jobConfig); + + assertNotNull(initializerContext.getString(HdfsConstants.WORK_DIRECTORY)); + assertTrue(initializerContext.getString(HdfsConstants.WORK_DIRECTORY).startsWith(dir.getAbsolutePath())); + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/a63da71a/test/src/main/java/org/apache/sqoop/test/asserts/HdfsAsserts.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/sqoop/test/asserts/HdfsAsserts.java b/test/src/main/java/org/apache/sqoop/test/asserts/HdfsAsserts.java index b115723..8d548ad 100644 --- a/test/src/main/java/org/apache/sqoop/test/asserts/HdfsAsserts.java +++ b/test/src/main/java/org/apache/sqoop/test/asserts/HdfsAsserts.java @@ -17,6 +17,8 @@ */ package org.apache.sqoop.test.asserts; +import com.google.common.collect.HashMultiset; +import com.google.common.collect.Multiset; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -50,7 +52,7 @@ public class HdfsAsserts { * @throws IOException */ public static void assertMapreduceOutput(FileSystem fs, String directory, String... lines) throws IOException { - Set<String> setLines = new HashSet<String>(Arrays.asList(lines)); + Multiset<String> setLines = HashMultiset.create(Arrays.asList(lines)); List<String> notFound = new LinkedList<String>(); Path[] files = HdfsUtils.getOutputMapreduceFiles(fs, directory); http://git-wip-us.apache.org/repos/asf/sqoop/blob/a63da71a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/AppendModeTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/AppendModeTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/AppendModeTest.java new file mode 100644 index 0000000..1ba3bd4 --- /dev/null +++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/AppendModeTest.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.integration.connector.hdfs; + +import org.apache.sqoop.common.Direction; +import org.apache.sqoop.connector.hdfs.configuration.ToFormat; +import org.apache.sqoop.model.MConfigList; +import org.apache.sqoop.model.MJob; +import org.apache.sqoop.model.MLink; +import org.apache.sqoop.test.testcases.ConnectorTestCase; +import org.testng.annotations.Test; + +/** + */ +public class AppendModeTest extends ConnectorTestCase { + + @Test + public void test() throws Exception { + createAndLoadTableCities(); + + // RDBMS link + MLink rdbmsConnection = getClient().createLink("generic-jdbc-connector"); + fillRdbmsLinkConfig(rdbmsConnection); + saveLink(rdbmsConnection); + + // HDFS link + MLink hdfsConnection = getClient().createLink("hdfs-connector"); + fillHdfsLink(hdfsConnection); + saveLink(hdfsConnection); + + // Job creation + MJob job = getClient().createJob(rdbmsConnection.getPersistenceId(), hdfsConnection.getPersistenceId()); + + // Set rdbms "FROM" config + MConfigList fromConfig = job.getJobConfig(Direction.FROM); + fromConfig.getStringInput("fromJobConfig.tableName").setValue(provider.escapeTableName(getTableName())); + fromConfig.getStringInput("fromJobConfig.partitionColumn").setValue(provider.escapeColumnName("id")); + + // fill the hdfs "TO" config + fillHdfsToConfig(job, ToFormat.TEXT_FILE); + MConfigList toConfig = job.getJobConfig(Direction.TO); + toConfig.getBooleanInput("toJobConfig.appendMode").setValue(true); + + + saveJob(job); + + // First execution + executeJob(job); + assertTo( + "1,'USA','2004-10-23','San Francisco'", + "2,'USA','2004-10-24','Sunnyvale'", + "3,'Czech Republic','2004-10-25','Brno'", + "4,'USA','2004-10-26','Palo Alto'" + ); + + // Second execution + executeJob(job); + assertTo( + "1,'USA','2004-10-23','San Francisco'", + "2,'USA','2004-10-24','Sunnyvale'", + "3,'Czech Republic','2004-10-25','Brno'", + "4,'USA','2004-10-26','Palo Alto'", + "1,'USA','2004-10-23','San Francisco'", + "2,'USA','2004-10-24','Sunnyvale'", + "3,'Czech Republic','2004-10-25','Brno'", + "4,'USA','2004-10-26','Palo Alto'" + ); + + dropTable(); + } + +}
