Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 208d5daf4 -> bd7252480
SQOOP-2525. Sqoop2: Add support for incremental From in HDFS Connector

(Jarcec via Hari)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/bd725248
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/bd725248
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/bd725248

Branch: refs/heads/sqoop2
Commit: bd7252480f32c36002fba187e62f7c0dbe284c38
Parents: 208d5da
Author: Hari Shreedharan <[email protected]>
Authored: Fri Sep 18 12:52:25 2015 -0700
Committer: Hari Shreedharan <[email protected]>
Committed: Fri Sep 18 12:52:25 2015 -0700

----------------------------------------------------------------------
 .../sqoop/error/code/HdfsConnectorError.java    |  2 +-
 .../sqoop/connector/hdfs/HdfsConstants.java     |  2 +
 .../sqoop/connector/hdfs/HdfsFromDestroyer.java | 17 +++-
 .../connector/hdfs/HdfsFromInitializer.java     | 48 ++++++++++
 .../sqoop/connector/hdfs/HdfsPartitioner.java   | 25 +++++-
 .../configuration/FromJobConfiguration.java     |  6 +-
 .../hdfs/configuration/IncrementalRead.java     | 42 +++++++++
 .../hdfs/configuration/IncrementalType.java     | 23 +++++
 .../resources/hdfs-connector-config.properties  |  9 ++
 .../sqoop/connector/hdfs/TestFromDestroyer.java | 52 +++++++++++
 .../connector/hdfs/TestFromInitializer.java     | 59 +++++++++++--
 .../connector/hdfs/HdfsIncrementalReadTest.java | 93 ++++++++++++++++++++
 12 files changed, 366 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/bd725248/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java b/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java
index 6cd66cc..2bf7f4e 100644
--- a/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java
+++ b/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java
@@ -34,7 +34,7 @@ public enum HdfsConnectorError implements ErrorCode{
   GENERIC_HDFS_CONNECTOR_0005("Error occurs during loader run"),
   GENERIC_HDFS_CONNECTOR_0006("Unknown job type"),

-  GENERIC_HDFS_CONNECTOR_0007("Invalid output directory"),
+  GENERIC_HDFS_CONNECTOR_0007("Invalid input/output directory"),

   GENERIC_HDFS_CONNECTOR_0008("Error occurs during destroyer run"),


http://git-wip-us.apache.org/repos/asf/sqoop/blob/bd725248/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
index 9d20a79..39ee4a3 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
@@ -33,4 +33,6 @@ public final class HdfsConstants extends Constants {
   public static final String PREFIX = "org.apache.sqoop.connector.hdfs.";

   public static final String WORK_DIRECTORY = PREFIX + "work_dir";
+
+  public static final String MAX_IMPORT_DATE = PREFIX + "max_import_date";
 }
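For readers skimming the diff: the new MAX_IMPORT_DATE key is the handoff point between the job phases changed below. The initializer writes the watermark into the job context, and the partitioner and destroyer later read it back. A minimal sketch of that convention, using a plain Map in place of Sqoop's context classes (the timestamp value is made up):

import java.util.HashMap;
import java.util.Map;

public class ContextKeySketch {
  // Mirrors HdfsConstants: keys are namespaced under the connector prefix.
  static final String PREFIX = "org.apache.sqoop.connector.hdfs.";
  static final String MAX_IMPORT_DATE = PREFIX + "max_import_date";

  public static void main(String[] args) {
    Map<String, String> context = new HashMap<>();  // stand-in for Sqoop's MutableContext
    // Initializer side: record the watermark observed at job start.
    context.put(MAX_IMPORT_DATE, Long.toString(1442605945000L));
    // Partitioner/destroyer side: read it back, defaulting to -1 when absent.
    long epoch = Long.parseLong(context.getOrDefault(MAX_IMPORT_DATE, "-1"));
    System.out.println("max import date = " + epoch);
  }
}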
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bd725248/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromDestroyer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromDestroyer.java
index 6d79db7..9f84b82 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromDestroyer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromDestroyer.java
@@ -17,12 +17,17 @@
  */
 package org.apache.sqoop.connector.hdfs;

+import org.apache.log4j.Logger;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.job.etl.Destroyer;
 import org.apache.sqoop.job.etl.DestroyerContext;
+import org.joda.time.DateTime;

 public class HdfsFromDestroyer extends Destroyer<LinkConfiguration, FromJobConfiguration> {
+
+  private static final Logger LOG = Logger.getLogger(HdfsFromDestroyer.class);
+
   /**
    * Callback to clean up after job execution.
    *
@@ -31,8 +36,14 @@ public class HdfsFromDestroyer extends Destroyer<LinkConfiguration, FromJobConfi
    * @param jobConfig FROM job configuration object
    */
   @Override
-  public void destroy(DestroyerContext context, LinkConfiguration linkConfig,
-      FromJobConfiguration jobConfig) {
-    // do nothing at this point
+  public void destroy(DestroyerContext context, LinkConfiguration linkConfig, FromJobConfiguration jobConfig) {
+    LOG.info("Running HDFS connector destroyer");
+  }
+
+  @Override
+  public void updateConfiguration(DestroyerContext context, LinkConfiguration linkConfiguration, FromJobConfiguration jobConfiguration) {
+    LOG.info("Updating HDFS connector options");
+    long epoch = context.getLong(HdfsConstants.MAX_IMPORT_DATE, -1);
+    jobConfiguration.incremental.lastImportedDate = epoch == -1 ? null : new DateTime(epoch);
+  }
 }


http://git-wip-us.apache.org/repos/asf/sqoop/blob/bd725248/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
index f5d9e1f..6c943a8 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
@@ -18,13 +18,25 @@
 package org.apache.sqoop.connector.hdfs;

 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.IncrementalType;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.error.code.HdfsConnectorError;
 import org.apache.sqoop.job.etl.Initializer;
 import org.apache.sqoop.job.etl.InitializerContext;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;

 public class HdfsFromInitializer extends Initializer<LinkConfiguration, FromJobConfiguration> {
+
+  public static final Logger LOG = Logger.getLogger(HdfsFromInitializer.class);
+
   /**
    * Initialize new submission based on given configuration properties. Any
    * needed temporary values might be saved to context object and they will be
@@ -36,8 +48,44 @@ public class HdfsFromInitializer extends Initializer<LinkConfiguration, FromJobC
    */
   @Override
   public void initialize(InitializerContext context, LinkConfiguration linkConfig, FromJobConfiguration jobConfig) {
+    assert jobConfig.incremental != null;
+
     Configuration configuration = HdfsUtils.createConfiguration(linkConfig);
     HdfsUtils.configurationToContext(configuration, context.getContext());
     context.getContext().setAll(linkConfig.linkConfig.configOverrides);
+
+    boolean incremental = jobConfig.incremental.incrementalType != null && jobConfig.incremental.incrementalType == IncrementalType.NEW_FILES;
+
+    // In case of incremental import, we need to persist the highest last modified date
+    try {
+      FileSystem fs = FileSystem.get(configuration);
+      Path path = new Path(jobConfig.fromJobConfig.inputDirectory);
+      LOG.info("Input directory: " + path.toString());
+
+      if(!fs.exists(path)) {
+        throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Input directory doesn't exist");
+      }
+
+      if(fs.isFile(path)) {
+        throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Input directory is a file");
+      }
+
+      if(incremental) {
+        LOG.info("Detected incremental import");
+        long maxModifiedTime = -1;
+        FileStatus[] fileStatuses = fs.listStatus(path);
+        for(FileStatus status : fileStatuses) {
+          if(maxModifiedTime < status.getModificationTime()) {
+            maxModifiedTime = status.getModificationTime();
+          }
+        }
+
+        LOG.info("Maximal age of file is: " + maxModifiedTime);
+        context.getContext().setLong(HdfsConstants.MAX_IMPORT_DATE, maxModifiedTime);
+      }
+    } catch (IOException e) {
+      throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Unexpected exception", e);
+    }
   }
 }
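The initializer's core addition is the watermark scan above. The same logic can be run standalone against a local filesystem using just the Hadoop client API — a sketch, assuming hadoop-common on the classpath and a hypothetical input directory:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class MaxModTimeSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();      // picks up the default fs (file:/// here)
    FileSystem fs = FileSystem.get(conf);
    Path inputDir = new Path("/tmp/sqoop-input");  // hypothetical directory
    long maxModifiedTime = -1;
    for (FileStatus status : fs.listStatus(inputDir)) {
      // Same rule as the initializer: keep the newest modification time seen.
      if (status.getModificationTime() > maxModifiedTime) {
        maxModifiedTime = status.getModificationTime();
      }
    }
    System.out.println("watermark: " + maxModifiedTime);
  }
}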
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bd725248/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
index 119955d..ff16ad7 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
@@ -38,8 +38,10 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.NodeBase;
+import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.IncrementalType;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.error.code.HdfsConnectorError;
 import org.apache.sqoop.job.etl.Partition;
@@ -52,6 +54,8 @@ import org.apache.sqoop.job.etl.PartitionerContext;
  */
 public class HdfsPartitioner extends Partitioner<LinkConfiguration, FromJobConfiguration> {

+  public static final Logger LOG = Logger.getLogger(HdfsPartitioner.class);
+
   public static final String SPLIT_MINSIZE_PERNODE =
       "mapreduce.input.fileinputformat.split.minsize.per.node";
   public static final String SPLIT_MINSIZE_PERRACK =
@@ -70,6 +74,8 @@ public class HdfsPartitioner extends Partitioner<LinkConfiguration, FromJobConfi
   public List<Partition> getPartitions(PartitionerContext context,
       LinkConfiguration linkConfiguration, FromJobConfiguration fromJobConfig) {

+    assert fromJobConfig.incremental != null;
+
     Configuration conf = new Configuration();
     HdfsUtils.contextToConfiguration(context.getContext(), conf);

@@ -118,6 +124,11 @@ public class HdfsPartitioner extends Partitioner<LinkConfiguration, FromJobConfi
           "size per rack " + minSizeRack);
     }

+    // Incremental import related options
+    boolean incremental = fromJobConfig.incremental.incrementalType != null && fromJobConfig.incremental.incrementalType == IncrementalType.NEW_FILES;
+    long lastImportedDate = fromJobConfig.incremental.lastImportedDate != null ? fromJobConfig.incremental.lastImportedDate.getMillis() : -1;
+    long maxImportDate = context.getLong(HdfsConstants.MAX_IMPORT_DATE, -1);
+
     // all the files in input set
     String indir = fromJobConfig.fromJobConfig.inputDirectory;
     FileSystem fs = FileSystem.get(conf);
@@ -125,7 +136,19 @@ public class HdfsPartitioner extends Partitioner<LinkConfiguration, FromJobConfi
     List<Path> paths = new LinkedList<Path>();
     for(FileStatus status : fs.listStatus(new Path(indir))) {
       if(!status.isDir()) {
-        paths.add(status.getPath());
+        if(incremental) {
+          long modifiedDate = status.getModificationTime();
+          if(lastImportedDate < modifiedDate && modifiedDate <= maxImportDate) {
+            LOG.info("Will process input file: " + status.getPath() + " with modification date " + modifiedDate);
+            paths.add(status.getPath());
+          } else {
+            LOG.info("Skipping input file: " + status.getPath() + " with modification date " + modifiedDate);
+          }
+        } else {
+          // Without incremental mode, we're processing all files
+          LOG.info("Will process input file: " + status.getPath());
+          paths.add(status.getPath());
+        }
       }
     }


http://git-wip-us.apache.org/repos/asf/sqoop/blob/bd725248/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java
index 618366e..fdef4b4 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java
@@ -22,10 +22,14 @@ import org.apache.sqoop.model.Config;

 @ConfigurationClass
 public class FromJobConfiguration {
+
   @Config public FromJobConfig fromJobConfig;

+  @Config public IncrementalRead incremental;
+
   public FromJobConfiguration() {
     fromJobConfig = new FromJobConfig();
-
+    incremental = new IncrementalRead();
   }
+
 }
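The partitioner's file filter above amounts to a half-open interval check: a file qualifies when its modification time falls in (lastImportedDate, maxImportDate]. The upper bound matters — it excludes files that appear after the initializer captured the watermark, leaving them for the next run instead of importing them and then skipping them later. Restated as a tiny self-contained predicate (names are illustrative):

public class IncrementalFilterSketch {
  // A file is "new" if it was modified after the previous run's watermark
  // and no later than the watermark captured when this run started.
  static boolean shouldProcess(long modifiedDate, long lastImportedDate, long maxImportDate) {
    return lastImportedDate < modifiedDate && modifiedDate <= maxImportDate;
  }

  public static void main(String[] args) {
    long lastImported = 1000L, maxImport = 2000L;
    System.out.println(shouldProcess(1500L, lastImported, maxImport));  // true: inside the window
    System.out.println(shouldProcess(1000L, lastImported, maxImport));  // false: already imported
    System.out.println(shouldProcess(2500L, lastImported, maxImport));  // false: arrived after job start
  }
}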
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bd725248/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/IncrementalRead.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/IncrementalRead.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/IncrementalRead.java
new file mode 100644
index 0000000..23a7b2f
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/IncrementalRead.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+import org.apache.sqoop.model.ConfigClass;
+import org.apache.sqoop.model.Input;
+import org.apache.sqoop.validation.Status;
+import org.apache.sqoop.validation.validators.AbstractValidator;
+import org.joda.time.DateTime;
+
+@ConfigClass
+public class IncrementalRead {
+  @Input
+  public IncrementalType incrementalType;
+
+  @Input
+  public DateTime lastImportedDate;
+
+  public static class ConfigValidator extends AbstractValidator<IncrementalRead> {
+    @Override
+    public void validate(IncrementalRead conf) {
+      if(conf.incrementalType != IncrementalType.NEW_FILES && conf.lastImportedDate != null) {
+        addMessage(Status.ERROR, "Can't specify last imported date without enabling incremental import.");
+      }
+    }
+  }
+}


http://git-wip-us.apache.org/repos/asf/sqoop/blob/bd725248/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/IncrementalType.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/IncrementalType.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/IncrementalType.java
new file mode 100644
index 0000000..9e2a7d5
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/IncrementalType.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+public enum IncrementalType {
+  NONE,
+  NEW_FILES,
+}
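IncrementalRead.ConfigValidator enforces a single cross-field rule. Restated outside the Sqoop validator plumbing (plain Java plus Joda-Time; class and method names are illustrative):

import org.joda.time.DateTime;

public class IncrementalReadRuleSketch {
  enum IncrementalType { NONE, NEW_FILES }

  // The one rule from IncrementalRead.ConfigValidator: a last-imported date
  // only makes sense when NEW_FILES incremental mode is enabled.
  static boolean isValid(IncrementalType type, DateTime lastImportedDate) {
    return !(type != IncrementalType.NEW_FILES && lastImportedDate != null);
  }

  public static void main(String[] args) {
    System.out.println(isValid(IncrementalType.NEW_FILES, new DateTime()));  // true
    System.out.println(isValid(IncrementalType.NONE, new DateTime()));       // false: rejected
    System.out.println(isValid(IncrementalType.NONE, null));                 // true
  }
}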
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bd725248/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
index db23a95..69f50c1 100644
--- a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
+++ b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
@@ -63,6 +63,15 @@ toJobConfig.nullValue.label = Null value
 toJobConfig.nullValue.help = Use this particular character or sequence of characters \
   as a value representing null when outputting to a file.

+incremental.label = Incremental import
+incremental.help = Information relevant for incremental import from HDFS
+
+incremental.incrementalType.label = Incremental type
+incremental.incrementalType.help = Type of incremental import
+
+incremental.lastImportedDate.label = Last imported date
+incremental.lastImportedDate.help = Date when the last import happened
+
 # From Job Config
 #
 fromJobConfig.label = From HDFS configuration


http://git-wip-us.apache.org/repos/asf/sqoop/blob/bd725248/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromDestroyer.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromDestroyer.java
new file mode 100644
index 0000000..569c60b
--- /dev/null
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromDestroyer.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.sqoop.common.MutableContext;
+import org.apache.sqoop.common.MutableMapContext;
+import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.DestroyerContext;
+import org.testng.annotations.Test;
+import org.joda.time.DateTime;
+
+import static org.testng.AssertJUnit.assertEquals;
+
+public class TestFromDestroyer {
+
+  Destroyer<LinkConfiguration, FromJobConfiguration> destroyer;
+  LinkConfiguration linkConfig;
+  FromJobConfiguration jobConfig;
+  MutableContext context;
+
+  public TestFromDestroyer() {
+    linkConfig = new LinkConfiguration();
+    jobConfig = new FromJobConfiguration();
+    context = new MutableMapContext();
+    destroyer = new HdfsFromDestroyer();
+  }
+
+  @Test
+  public void testUpdateConfiguration() {
+    DateTime dt = new DateTime();
+    context.setLong(HdfsConstants.MAX_IMPORT_DATE, dt.getMillis());
+    destroyer.updateConfiguration(new DestroyerContext(context, true, null), linkConfig, jobConfig);
+    assertEquals(jobConfig.incremental.lastImportedDate, dt);
+  }
+}
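The destroyer test above exercises the epoch-to-DateTime convention: -1 in the context means "no watermark", anything else round-trips through Joda-Time. A compact restatement of that convention (joda-time on the classpath; names are illustrative):

import org.joda.time.DateTime;

public class WatermarkRoundTripSketch {
  public static void main(String[] args) {
    long epoch = 1442605945000L;  // arbitrary example value
    // Same mapping as HdfsFromDestroyer.updateConfiguration():
    // -1 stays null, any real epoch becomes a DateTime.
    DateTime watermark = (epoch == -1) ? null : new DateTime(epoch);
    // The partitioner later unwraps it with getMillis(); this must round-trip.
    System.out.println(watermark.getMillis() == epoch);  // prints: true
  }
}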
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bd725248/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromInitializer.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromInitializer.java
index 5215901..52c174e 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromInitializer.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromInitializer.java
@@ -18,30 +18,77 @@
  */
 package org.apache.sqoop.connector.hdfs;

+import com.google.common.io.Files;
+import org.apache.sqoop.common.MutableContext;
 import org.apache.sqoop.common.MutableMapContext;
+import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.IncrementalType;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.job.etl.Initializer;
 import org.apache.sqoop.job.etl.InitializerContext;
 import org.testng.annotations.Test;

+import java.io.File;
+
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;

 public class TestFromInitializer {

+  Initializer<LinkConfiguration, FromJobConfiguration> initializer;
+  InitializerContext initializerContext;
+  LinkConfiguration linkConfig;
+  FromJobConfiguration jobConfig;
+  MutableContext context;
+
+  public TestFromInitializer() {
+    linkConfig = new LinkConfiguration();
+    jobConfig = new FromJobConfiguration();
+    context = new MutableMapContext();
+    initializer = new HdfsFromInitializer();
+    initializerContext = new InitializerContext(context);
+  }
+
   @Test
   public void testConfigOverrides() {
-    LinkConfiguration linkConfig = new LinkConfiguration();
-    FromJobConfiguration jobConfig = new FromJobConfiguration();
-
     linkConfig.linkConfig.uri = "file:///";
     linkConfig.linkConfig.configOverrides.put("key", "value");
+    jobConfig.fromJobConfig.inputDirectory = "/tmp";

-    InitializerContext initializerContext = new InitializerContext(new MutableMapContext());
-
-    Initializer initializer = new HdfsFromInitializer();
     initializer.initialize(initializerContext, linkConfig, jobConfig);

     assertEquals(initializerContext.getString("key"), "value");
   }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testFailIfInputDirectoryDoNotExists() {
+    jobConfig.fromJobConfig.inputDirectory = "/tmp/this/directory/definitely/do/not/exists";
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testFailIfInputDirectoryIsFile() throws Exception {
+    File workDir = Files.createTempDir();
+    File inputFile = File.createTempFile("part-01-", ".txt", workDir);
+    inputFile.createNewFile();
+
+    jobConfig.fromJobConfig.inputDirectory = inputFile.getAbsolutePath();
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
+  }
+
+  @Test
+  public void testIncremental() throws Exception {
+    File workDir = Files.createTempDir();
+    File.createTempFile("part-01-", ".txt", workDir).createNewFile();
+    File.createTempFile("part-02-", ".txt", workDir).createNewFile();
+
+    jobConfig.fromJobConfig.inputDirectory = workDir.getAbsolutePath();
+    jobConfig.incremental.incrementalType = IncrementalType.NEW_FILES;
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
+
+    // Max import date must be defined if we are running incremental
+    assertNotNull(context.getString(HdfsConstants.MAX_IMPORT_DATE));
+  }
 }
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bd725248/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/HdfsIncrementalReadTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/HdfsIncrementalReadTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/HdfsIncrementalReadTest.java
new file mode 100644
index 0000000..a32a563
--- /dev/null
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/HdfsIncrementalReadTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.integration.connector.hdfs;
+
+import org.apache.sqoop.connector.hdfs.configuration.IncrementalType;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.test.testcases.ConnectorTestCase;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+public class HdfsIncrementalReadTest extends ConnectorTestCase {
+
+  @BeforeMethod(alwaysRun = true)
+  public void createTable() {
+    createTableCities();
+  }
+
+  @AfterMethod(alwaysRun = true)
+  public void dropTable() {
+    super.dropTable();
+  }
+
+  @Test
+  public void testBasic() throws Exception {
+    createFromFile("input-0001",
+      "1,'USA','2004-10-23','San Francisco'"
+    );
+
+    // RDBMS link
+    MLink rdbmsLink = getClient().createLink("generic-jdbc-connector");
+    fillRdbmsLinkConfig(rdbmsLink);
+    saveLink(rdbmsLink);
+
+    // HDFS link
+    MLink hdfsLink = getClient().createLink("hdfs-connector");
+    fillHdfsLink(hdfsLink);
+    saveLink(hdfsLink);
+
+    // Job creation
+    MJob job = getClient().createJob(hdfsLink.getPersistenceId(), rdbmsLink.getPersistenceId());
+    fillHdfsFromConfig(job);
+    job.getFromJobConfig().getEnumInput("incremental.incrementalType").setValue(IncrementalType.NEW_FILES);
+    fillRdbmsToConfig(job);
+    saveJob(job);
+
+    // Execute for the first time
+    executeJob(job);
+    assertEquals(provider.rowCount(getTableName()), 1);
+    assertRowInCities(1, "USA", "2004-10-23", "San Francisco");
+
+    // Second execution
+    createFromFile("input-0002",
+      "2,'USA','2004-10-24','Sunnyvale'",
+      "3,'Czech Republic','2004-10-25','Brno'"
+    );
+    executeJob(job);
+    assertEquals(provider.rowCount(getTableName()), 3);
+    assertRowInCities(1, "USA", "2004-10-23", "San Francisco");
+    assertRowInCities(2, "USA", "2004-10-24", "Sunnyvale");
+    assertRowInCities(3, "Czech Republic", "2004-10-25", "Brno");
+
+    // And last execution
+    createFromFile("input-0003",
+      "4,'USA','2004-10-26','Palo Alto'"
+    );
+    executeJob(job);
+    assertEquals(provider.rowCount(getTableName()), 4);
+    assertRowInCities(1, "USA", "2004-10-23", "San Francisco");
+    assertRowInCities(2, "USA", "2004-10-24", "Sunnyvale");
+    assertRowInCities(3, "Czech Republic", "2004-10-25", "Brno");
+    assertRowInCities(4, "USA", "2004-10-26", "Palo Alto");
+  }
+
+}
