Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 208d5daf4 -> bd7252480


SQOOP-2525. Sqoop2: Add support for incremental From in HDFS Connector

(Jarcec via Hari)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/bd725248
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/bd725248
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/bd725248

Branch: refs/heads/sqoop2
Commit: bd7252480f32c36002fba187e62f7c0dbe284c38
Parents: 208d5da
Author: Hari Shreedharan <[email protected]>
Authored: Fri Sep 18 12:52:25 2015 -0700
Committer: Hari Shreedharan <[email protected]>
Committed: Fri Sep 18 12:52:25 2015 -0700

----------------------------------------------------------------------
 .../sqoop/error/code/HdfsConnectorError.java    |  2 +-
 .../sqoop/connector/hdfs/HdfsConstants.java     |  2 +
 .../sqoop/connector/hdfs/HdfsFromDestroyer.java | 17 +++-
 .../connector/hdfs/HdfsFromInitializer.java     | 48 ++++++++++
 .../sqoop/connector/hdfs/HdfsPartitioner.java   | 25 +++++-
 .../configuration/FromJobConfiguration.java     |  6 +-
 .../hdfs/configuration/IncrementalRead.java     | 42 +++++++++
 .../hdfs/configuration/IncrementalType.java     | 23 +++++
 .../resources/hdfs-connector-config.properties  |  9 ++
 .../sqoop/connector/hdfs/TestFromDestroyer.java | 52 +++++++++++
 .../connector/hdfs/TestFromInitializer.java     | 59 +++++++++++--
 .../connector/hdfs/HdfsIncrementalReadTest.java | 93 ++++++++++++++++++++
 12 files changed, 366 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/bd725248/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java b/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java
index 6cd66cc..2bf7f4e 100644
--- a/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java
+++ b/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java
@@ -34,7 +34,7 @@ public enum HdfsConnectorError implements ErrorCode{
   GENERIC_HDFS_CONNECTOR_0005("Error occurs during loader run"),
   GENERIC_HDFS_CONNECTOR_0006("Unknown job type"),
 
-  GENERIC_HDFS_CONNECTOR_0007("Invalid output directory"),
+  GENERIC_HDFS_CONNECTOR_0007("Invalid input/output directory"),
 
   GENERIC_HDFS_CONNECTOR_0008("Error occurs during destroyer run"),
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bd725248/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
index 9d20a79..39ee4a3 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
@@ -33,4 +33,6 @@ public final class HdfsConstants extends Constants {
   public static final String PREFIX = "org.apache.sqoop.connector.hdfs.";
 
   public static final String WORK_DIRECTORY = PREFIX + "work_dir";
+
+  public static final String MAX_IMPORT_DATE = PREFIX + "max_import_date";
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bd725248/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromDestroyer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromDestroyer.java
index 6d79db7..9f84b82 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromDestroyer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromDestroyer.java
@@ -17,12 +17,17 @@
  */
 package org.apache.sqoop.connector.hdfs;
 
+import org.apache.log4j.Logger;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.job.etl.Destroyer;
 import org.apache.sqoop.job.etl.DestroyerContext;
+import org.joda.time.DateTime;
 
 public class HdfsFromDestroyer extends Destroyer<LinkConfiguration, FromJobConfiguration> {
+
+  private static final Logger LOG = Logger.getLogger(HdfsFromDestroyer.class);
+
   /**
    * Callback to clean up after job execution.
    *
@@ -31,8 +36,14 @@ public class HdfsFromDestroyer extends Destroyer<LinkConfiguration, FromJobConfi
    * @param jobConfig FROM job configuration object
    */
   @Override
-  public void destroy(DestroyerContext context, LinkConfiguration linkConfig,
-      FromJobConfiguration jobConfig) {
-    // do nothing at this point
+  public void destroy(DestroyerContext context, LinkConfiguration linkConfig, FromJobConfiguration jobConfig) {
+    LOG.info("Running HDFS connector destroyer");
+  }
+
+  @Override
+  public void updateConfiguration(DestroyerContext context, LinkConfiguration linkConfiguration, FromJobConfiguration jobConfiguration) {
+    LOG.info("Updating HDFS connector options");
+    long epoch = context.getLong(HdfsConstants.MAX_IMPORT_DATE, -1);
+    jobConfiguration.incremental.lastImportedDate = epoch == -1 ? null : new DateTime(epoch);
   }
 }
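
For context, the new updateConfiguration() hook is what carries the incremental
watermark between runs: the initializer records the highest modification time it
saw under HdfsConstants.MAX_IMPORT_DATE, and after the run the destroyer copies
that value back into the job configuration as lastImportedDate. A minimal
standalone sketch of the epoch-to-DateTime round trip (plain Java plus
Joda-Time, not the Sqoop API; the class name WatermarkRoundTrip and the main()
driver are illustrative only, while the -1 "no watermark" sentinel comes from
the patch):

    import org.joda.time.DateTime;

    public class WatermarkRoundTrip {
      // Mirrors HdfsFromDestroyer.updateConfiguration(): -1 means "no watermark
      // yet", which maps to a null lastImportedDate.
      static DateTime fromEpoch(long epoch) {
        return epoch == -1 ? null : new DateTime(epoch);
      }

      public static void main(String[] args) {
        long recorded = System.currentTimeMillis(); // what the initializer would store
        System.out.println("Next run imports files newer than: " + fromEpoch(recorded));
        System.out.println("No previous run: " + fromEpoch(-1)); // prints "null"
      }
    }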

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bd725248/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
index f5d9e1f..6c943a8 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
@@ -18,13 +18,25 @@
 package org.apache.sqoop.connector.hdfs;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.IncrementalType;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.error.code.HdfsConnectorError;
 import org.apache.sqoop.job.etl.Initializer;
 import org.apache.sqoop.job.etl.InitializerContext;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
 
 
 public class HdfsFromInitializer extends Initializer<LinkConfiguration, FromJobConfiguration> {
+
+  public static final Logger LOG = Logger.getLogger(HdfsFromInitializer.class);
+
   /**
    * Initialize new submission based on given configuration properties. Any
    * needed temporary values might be saved to context object and they will be
@@ -36,8 +48,44 @@ public class HdfsFromInitializer extends Initializer<LinkConfiguration, FromJobC
    */
   @Override
   public void initialize(InitializerContext context, LinkConfiguration linkConfig, FromJobConfiguration jobConfig) {
+    assert jobConfig.incremental != null;
+
     Configuration configuration = HdfsUtils.createConfiguration(linkConfig);
     HdfsUtils.configurationToContext(configuration, context.getContext());
     context.getContext().setAll(linkConfig.linkConfig.configOverrides);
+
+    boolean incremental = jobConfig.incremental.incrementalType != null && jobConfig.incremental.incrementalType == IncrementalType.NEW_FILES;
+
+    // In case of incremental import, we need to persist the highest last-modified timestamp
+    try {
+      FileSystem fs = FileSystem.get(configuration);
+      Path path = new Path(jobConfig.fromJobConfig.inputDirectory);
+      LOG.info("Input directory: " + path.toString());
+
+      if(!fs.exists(path)) {
+        throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Input directory doesn't exist");
+      }
+
+      if(fs.isFile(path)) {
+        throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Input directory is a file");
+      }
+
+      if(incremental) {
+        LOG.info("Detected incremental import");
+        long maxModifiedTime = -1;
+        FileStatus[] fileStatuses = fs.listStatus(path);
+        for(FileStatus status : fileStatuses) {
+          if(maxModifiedTime < status.getModificationTime()) {
+            maxModifiedTime = status.getModificationTime();
+          }
+        }
+
+        LOG.info("Maximal age of file is: " + maxModifiedTime);
+        context.getContext().setLong(HdfsConstants.MAX_IMPORT_DATE, 
maxModifiedTime);
+      }
+    } catch (IOException e) {
+      throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Unexpected exception", e);
+    }
+
   }
 }
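
The initializer thus does two things for the FROM direction: it validates that
the input path exists and is a directory, and for NEW_FILES jobs it snapshots
the highest modification time among the directory's entries so the partitioner
has a fixed upper bound even while new files keep arriving. A hedged sketch of
the same scan against a local directory, using java.io.File instead of the
Hadoop FileSystem API (class and method names are illustrative):

    import java.io.File;

    public class MaxModifiedTimeScan {
      // Same convention as HdfsFromInitializer: -1 when the directory is empty.
      static long maxModifiedTime(File dir) {
        long max = -1;
        File[] children = dir.listFiles();
        if (children != null) {
          for (File child : children) {
            if (child.lastModified() > max) {
              max = child.lastModified();
            }
          }
        }
        return max;
      }

      public static void main(String[] args) {
        System.out.println(maxModifiedTime(new File(args.length > 0 ? args[0] : ".")));
      }
    }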

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bd725248/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
index 119955d..ff16ad7 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
@@ -38,8 +38,10 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.NodeBase;
+import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.IncrementalType;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.error.code.HdfsConnectorError;
 import org.apache.sqoop.job.etl.Partition;
@@ -52,6 +54,8 @@ import org.apache.sqoop.job.etl.PartitionerContext;
  */
 public class HdfsPartitioner extends Partitioner<LinkConfiguration, FromJobConfiguration> {
 
+  public static final Logger LOG = Logger.getLogger(HdfsPartitioner.class);
+
   public static final String SPLIT_MINSIZE_PERNODE =
       "mapreduce.input.fileinputformat.split.minsize.per.node";
   public static final String SPLIT_MINSIZE_PERRACK =
@@ -70,6 +74,8 @@ public class HdfsPartitioner extends Partitioner<LinkConfiguration, FromJobConfi
   public List<Partition> getPartitions(PartitionerContext context,
                                        LinkConfiguration linkConfiguration,
                                        FromJobConfiguration fromJobConfig) {
+    assert fromJobConfig.incremental != null;
+
     Configuration conf = new Configuration();
     HdfsUtils.contextToConfiguration(context.getContext(), conf);
 
@@ -118,6 +124,11 @@ public class HdfsPartitioner extends Partitioner<LinkConfiguration, FromJobConfi
                               "size per rack " + minSizeRack);
       }
 
+      // Incremental import related options
+      boolean incremental = fromJobConfig.incremental.incrementalType != null && fromJobConfig.incremental.incrementalType == IncrementalType.NEW_FILES;
+      long lastImportedDate = fromJobConfig.incremental.lastImportedDate != null ? fromJobConfig.incremental.lastImportedDate.getMillis() : -1;
+      long maxImportDate = context.getLong(HdfsConstants.MAX_IMPORT_DATE, -1);
+
       // all the files in input set
       String indir = fromJobConfig.fromJobConfig.inputDirectory;
       FileSystem fs = FileSystem.get(conf);
@@ -125,7 +136,19 @@ public class HdfsPartitioner extends Partitioner<LinkConfiguration, FromJobConfi
       List<Path> paths = new LinkedList<Path>();
       for(FileStatus status : fs.listStatus(new Path(indir))) {
         if(!status.isDir()) {
-          paths.add(status.getPath());
+          if(incremental) {
+            long modifiedDate = status.getModificationTime();
+            if(lastImportedDate < modifiedDate && modifiedDate <= maxImportDate) {
+              LOG.info("Will process input file: " + status.getPath() + " with modification date " + modifiedDate);
+              paths.add(status.getPath());
+            } else {
+              LOG.info("Skipping input file: " + status.getPath() + " with modification date " + modifiedDate);
+            }
+          } else {
+            // Without incremental mode, we're processing all files
+            LOG.info("Will process input file: " + status.getPath());
+            paths.add(status.getPath());
+          }
         }
       }
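
The filter above admits a file only when its modification time falls in the
half-open window (lastImportedDate, maxImportDate]: strictly newer than what the
previous run already imported, yet no newer than the snapshot the initializer
took, so files that land while the job is running are deferred to the next run.
The predicate in isolation (a sketch; the class name IncrementalWindow is
illustrative):

    public class IncrementalWindow {
      // Half-open window (lastImported, maxImport], matching HdfsPartitioner.
      static boolean shouldImport(long modified, long lastImported, long maxImport) {
        return lastImported < modified && modified <= maxImport;
      }

      public static void main(String[] args) {
        long last = 1_000L, max = 2_000L;
        System.out.println(shouldImport(1_000L, last, max)); // false: already imported
        System.out.println(shouldImport(1_500L, last, max)); // true: inside the window
        System.out.println(shouldImport(2_500L, last, max)); // false: arrived after snapshot
      }
    }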
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bd725248/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java
index 618366e..fdef4b4 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java
@@ -22,10 +22,14 @@ import org.apache.sqoop.model.Config;
 
 @ConfigurationClass
 public class FromJobConfiguration {
+
   @Config public FromJobConfig fromJobConfig;
 
+  @Config public IncrementalRead incremental;
+
   public FromJobConfiguration() {
     fromJobConfig = new FromJobConfig();
-
+    incremental = new IncrementalRead();
   }
+
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bd725248/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/IncrementalRead.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/IncrementalRead.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/IncrementalRead.java
new file mode 100644
index 0000000..23a7b2f
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/IncrementalRead.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+import org.apache.sqoop.model.ConfigClass;
+import org.apache.sqoop.model.Input;
+import org.apache.sqoop.validation.Status;
+import org.apache.sqoop.validation.validators.AbstractValidator;
+import org.joda.time.DateTime;
+
+@ConfigClass
+public class IncrementalRead {
+  @Input
+  public IncrementalType incrementalType;
+
+  @Input
+  public DateTime lastImportedDate;
+
+  public static class ConfigValidator extends AbstractValidator<IncrementalRead> {
+    @Override
+    public void validate(IncrementalRead conf) {
+      if(conf.incrementalType != IncrementalType.NEW_FILES && conf.lastImportedDate != null) {
+        addMessage(Status.ERROR, "Can't specify last imported date without enabling incremental import.");
+      }
+    }
+  }
+}
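
The validator encodes a single rule: lastImportedDate is only meaningful when
incremental import is enabled. A hedged restatement outside the Sqoop validation
framework (the nested enum and the class name IncrementalReadRule are inlined
here purely for self-containment):

    public class IncrementalReadRule {
      enum IncrementalType { NONE, NEW_FILES }

      // Mirrors IncrementalRead.ConfigValidator: a last-imported date without
      // NEW_FILES mode is a configuration error.
      static boolean isValid(IncrementalType type, Long lastImportedMillis) {
        return type == IncrementalType.NEW_FILES || lastImportedMillis == null;
      }

      public static void main(String[] args) {
        System.out.println(isValid(IncrementalType.NONE, 1_000L));      // false
        System.out.println(isValid(IncrementalType.NEW_FILES, 1_000L)); // true
        System.out.println(isValid(IncrementalType.NONE, null));        // true
      }
    }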

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bd725248/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/IncrementalType.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/IncrementalType.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/IncrementalType.java
new file mode 100644
index 0000000..9e2a7d5
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/IncrementalType.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+public enum IncrementalType {
+  NONE,
+  NEW_FILES,
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bd725248/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
index db23a95..69f50c1 100644
--- a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
+++ b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
@@ -63,6 +63,15 @@ toJobConfig.nullValue.label = Null value
 toJobConfig.nullValue.help = Use this particular character or sequence of characters \
                              as a value representing null when outputting to a file.
 
+incremental.label = Incremental import
+incremental.help = Information relevant for incremental import from HDFS
+
+incremental.incrementalType.label = Incremental type
+incremental.incrementalType.help = Type of incremental import
+
+incremental.lastImportedDate.label = Last imported date
+incremental.lastImportedDate.help = Date when last import happened
+
 # From Job Config
 #
 fromJobConfig.label = From HDFS configuration

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bd725248/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromDestroyer.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromDestroyer.java
new file mode 100644
index 0000000..569c60b
--- /dev/null
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromDestroyer.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.sqoop.common.MutableContext;
+import org.apache.sqoop.common.MutableMapContext;
+import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.DestroyerContext;
+import org.testng.annotations.Test;
+import org.joda.time.DateTime;
+
+import static org.testng.AssertJUnit.assertEquals;
+
+public class TestFromDestroyer {
+
+  Destroyer<LinkConfiguration, FromJobConfiguration> destroyer;
+  LinkConfiguration linkConfig;
+  FromJobConfiguration jobConfig;
+  MutableContext context;
+
+  public TestFromDestroyer() {
+    linkConfig = new LinkConfiguration();
+    jobConfig = new FromJobConfiguration();
+    context = new MutableMapContext();
+    destroyer = new HdfsFromDestroyer();
+  }
+
+  @Test
+  public void testUpdateConfiguration() {
+    DateTime dt = new DateTime();
+    context.setLong(HdfsConstants.MAX_IMPORT_DATE, dt.getMillis());
+    destroyer.updateConfiguration(new DestroyerContext(context, true, null), linkConfig, jobConfig);
+    assertEquals(jobConfig.incremental.lastImportedDate, dt);
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bd725248/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromInitializer.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromInitializer.java
index 5215901..52c174e 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromInitializer.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromInitializer.java
@@ -18,30 +18,77 @@
  */
 package org.apache.sqoop.connector.hdfs;
 
+import com.google.common.io.Files;
+import org.apache.sqoop.common.MutableContext;
 import org.apache.sqoop.common.MutableMapContext;
+import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.IncrementalType;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.job.etl.Initializer;
 import org.apache.sqoop.job.etl.InitializerContext;
 import org.testng.annotations.Test;
 
+import java.io.File;
+
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 
 public class TestFromInitializer {
 
+  Initializer<LinkConfiguration, FromJobConfiguration> initializer;
+  InitializerContext initializerContext;
+  LinkConfiguration linkConfig;
+  FromJobConfiguration jobConfig;
+  MutableContext context;
+
+  public TestFromInitializer() {
+    linkConfig = new LinkConfiguration();
+    jobConfig = new FromJobConfiguration();
+    context = new MutableMapContext();
+    initializer = new HdfsFromInitializer();
+    initializerContext = new InitializerContext(context);
+  }
+
   @Test
   public void testConfigOverrides() {
-    LinkConfiguration linkConfig = new LinkConfiguration();
-    FromJobConfiguration jobConfig = new FromJobConfiguration();
-
     linkConfig.linkConfig.uri = "file:///";
     linkConfig.linkConfig.configOverrides.put("key", "value");
+    jobConfig.fromJobConfig.inputDirectory = "/tmp";
 
-    InitializerContext initializerContext = new InitializerContext(new MutableMapContext());
-
-    Initializer initializer = new HdfsFromInitializer();
     initializer.initialize(initializerContext, linkConfig, jobConfig);
 
     assertEquals(initializerContext.getString("key"), "value");
   }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testFailIfInputDirectoryDoNotExists() {
+    jobConfig.fromJobConfig.inputDirectory = "/tmp/this/directory/definitely/do/not/exists";
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testFailIfInputDirectoryIsFile() throws Exception {
+    File workDir = Files.createTempDir();
+    File inputFile = File.createTempFile("part-01-", ".txt", workDir);
+    inputFile.createNewFile();
+
+    jobConfig.fromJobConfig.inputDirectory = inputFile.getAbsolutePath();
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
+  }
+
+  @Test
+  public void testIncremental() throws Exception {
+    File workDir = Files.createTempDir();
+    File.createTempFile("part-01-", ".txt", workDir).createNewFile();
+    File.createTempFile("part-02-", ".txt", workDir).createNewFile();
+
+    jobConfig.fromJobConfig.inputDirectory = workDir.getAbsolutePath();
+    jobConfig.incremental.incrementalType = IncrementalType.NEW_FILES;
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
+
+    // Max import date must be defined if we are running incremental
+    assertNotNull(context.getString(HdfsConstants.MAX_IMPORT_DATE));
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bd725248/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/HdfsIncrementalReadTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/HdfsIncrementalReadTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/HdfsIncrementalReadTest.java
new file mode 100644
index 0000000..a32a563
--- /dev/null
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/HdfsIncrementalReadTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.integration.connector.hdfs;
+
+import org.apache.sqoop.connector.hdfs.configuration.IncrementalType;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.test.testcases.ConnectorTestCase;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+public class HdfsIncrementalReadTest extends ConnectorTestCase {
+
+  @BeforeMethod(alwaysRun = true)
+  public void createTable() {
+    createTableCities();
+  }
+
+  @AfterMethod(alwaysRun = true)
+  public void dropTable() {
+    super.dropTable();
+  }
+
+  @Test
+  public void testBasic() throws Exception {
+    createFromFile("input-0001",
+        "1,'USA','2004-10-23','San Francisco'"
+    );
+
+    // RDBMS link
+    MLink rdbmsLink = getClient().createLink("generic-jdbc-connector");
+    fillRdbmsLinkConfig(rdbmsLink);
+    saveLink(rdbmsLink);
+
+    // HDFS link
+    MLink hdfsLink = getClient().createLink("hdfs-connector");
+    fillHdfsLink(hdfsLink);
+    saveLink(hdfsLink);
+
+    // Job creation
+    MJob job = getClient().createJob(hdfsLink.getPersistenceId(), rdbmsLink.getPersistenceId());
+    fillHdfsFromConfig(job);
+    job.getFromJobConfig().getEnumInput("incremental.incrementalType").setValue(IncrementalType.NEW_FILES);
+    fillRdbmsToConfig(job);
+    saveJob(job);
+
+    // Execute for the first time
+    executeJob(job);
+    assertEquals(provider.rowCount(getTableName()), 1);
+    assertRowInCities(1, "USA", "2004-10-23", "San Francisco");
+
+    // Second execution
+    createFromFile("input-0002",
+      "2,'USA','2004-10-24','Sunnyvale'",
+      "3,'Czech Republic','2004-10-25','Brno'"
+    );
+    executeJob(job);
+    assertEquals(provider.rowCount(getTableName()), 3);
+    assertRowInCities(1, "USA", "2004-10-23", "San Francisco");
+    assertRowInCities(2, "USA", "2004-10-24", "Sunnyvale");
+    assertRowInCities(3, "Czech Republic", "2004-10-25", "Brno");
+
+    // And last execution
+    createFromFile("input-0003",
+      "4,'USA','2004-10-26','Palo Alto'"
+    );
+    executeJob(job);
+    assertEquals(provider.rowCount(getTableName()), 4);
+    assertRowInCities(1, "USA", "2004-10-23", "San Francisco");
+    assertRowInCities(2, "USA", "2004-10-24", "Sunnyvale");
+    assertRowInCities(3, "Czech Republic", "2004-10-25", "Brno");
+    assertRowInCities(4, "USA", "2004-10-26", "Palo Alto");
+  }
+
+}
