Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 015bb94d4 -> c820aaf87


SQOOP-1678: Sqoop2: [HDFS Connector] Configurable null values

(Abraham Elmahrek via Venkat Ranganathan)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/c820aaf8
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/c820aaf8
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/c820aaf8

Branch: refs/heads/sqoop2
Commit: c820aaf870ddfacc1039f559b138cb9e72b0a287
Parents: 015bb94
Author: Venkat Ranganathan <[email protected]>
Authored: Wed Dec 10 20:50:15 2014 -0800
Committer: Venkat Ranganathan <[email protected]>
Committed: Wed Dec 10 20:50:15 2014 -0800

----------------------------------------------------------------------
 .../sqoop/connector/hdfs/HdfsConstants.java     |  2 +
 .../sqoop/connector/hdfs/HdfsExtractor.java     | 32 ++++++---
 .../apache/sqoop/connector/hdfs/HdfsLoader.java | 14 +++-
 .../apache/sqoop/connector/hdfs/HdfsUtils.java  | 73 ++++++++++++++++++++
 .../hdfs/configuration/FromJobConfig.java       |  4 ++
 .../hdfs/configuration/ToJobConfig.java         |  6 ++
 .../resources/hdfs-connector-config.properties  | 18 +++++
 .../sqoop/connector/hdfs/TestExtractor.java     | 71 ++++++++++++++++---
 .../sqoop/connector/hdfs/TestHdfsBase.java      | 34 +++++++--
 .../sqoop/connector/hdfs/TestHdfsUtils.java     | 45 +++++++++++-
 .../apache/sqoop/connector/hdfs/TestLoader.java | 67 +++++++++++++++++-
 11 files changed, 335 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/c820aaf8/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
index 6e369c6..bd74bec 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
@@ -26,6 +26,8 @@ public final class HdfsConstants extends Constants {
   public static final String RESOURCE_BUNDLE_NAME =
           "hdfs-connector-config";
 
+  public static final char DEFAULT_FIELD_DELIMITER = ',';
+
   public static final char DEFAULT_RECORD_DELIMITER = '\n';
 
 }
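
The new DEFAULT_FIELD_DELIMITER complements the existing record delimiter: HdfsUtils
(later in this patch) splits and joins CSV-style records on it. A minimal sketch of
the two constants in combination, assuming the commons-lang StringUtils calls used
elsewhere in this patch; the values are illustrative:

    import org.apache.commons.lang.StringUtils;

    // Split one CSV-style line into fields, then rebuild a full record.
    String[] fields = StringUtils.split("1,1.0,'1'", HdfsConstants.DEFAULT_FIELD_DELIMITER);
    String line = StringUtils.join(fields, HdfsConstants.DEFAULT_FIELD_DELIMITER)
        + HdfsConstants.DEFAULT_RECORD_DELIMITER;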

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c820aaf8/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
index 2586f94..3c417b4 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
@@ -52,7 +52,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
 
   @Override
   public void extract(ExtractorContext context, LinkConfiguration linkConfiguration,
-      FromJobConfiguration jobConfig, HdfsPartition partition) {
+      FromJobConfiguration jobConfiguration, HdfsPartition partition) {
 
     conf = HdfsUtils.configureURI(((PrefixContext) context.getContext()).getConfiguration(), linkConfiguration);
     dataWriter = context.getDataWriter();
@@ -62,14 +62,16 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
       LOG.info("Working on partition: " + p);
       int numFiles = p.getNumberOfFiles();
       for (int i = 0; i < numFiles; i++) {
-        extractFile(p.getFile(i), p.getOffset(i), p.getLength(i));
+        extractFile(linkConfiguration, jobConfiguration, p.getFile(i), p.getOffset(i), p.getLength(i));
       }
     } catch (IOException e) {
      throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0001, e);
     }
   }
 
-  private void extractFile(Path file, long start, long length)
+  private void extractFile(LinkConfiguration linkConfiguration,
+                           FromJobConfiguration fromJobConfiguration,
+                           Path file, long start, long length)
       throws IOException {
     long end = start + length;
     LOG.info("Extracting file " + file);
@@ -77,9 +79,9 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
     LOG.info("\t to offset " + end);
     LOG.info("\t of length " + length);
     if(isSequenceFile(file)) {
-      extractSequenceFile(file, start, length);
+      extractSequenceFile(linkConfiguration, fromJobConfiguration, file, start, length);
     } else {
-      extractTextFile(file, start, length);
+      extractTextFile(linkConfiguration, fromJobConfiguration, file, start, length);
     }
   }
 
@@ -91,7 +93,9 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
    * @throws IOException
    */
   @SuppressWarnings("deprecation")
-  private void extractSequenceFile(Path file, long start, long length)
+  private void extractSequenceFile(LinkConfiguration linkConfiguration,
+                                   FromJobConfiguration fromJobConfiguration,
+                                   Path file, long start, long length)
       throws IOException {
     LOG.info("Extracting sequence file");
     long end = start + length;
@@ -106,7 +110,11 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
     boolean hasNext = filereader.next(line);
     while (hasNext) {
       rowRead++;
-      dataWriter.writeStringRecord(line.toString());
+      if (HdfsUtils.hasCustomFormat(linkConfiguration, fromJobConfiguration)) {
+        dataWriter.writeArrayRecord(HdfsUtils.formatRecord(linkConfiguration, fromJobConfiguration, line.toString()));
+      } else {
+        dataWriter.writeStringRecord(line.toString());
+      }
       line = new Text();
       hasNext = filereader.next(line);
       if (filereader.getPosition() >= end && filereader.syncSeen()) {
@@ -124,7 +132,9 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
    * @throws IOException
    */
   @SuppressWarnings("resource")
-  private void extractTextFile(Path file, long start, long length)
+  private void extractTextFile(LinkConfiguration linkConfiguration,
+                               FromJobConfiguration fromJobConfiguration,
+                               Path file, long start, long length)
       throws IOException {
     LOG.info("Extracting text file");
     long end = start + length;
@@ -167,7 +177,11 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
         next = fileseeker.getPos();
       }
       rowRead++;
-      dataWriter.writeStringRecord(line.toString());
+      if (HdfsUtils.hasCustomFormat(linkConfiguration, fromJobConfiguration)) {
+        dataWriter.writeArrayRecord(HdfsUtils.formatRecord(linkConfiguration, fromJobConfiguration, line.toString()));
+      } else {
+        dataWriter.writeStringRecord(line.toString());
+      }
     }
     LOG.info("Extracting ended on position: " + fileseeker.getPos());
     filestream.close();
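
Both the sequence-file and text paths above now route lines through
HdfsUtils.formatRecord whenever HdfsUtils.hasCustomFormat returns true. A hedged
sketch of the FROM-side effect (the configuration values here are illustrative,
not part of the patch): with overrideNullValue enabled and nullValue set to "\N",
matching tokens become real nulls before the record reaches the data writer.

    FromJobConfiguration from = new FromJobConfiguration();
    from.fromJobConfig.overrideNullValue = true;
    from.fromJobConfig.nullValue = "\\N";

    // "1,1.0,\N" is split on ',' and the "\N" token becomes a true null:
    // { "1", "1.0", null }
    Object[] fields = HdfsUtils.formatRecord(new LinkConfiguration(), from, "1,1.0,\\N");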

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c820aaf8/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
index 6c57cf2..05b0230 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
@@ -81,10 +81,18 @@ public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
 
       filewriter.initialize(filepath,conf,codec);
 
-      String csv;
+      if (HdfsUtils.hasCustomFormat(linkConfiguration, toJobConfig)) {
+        Object[] record;
 
-      while ((csv = reader.readTextRecord()) != null) {
-        filewriter.write(csv);
+        while ((record = reader.readArrayRecord()) != null) {
+          filewriter.write(HdfsUtils.formatRecord(linkConfiguration, toJobConfig, record));
+        }
+      } else {
+        String record;
+
+        while ((record = reader.readTextRecord()) != null) {
+          filewriter.write(record);
+        }
       }
       filewriter.destroy();
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c820aaf8/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsUtils.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsUtils.java
index 352ee17..353c1f2 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsUtils.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsUtils.java
@@ -17,8 +17,11 @@
  */
 package org.apache.sqoop.connector.hdfs;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
 
 /**
  * Utilities for HDFS.
@@ -40,4 +43,74 @@ public class HdfsUtils {
 
     return conf;
   }
+
+  /**
+   * Given the configurations, determine whether records read from HDFS need custom formatting.
+   * @param linkConfiguration Link configuration
+   * @param fromJobConfiguration Job configuration
+   * @return boolean
+   */
+  public static boolean hasCustomFormat(LinkConfiguration linkConfiguration, FromJobConfiguration fromJobConfiguration) {
+    return fromJobConfiguration.fromJobConfig.overrideNullValue != null
+            && fromJobConfiguration.fromJobConfig.overrideNullValue;
+  }
+
+  /**
+   * Given the configurations, determine whether records written to HDFS need custom formatting.
+   * @param linkConfiguration Link configuration
+   * @param toJobConfiguration Job configuration
+   * @return boolean
+   */
+  public static boolean hasCustomFormat(LinkConfiguration linkConfiguration, ToJobConfiguration toJobConfiguration) {
+    return toJobConfiguration.toJobConfig.overrideNullValue != null
+            && toJobConfiguration.toJobConfig.overrideNullValue;
+  }
+
+  /**
+   * Given a String record, as provided by an intermediate data format or existing HDFS output,
+   * format the record according to the configuration.
+   * @param linkConfiguration Link configuration
+   * @param fromJobConfiguration Job configuration
+   * @param record String record
+   * @return Object[]
+   */
+  public static Object[] formatRecord(LinkConfiguration linkConfiguration,
+                                      FromJobConfiguration fromJobConfiguration,
+                                      String record) {
+    Object[] arrayRecord = StringUtils.split(record, HdfsConstants.DEFAULT_FIELD_DELIMITER);
+
+    if (fromJobConfiguration.fromJobConfig.overrideNullValue != null
+            && fromJobConfiguration.fromJobConfig.overrideNullValue) {
+      for (int i = 0; i < arrayRecord.length; ++i) {
+        if (arrayRecord[i].equals(fromJobConfiguration.fromJobConfig.nullValue)) {
+          arrayRecord[i] = null;
+        }
+      }
+    }
+
+    return arrayRecord;
+  }
+
+  /**
+   * Given an object array record, as provided by an intermediate data format,
+   * format the record according to the configuration.
+   * @param linkConfiguration Link configuration
+   * @param toJobConfiguration Job configuration
+   * @param record Record array
+   * @return String
+   */
+  public static String formatRecord(LinkConfiguration linkConfiguration,
+                                    ToJobConfiguration toJobConfiguration,
+                                    Object[] record) {
+    if (toJobConfiguration.toJobConfig.overrideNullValue != null
+            && toJobConfiguration.toJobConfig.overrideNullValue) {
+      for (int i = 0; i < record.length; ++i) {
+        if (record[i] == null) {
+          record[i] = toJobConfiguration.toJobConfig.nullValue;
+        }
+      }
+    }
+
+    return StringUtils.join(record, HdfsConstants.DEFAULT_FIELD_DELIMITER);
+  }
 }
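
The TO-side overload is the inverse transformation: nulls in the intermediate record
are replaced with the configured token before the loader joins and writes the line.
A minimal sketch under the same illustrative assumptions (note that this overload
mutates the passed array in place before joining it):

    ToJobConfiguration to = new ToJobConfiguration();
    to.toJobConfig.overrideNullValue = true;
    to.toJobConfig.nullValue = "\\N";

    // { 1, 1.0, null } joins to "1,1.0,\N"
    String line = HdfsUtils.formatRecord(new LinkConfiguration(), to, new Object[]{ 1, 1.0, null });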

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c820aaf8/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfig.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfig.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfig.java
index 509d772..89ff9aa 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfig.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfig.java
@@ -29,4 +29,8 @@ import org.apache.sqoop.validation.validators.NotEmpty;
 public class FromJobConfig {
 
   @Input(size = 255, validators = { @Validator(NotEmpty.class) }) public String inputDirectory;
+
+  @Input(size = 255) public Boolean overrideNullValue;
+
+  @Input(size = 255) public String nullValue;
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c820aaf8/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java
index abddbfb..b7a9c3d 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java
@@ -30,6 +30,12 @@ import org.apache.sqoop.validation.validators.NotEmpty;
 @ConfigClass(validators = { @Validator(ToJobConfig.ToJobConfigValidator.class)})
 public class ToJobConfig {
 
+  public static final String DEFAULT_NULL_VALUE = "NULL";
+
+  @Input(size = 255) public Boolean overrideNullValue;
+
+  @Input(size = 255) public String nullValue;
+
   @Input public ToFormat outputFormat;
 
   @Input public ToCompression compression;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c820aaf8/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
index 3d088d0..2dca634 100644
--- a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
+++ b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
@@ -48,6 +48,15 @@ toJobConfig.outputDirectory.help = Output directory for final data
 toJobConfig.ignored.label = Ignored
 toJobConfig.ignored.help = This value is ignored
 
+toJobConfig.overrideNullValue.label = Override null value
+toJobConfig.overrideNullValue.help = If set to true, then the null value will \
+                                     be overridden with the value set in \
+                                     toJobConfig.nullValue.
+
+toJobConfig.nullValue.label = Null value
+toJobConfig.nullValue.help = Use this particular character or sequence of characters \
+                             as a value representing null when outputting to a file.
+
 # From Job Config
 #
 fromJobConfig.label = From Job configuration
@@ -55,3 +64,12 @@ fromJobConfig.help = Specifies information required to get data from Hadoop ecos
 
 fromJobConfig.inputDirectory.label = Input directory
 fromJobConfig.inputDirectory.help = Directory that should be exported
+
+fromJobConfig.overrideNullValue.label = Override null value
+fromJobConfig.overrideNullValue.help = If set to true, then the null value will \
+                                     be overridden with the value set in \
+                                     fromJobConfig.nullValue.
+
+fromJobConfig.nullValue.label = Null value
+fromJobConfig.nullValue.help = Use this particular character or sequence of characters \
+                             as a value representing null when reading from a file.
\ No newline at end of file
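
These inputs surface to users through the Sqoop 2 shell and Java client. A hedged
sketch of enabling the override on a job via the 1.99.x client API; the client
construction, link ids, and input-getter names here are assumptions about that
API, not part of this patch:

    import org.apache.sqoop.client.SqoopClient;
    import org.apache.sqoop.model.MJob;

    SqoopClient client = new SqoopClient("http://localhost:12000/sqoop/");
    MJob job = client.createJob(fromLinkId, toLinkId);  // placeholder link ids
    job.getToJobConfig().getBooleanInput("toJobConfig.overrideNullValue").setValue(true);
    job.getToJobConfig().getStringInput("toJobConfig.nullValue").setValue("\\N");
    client.saveJob(job);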

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c820aaf8/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
index f4b4454..e5b7b2a 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
@@ -79,11 +79,11 @@ public class TestExtractor extends TestHdfsBase {
     FileUtils.mkdirs(inputDirectory);
     switch (this.outputFileType) {
       case TEXT_FILE:
-        createTextInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE);
+        createTextInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE, "%d,%f,NULL,%s,\\N");
         break;
 
       case SEQUENCE_FILE:
-        createSequenceInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE);
+        createSequenceInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE, "%d,%f,NULL,%s,\\N");
         break;
     }
   }
@@ -108,10 +108,10 @@ public class TestExtractor extends TestHdfsBase {
       public void writeStringRecord(String text) {
         int index;
         String[] components = text.split(",");
-        Assert.assertEquals(3, components.length);
+        Assert.assertEquals(5, components.length);
 
-        // Value should take on the form <integer>,<float>,'<integer>'
-        // for a single index. IE: 1,1.0,'1'.
+        // Value should take on the form <integer>,<float>,NULL,'<integer>',\N
+        // for a single index. IE: 1,1.0,NULL,'1',\N.
         try {
           index = Integer.parseInt(components[0]);
         } catch (NumberFormatException e) {
@@ -119,8 +119,10 @@ public class TestExtractor extends TestHdfsBase {
         }
 
         Assert.assertFalse(visited[index - 1]);
-        Assert.assertEquals(String.valueOf((double)index), components[1]);
-        Assert.assertEquals("'" + index + "'", components[2]);
+        Assert.assertEquals(String.valueOf((double) index), components[1]);
+        Assert.assertEquals("NULL", components[2]);
+        Assert.assertEquals("'" + index + "'", components[3]);
+        Assert.assertEquals("\\N", components[4]);
 
         visited[index - 1] = true;
       }
@@ -141,4 +143,57 @@ public class TestExtractor extends TestHdfsBase {
       Assert.assertTrue("Index " + (index + 1) + " was not visited", 
visited[index]);
     }
   }
-}
\ No newline at end of file
+
+  @Test
+  public void testOverrideNull() throws Exception {
+    Configuration conf = new Configuration();
+    PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
+    final boolean[] visited = new boolean[NUMBER_OF_FILES * NUMBER_OF_ROWS_PER_FILE];
+    ExtractorContext context = new ExtractorContext(prefixContext, new DataWriter() {
+      @Override
+      public void writeArrayRecord(Object[] array) {
+        int index;
+        Assert.assertEquals(5, array.length);
+
+        // Value should take on the form <integer>,<float>,NULL,'<integer>',null
+        // for a single index. IE: 1,1.0,NULL,'1',null.
+        try {
+          index = Integer.parseInt(array[0].toString());
+        } catch (NumberFormatException e) {
+          throw new AssertionError("Could not parse int for " + array[0]);
+        }
+
+        Assert.assertFalse(visited[index - 1]);
+        Assert.assertEquals(String.valueOf((double) index), array[1]);
+        Assert.assertEquals("NULL", array[2]);
+        Assert.assertEquals("'" + index + "'", array[3]);
+        Assert.assertNull(array[4]);
+
+        visited[index - 1] = true;
+      }
+
+      @Override
+      public void writeStringRecord(String text) {
+        throw new AssertionError("Should not be writing string.");
+      }
+
+      @Override
+      public void writeRecord(Object obj) {
+        throw new AssertionError("Should not be writing object.");
+      }
+    }, null);
+
+    LinkConfiguration emptyLinkConfig = new LinkConfiguration();
+    FromJobConfiguration fromJobConfiguration = new FromJobConfiguration();
+    fromJobConfiguration.fromJobConfig.overrideNullValue = true;
+    // Should skip "NULL" values
+    fromJobConfiguration.fromJobConfig.nullValue = "\\N";
+    HdfsPartition partition = createPartition(FileUtils.listDir(inputDirectory));
+
+    extractor.extract(context, emptyLinkConfig, fromJobConfiguration, partition);
+
+    for (int index = 0; index < NUMBER_OF_FILES * NUMBER_OF_ROWS_PER_FILE; ++index) {
+      Assert.assertTrue("Index " + (index + 1) + " was not visited", visited[index]);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c820aaf8/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsBase.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsBase.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsBase.java
index 6eae7fd..ac44595 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsBase.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsBase.java
@@ -48,10 +48,17 @@ public class TestHdfsBase {
     return new HdfsPartition(paths, offsets, lengths, locations);
   }
 
+  protected String formatRow(String format, int index) {
+    String row = format.replaceAll("\\%s", "'" + index + "'");
+    row = row.replaceAll("\\%d", Integer.toString(index));
+    return row.replaceAll("\\%f", Double.toString((double)index));
+  }
+
   protected void createTextInput(String indir,
                                 Class<? extends CompressionCodec> clz,
                                 int numberOfFiles,
-                                int numberOfRows)
+                                int numberOfRows,
+                                String format)
       throws IOException, InstantiationException, IllegalAccessException {
     Configuration conf = new Configuration();
 
@@ -80,8 +87,7 @@ public class TestHdfsBase {
       }
 
       for (int ri = 0; ri < numberOfRows; ri++) {
-        String row = index + "," + (double)index + ",'" + index + "'";
-        filewriter.write(row + HdfsConstants.DEFAULT_RECORD_DELIMITER);
+        filewriter.write(formatRow(format, index) + HdfsConstants.DEFAULT_RECORD_DELIMITER);
         index++;
       }
 
@@ -89,10 +95,19 @@ public class TestHdfsBase {
     }
   }
 
+  protected void createTextInput(String indir,
+                                 Class<? extends CompressionCodec> clz,
+                                 int numberOfFiles,
+                                 int numberOfRows)
+      throws IOException, InstantiationException, IllegalAccessException {
+    createTextInput(indir, clz, numberOfFiles, numberOfRows, "%d,%f,%s");
+  }
+
   protected void createSequenceInput(String indir,
                                     Class<? extends CompressionCodec> clz,
                                     int numberOfFiles,
-                                    int numberOfRows)
+                                    int numberOfRows,
+                                    String format)
       throws IOException, InstantiationException, IllegalAccessException {
     Configuration conf = new Configuration();
 
@@ -119,8 +134,7 @@ public class TestHdfsBase {
 
       Text text = new Text();
       for (int ri = 0; ri < numberOfRows; ri++) {
-        String row = index + "," + (double)index + ",'" + index + "'";
-        text.set(row);
+        text.set(formatRow(format, index));
         filewriter.append(text, NullWritable.get());
         index++;
       }
@@ -128,4 +142,12 @@ public class TestHdfsBase {
       filewriter.close();
     }
   }
+
+  protected void createSequenceInput(String indir,
+                                     Class<? extends CompressionCodec> clz,
+                                     int numberOfFiles,
+                                     int numberOfRows)
+      throws IOException, InstantiationException, IllegalAccessException {
+    createSequenceInput(indir, clz, numberOfFiles, numberOfRows, "%d,%f,%s");
+  }
 }
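
The new formatRow helper expands a small %d/%f/%s template per row index, so the
tests can describe expected file contents declaratively. A quick illustration of
the expansion (expected value derived from the replacement rules above):

    // "%d,%f,NULL,%s,\N" with index 7 expands to "7,7.0,NULL,'7',\N"
    Assert.assertEquals("7,7.0,NULL,'7',\\N", formatRow("%d,%f,NULL,%s,\\N", 7));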

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c820aaf8/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsUtils.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsUtils.java
index 63e14ae..bba6502 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsUtils.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsUtils.java
@@ -18,11 +18,12 @@
 package org.apache.sqoop.connector.hdfs;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.*;
 
 public class TestHdfsUtils {
 
@@ -41,4 +42,44 @@ public class TestHdfsUtils {
     assertEquals(TEST_URI, conf.get("fs.default.name"));
     assertEquals(TEST_URI, conf.get("fs.defaultFS"));
   }
+
+  @Test
+  public void testIsModifiable() throws Exception {
+    LinkConfiguration linkConfiguration = new LinkConfiguration();
+    FromJobConfiguration fromJobConfiguration = new FromJobConfiguration();
+    ToJobConfiguration toJobConfiguration = new ToJobConfiguration();
+
+    // No configuration
+    assertFalse(HdfsUtils.hasCustomFormat(linkConfiguration, fromJobConfiguration));
+    assertFalse(HdfsUtils.hasCustomFormat(linkConfiguration, toJobConfiguration));
+
+    // Without override
+    fromJobConfiguration.fromJobConfig.nullValue = "\0";
+    toJobConfiguration.toJobConfig.nullValue = "\0";
+    assertFalse(HdfsUtils.hasCustomFormat(linkConfiguration, fromJobConfiguration));
+    assertFalse(HdfsUtils.hasCustomFormat(linkConfiguration, toJobConfiguration));
+
+    // With override
+    fromJobConfiguration.fromJobConfig.overrideNullValue = true;
+    toJobConfiguration.toJobConfig.overrideNullValue = true;
+    assertTrue(HdfsUtils.hasCustomFormat(linkConfiguration, fromJobConfiguration));
+    assertTrue(HdfsUtils.hasCustomFormat(linkConfiguration, toJobConfiguration));
+  }
+
+  @Test
+  public void testTransformRecord() throws Exception {
+    LinkConfiguration linkConfiguration = new LinkConfiguration();
+    FromJobConfiguration fromJobConfiguration = new FromJobConfiguration();
+    ToJobConfiguration toJobConfiguration = new ToJobConfiguration();
+    final String record = "'Abe',\0,'test'";
+    final Object[] arrayRecord = new Object[]{
+      "'Abe'",
+      "\0",
+      "'test'"
+    };
+
+    // No transformations
+    assertArrayEquals(arrayRecord, HdfsUtils.formatRecord(linkConfiguration, fromJobConfiguration, record));
+    assertEquals(record, HdfsUtils.formatRecord(linkConfiguration, toJobConfiguration, arrayRecord));
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c820aaf8/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
index b404c34..be57fa0 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
@@ -143,7 +143,64 @@ public class TestLoader extends TestHdfsBase {
     Assert.assertEquals(5, fs.listStatus(outputPath).length);
   }
 
-  private void verifyOutput(FileSystem fs, Path file) throws IOException {
+  @Test
+  public void testOverrideNull() throws Exception {
+    FileSystem fs = FileSystem.get(new Configuration());
+
+    Configuration conf = new Configuration();
+    PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
+    LoaderContext context = new LoaderContext(prefixContext, new DataReader() {
+      private long index = 0L;
+
+      @Override
+      public Object[] readArrayRecord() {
+        if (index++ < NUMBER_OF_ROWS_PER_FILE) {
+          return new Object[]{
+              index,
+              (double)index,
+              null,
+              "'" + index + "'"
+          };
+        } else {
+          return null;
+        }
+      }
+
+      @Override
+      public String readTextRecord() {
+        throw new AssertionError("should not be at readTextRecord");
+      }
+
+      @Override
+      public Object readContent() {
+        throw new AssertionError("should not be at readContent");
+      }
+    }, null);
+    LinkConfiguration linkConf = new LinkConfiguration();
+    ToJobConfiguration jobConf = new ToJobConfiguration();
+    jobConf.toJobConfig.outputDirectory = outputDirectory;
+    jobConf.toJobConfig.compression = compression;
+    jobConf.toJobConfig.outputFormat = outputFormat;
+    jobConf.toJobConfig.overrideNullValue = true;
+    jobConf.toJobConfig.nullValue = "\\N";
+    Path outputPath = new Path(outputDirectory);
+
+    loader.load(context, linkConf, jobConf);
+    Assert.assertEquals(1, fs.listStatus(outputPath).length);
+
+    for (FileStatus status : fs.listStatus(outputPath)) {
+      verifyOutput(fs, status.getPath(), "%d,%f,\\N,%s");
+    }
+
+    loader.load(context, linkConf, jobConf);
+    Assert.assertEquals(2, fs.listStatus(outputPath).length);
+    loader.load(context, linkConf, jobConf);
+    loader.load(context, linkConf, jobConf);
+    loader.load(context, linkConf, jobConf);
+    Assert.assertEquals(5, fs.listStatus(outputPath).length);
+  }
+
+  private void verifyOutput(FileSystem fs, Path file, String format) throws IOException {
     Configuration conf = new Configuration();
     FSDataInputStream fsin = fs.open(file);
     CompressionCodec codec;
@@ -181,7 +238,7 @@ public class TestLoader extends TestHdfsBase {
         BufferedReader textReader = new BufferedReader(in);
 
         for (int i = 1; i <= NUMBER_OF_ROWS_PER_FILE; ++i) {
-          Assert.assertEquals(i + "," + (double)i + ",'" + i + "'", textReader.readLine());
+          Assert.assertEquals(formatRow(format, i), textReader.readLine());
         }
         break;
 
@@ -208,10 +265,14 @@ public class TestLoader extends TestHdfsBase {
         Text line = new Text();
         int index = 1;
         while (sequenceReader.next(line)) {
-          Assert.assertEquals(index + "," + (double)index + ",'" + index++ + "'", line.toString());
+          Assert.assertEquals(formatRow(format, index++), line.toString());
           line = new Text();
         }
         break;
     }
   }
+
+  private void verifyOutput(FileSystem fs, Path file) throws IOException {
+    verifyOutput(fs, file, "%d,%f,%s");
+  }
 }
