Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 cb037c16a -> 418b9a70f


SQOOP-1579: Sqoop2: Data transfer to load into Hive does not work

(Abraham Elmahrek via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/418b9a70
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/418b9a70
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/418b9a70

Branch: refs/heads/sqoop2
Commit: 418b9a70f87c85dc4f8aa55b5fb31489a4cc9fc9
Parents: cb037c1
Author: Jarek Jarcec Cecho <[email protected]>
Authored: Fri Feb 27 06:49:04 2015 -0800
Committer: Jarek Jarcec Cecho <[email protected]>
Committed: Fri Feb 27 06:49:04 2015 -0800

----------------------------------------------------------------------
 .../sqoop/connector/hdfs/HdfsExtractor.java     |  10 +-
 .../apache/sqoop/connector/hdfs/HdfsLoader.java |   6 +-
 .../apache/sqoop/connector/hdfs/HdfsUtils.java  |  27 +-
 .../hdfs/configuration/ToJobConfig.java         |   3 -
 .../resources/hdfs-connector-config.properties  |   2 +-
 .../sqoop/connector/hdfs/TestExtractor.java     |  32 ++-
 .../sqoop/connector/hdfs/TestHdfsUtils.java     |  12 +-
 .../apache/sqoop/connector/hdfs/TestLoader.java |  27 +-
 .../sqoop/connector/common/SqoopIDFUtils.java   | 260 ++++++++++++++++---
 .../idf/CSVIntermediateDataFormat.java          | 152 +----------
 .../connector/common/TestSqoopIDFUtils.java     |  37 +++
 .../apache/sqoop/test/data/ShortStories.java    |  59 +++++
 .../sqoop/test/testcases/ConnectorTestCase.java |  15 ++
 .../jdbc/generic/FromHDFSToRDBMSTest.java       |  27 +-
 .../jdbc/generic/FromRDBMSToHDFSTest.java       |  47 +++-
 15 files changed, 481 insertions(+), 235 deletions(-)
----------------------------------------------------------------------
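
In short: the HDFS connector previously split and joined records with a
plain field delimiter, which broke on quoted text, embedded commas, and
null markers. After this change both HdfsExtractor and HdfsLoader delegate
to the shared, schema-aware CSV codec in SqoopIDFUtils, so the connector
reads and writes the same CSV dialect as the intermediate data format. A
minimal sketch of the new round trip (the schema and column names here are
illustrative; the toCSV/fromCSV calls and the Schema builder match the
APIs introduced in the diff below):

    import org.apache.sqoop.connector.common.SqoopIDFUtils;
    import org.apache.sqoop.schema.Schema;
    import org.apache.sqoop.schema.type.FixedPoint;
    import org.apache.sqoop.schema.type.Text;

    // Sketch only, not a complete class. Illustrative two-column schema:
    // an integer id and a free-form text column.
    Schema schema = new Schema("example")
        .addColumn(new FixedPoint("id", 4L, true))
        .addColumn(new Text("name"));

    // Encode: text is quoted and escaped, so "O'Brien" survives the trip;
    // a null value would be written as the NULL marker instead.
    String csv = SqoopIDFUtils.toCSV(new Object[]{1, "O'Brien"}, schema);

    // Decode with the same schema to get typed objects back.
    Object[] fields = SqoopIDFUtils.fromCSV(csv, schema);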


http://git-wip-us.apache.org/repos/asf/sqoop/blob/418b9a70/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
index f70d56b..8237e51 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
@@ -32,12 +32,14 @@ import org.apache.hadoop.util.LineReader;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.PrefixContext;
 import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.common.SqoopIDFUtils;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.error.code.HdfsConnectorError;
 import org.apache.sqoop.etl.io.DataWriter;
 import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.etl.ExtractorContext;
+import org.apache.sqoop.schema.Schema;
 
 /**
  * Extract from HDFS.
@@ -49,6 +51,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
 
   private Configuration conf;
   private DataWriter dataWriter;
+  private Schema schema;
   private long rowsRead = 0;
 
   @Override
@@ -57,6 +60,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
 
     conf = HdfsUtils.configureURI(((PrefixContext) context.getContext()).getConfiguration(), linkConfiguration);
     dataWriter = context.getDataWriter();
+    schema = context.getSchema();
 
     try {
       HdfsPartition p = partition;
@@ -112,7 +116,8 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
     while (hasNext) {
       rowsRead++;
       if (HdfsUtils.hasCustomFormat(linkConfiguration, fromJobConfiguration)) {
-        dataWriter.writeArrayRecord(HdfsUtils.formatRecord(linkConfiguration, fromJobConfiguration, line.toString()));
+        Object[] data = SqoopIDFUtils.fromCSV(line.toString(), schema);
+        dataWriter.writeArrayRecord(HdfsUtils.formatRecord(linkConfiguration, fromJobConfiguration, data));
       } else {
         dataWriter.writeStringRecord(line.toString());
       }
@@ -179,7 +184,8 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
       }
       rowsRead++;
       if (HdfsUtils.hasCustomFormat(linkConfiguration, fromJobConfiguration)) {
-        dataWriter.writeArrayRecord(HdfsUtils.formatRecord(linkConfiguration, fromJobConfiguration, line.toString()));
+        Object[] data = SqoopIDFUtils.fromCSV(line.toString(), schema);
+        dataWriter.writeArrayRecord(HdfsUtils.formatRecord(linkConfiguration, fromJobConfiguration, data));
       } else {
         dataWriter.writeStringRecord(line.toString());
       }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/418b9a70/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
index bdee878..cee0a91 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.sqoop.common.PrefixContext;
 import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.common.SqoopIDFUtils;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
@@ -89,7 +90,10 @@ public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
         Object[] record;
 
         while ((record = reader.readArrayRecord()) != null) {
-          filewriter.write(HdfsUtils.formatRecord(linkConfiguration, toJobConfig, record));
+          filewriter.write(
+              SqoopIDFUtils.toCSV(
+                  HdfsUtils.formatRecord(linkConfiguration, toJobConfig, record),
+                  context.getSchema()));
           rowsWritten++;
         }
       } else {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/418b9a70/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsUtils.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsUtils.java
index 6754536..fce7728 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsUtils.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsUtils.java
@@ -17,7 +17,6 @@
  */
 package org.apache.sqoop.connector.hdfs;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
@@ -53,8 +52,7 @@ public class HdfsUtils {
    * @return boolean
    */
   public static boolean hasCustomFormat(LinkConfiguration linkConfiguration, FromJobConfiguration fromJobConfiguration) {
-    return fromJobConfiguration.fromJobConfig.overrideNullValue != null
-            && fromJobConfiguration.fromJobConfig.overrideNullValue;
+    return Boolean.TRUE.equals(fromJobConfiguration.fromJobConfig.overrideNullValue);
   }
 
   /**
@@ -64,8 +62,7 @@ public class HdfsUtils {
    * @return boolean
    */
   public static boolean hasCustomFormat(LinkConfiguration linkConfiguration, ToJobConfiguration toJobConfiguration) {
-    return toJobConfiguration.toJobConfig.overrideNullValue != null
-            && toJobConfiguration.toJobConfig.overrideNullValue;
+    return Boolean.TRUE.equals(toJobConfiguration.toJobConfig.overrideNullValue);
   }
 
   /**
@@ -73,24 +70,22 @@ public class HdfsUtils {
    * format the record according to configuration.
    * @param linkConfiguration Link configuration
    * @param fromJobConfiguration Job configuration
-   * @param record String record
+   * @param record Object[] record
    * @return Object[]
    */
   public static Object[] formatRecord(LinkConfiguration linkConfiguration,
                                       FromJobConfiguration fromJobConfiguration,
-                                      String record) {
-    Object[] arrayRecord = StringUtils.split(record, HdfsConstants.DEFAULT_FIELD_DELIMITER);
-
+                                      Object[] record) {
     if (fromJobConfiguration.fromJobConfig.overrideNullValue != null
             && fromJobConfiguration.fromJobConfig.overrideNullValue) {
-      for (int i = 0; i < arrayRecord.length; ++i) {
-        if (arrayRecord[i].equals(fromJobConfiguration.fromJobConfig.nullValue)) {
-          arrayRecord[i] = null;
+      for (int i = 0; i < record.length; ++i) {
+        if (record[i] != null && record[i].equals(fromJobConfiguration.fromJobConfig.nullValue)) {
+          record[i] = null;
         }
       }
     }
 
-    return arrayRecord;
+    return record;
   }
 
   /**
@@ -99,9 +94,9 @@ public class HdfsUtils {
    * @param linkConfiguration Link configuration
    * @param toJobConfiguration Job configuration
    * @param record Record array
-   * @return String
+   * @return Object[]
    */
-  public static String formatRecord(LinkConfiguration linkConfiguration,
+  public static Object[] formatRecord(LinkConfiguration linkConfiguration,
                                     ToJobConfiguration toJobConfiguration,
                                     Object[] record) {
     if (toJobConfiguration.toJobConfig.overrideNullValue != null
@@ -113,6 +108,6 @@ public class HdfsUtils {
       }
     }
 
-    return StringUtils.join(record, HdfsConstants.DEFAULT_FIELD_DELIMITER);
+    return record;
   }
 }
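
A quick note on the hasCustomFormat rewrite above: overrideNullValue is a
boxed Boolean that may legitimately be unset, and Boolean.TRUE.equals(...)
is the standard null-safe shorthand for the old two-step check. For
illustration:

    Boolean overrideNullValue = null;       // input not set in the job config
    // if (overrideNullValue) { ... }       // would NPE on auto-unboxing
    boolean enabled = Boolean.TRUE.equals(overrideNullValue);  // safely false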

http://git-wip-us.apache.org/repos/asf/sqoop/blob/418b9a70/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java
index b7a9c3d..6fc894b 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java
@@ -25,13 +25,10 @@ import org.apache.sqoop.validation.validators.AbstractValidator;
 import org.apache.sqoop.validation.validators.NotEmpty;
 
 /**
- *
  */
 @ConfigClass(validators = { @Validator(ToJobConfig.ToJobConfigValidator.class)})
 public class ToJobConfig {
 
-  public static String DEFAULT_NULL_VALUE = "NULL";
-
   @Input(size = 255) public Boolean overrideNullValue;
 
   @Input(size = 255) public String nullValue;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/418b9a70/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
index 2dca634..3904856 100644
--- a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
+++ b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
@@ -72,4 +72,4 @@ fromJobConfig.overrideNullValue.help = If set to true, then the null value will
 
 fromJobConfig.nullValue.label = Null value
 fromJobConfig.nullValue.help = Use this particular character or sequence of characters \
-                             as a value representing null when outputting to a file.
\ No newline at end of file
+                             as a value representing null when outputting to a file.

http://git-wip-us.apache.org/repos/asf/sqoop/blob/418b9a70/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
index 9a5ec5e..9fcd2a8 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
@@ -23,7 +23,6 @@ import static org.testng.AssertJUnit.assertTrue;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -37,6 +36,11 @@ import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
 import org.apache.sqoop.etl.io.DataWriter;
 import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.etl.ExtractorContext;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.FloatingPoint;
+import org.apache.sqoop.schema.type.Text;
+import org.testng.ITest;
 import org.testng.annotations.AfterMethod;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
@@ -80,11 +84,11 @@ public class TestExtractor extends TestHdfsBase {
     FileUtils.mkdirs(inputDirectory);
     switch (this.outputFileType) {
       case TEXT_FILE:
-        createTextInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE, "%d,%f,NULL,%s,\\N");
+        createTextInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE, "%d,%f,NULL,%s,\\\\N");
         break;
 
       case SEQUENCE_FILE:
-        createSequenceInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE, "%d,%f,NULL,%s,\\N");
+        createSequenceInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE, "%d,%f,NULL,%s,\\\\N");
         break;
     }
   }
@@ -99,6 +103,11 @@ public class TestExtractor extends TestHdfsBase {
     Configuration conf = new Configuration();
     PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
     final boolean[] visited = new boolean[NUMBER_OF_FILES * NUMBER_OF_ROWS_PER_FILE];
+    Schema schema = new Schema("schema").addColumn(new FixedPoint("col1", 4L, true))
+        .addColumn(new FloatingPoint("col2", 4L))
+        .addColumn(new Text("col3"))
+        .addColumn(new Text("col4"))
+        .addColumn(new Text("col5"));
     ExtractorContext context = new ExtractorContext(prefixContext, new DataWriter() {
       @Override
       public void writeArrayRecord(Object[] array) {
@@ -123,7 +132,7 @@ public class TestExtractor extends TestHdfsBase {
         Assert.assertEquals(String.valueOf((double) index), components[1]);
         Assert.assertEquals("NULL", components[2]);
         Assert.assertEquals("'" + index + "'", components[3]);
-        Assert.assertEquals("\\N", components[4]);
+        Assert.assertEquals("\\\\N", components[4]);
 
         visited[index - 1] = true;
       }
@@ -132,7 +141,7 @@ public class TestExtractor extends TestHdfsBase {
       public void writeRecord(Object obj) {
         throw new AssertionError("Should not be writing object.");
       }
-    }, null);
+    }, schema);
 
     LinkConfiguration emptyLinkConfig = new LinkConfiguration();
     FromJobConfiguration emptyJobConfig = new FromJobConfiguration();
@@ -150,6 +159,11 @@ public class TestExtractor extends TestHdfsBase {
     Configuration conf = new Configuration();
     PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
     final boolean[] visited = new boolean[NUMBER_OF_FILES * NUMBER_OF_ROWS_PER_FILE];
+    Schema schema = new Schema("schema").addColumn(new FixedPoint("col1", 4L, true))
+        .addColumn(new FloatingPoint("col2", 4L))
+        .addColumn(new Text("col3"))
+        .addColumn(new Text("col4"))
+        .addColumn(new Text("col5"));
     ExtractorContext context = new ExtractorContext(prefixContext, new DataWriter() {
       @Override
       public void writeArrayRecord(Object[] array) {
@@ -165,9 +179,9 @@ public class TestExtractor extends TestHdfsBase {
         }
 
         Assert.assertFalse(visited[index - 1]);
-        Assert.assertEquals(String.valueOf((double) index), array[1]);
-        Assert.assertEquals("NULL", array[2]);
-        Assert.assertEquals("'" + index + "'", array[3]);
+        Assert.assertEquals(String.valueOf((double) index), array[1].toString());
+        Assert.assertEquals(null, array[2]);
+        Assert.assertEquals(String.valueOf(index), array[3]);
         Assert.assertNull(array[4]);
 
         visited[index - 1] = true;
@@ -182,7 +196,7 @@ public class TestExtractor extends TestHdfsBase {
       public void writeRecord(Object obj) {
         throw new AssertionError("Should not be writing object.");
       }
-    }, null);
+    }, schema);
 
     LinkConfiguration emptyLinkConfig = new LinkConfiguration();
     FromJobConfiguration fromJobConfiguration = new FromJobConfiguration();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/418b9a70/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsUtils.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsUtils.java
index 310eadc..c560c08 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsUtils.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsUtils.java
@@ -75,15 +75,19 @@ public class TestHdfsUtils {
     LinkConfiguration linkConfiguration = new LinkConfiguration();
     FromJobConfiguration fromJobConfiguration = new FromJobConfiguration();
     ToJobConfiguration toJobConfiguration = new ToJobConfiguration();
-    final String record = "'Abe',\0,'test'";
-    final Object[] arrayRecord = new Object[]{
+    final Object[] fromRecord = new Object[]{
+        "'Abe'",
+        "\0",
+        "'test'"
+    };
+    final Object[] toRecord = new Object[]{
       "'Abe'",
       "\0",
       "'test'"
     };
 
     // No transformations
-    assertArrayEquals(arrayRecord, HdfsUtils.formatRecord(linkConfiguration, fromJobConfiguration, record));
-    assertEquals(record, HdfsUtils.formatRecord(linkConfiguration, toJobConfiguration, arrayRecord));
+    assertArrayEquals(toRecord, HdfsUtils.formatRecord(linkConfiguration, fromJobConfiguration, fromRecord));
+    assertArrayEquals(fromRecord, HdfsUtils.formatRecord(linkConfiguration, toJobConfiguration, toRecord));
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/418b9a70/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
index b5ec6da..b7c81ec 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
@@ -24,7 +24,6 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -33,7 +32,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.sqoop.common.PrefixContext;
@@ -44,6 +42,10 @@ import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
 import org.apache.sqoop.etl.io.DataReader;
 import org.apache.sqoop.job.etl.Loader;
 import org.apache.sqoop.job.etl.LoaderContext;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.FloatingPoint;
+import org.apache.sqoop.schema.type.Text;
 import org.testng.annotations.AfterMethod;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
@@ -96,6 +98,9 @@ public class TestLoader extends TestHdfsBase {
   @Test
   public void testLoader() throws Exception {
     FileSystem fs = FileSystem.get(new Configuration());
+    Schema schema = new Schema("schema").addColumn(new FixedPoint("col1", 8L, true))
+        .addColumn(new FloatingPoint("col2", 4L))
+        .addColumn(new Text("col3"));
 
     Configuration conf = new Configuration();
     PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
@@ -146,6 +151,10 @@ public class TestLoader extends TestHdfsBase {
   @Test
   public void testOverrideNull() throws Exception {
     FileSystem fs = FileSystem.get(new Configuration());
+    Schema schema = new Schema("schema").addColumn(new FixedPoint("col1", 8L, true))
+        .addColumn(new FloatingPoint("col2", 8L))
+        .addColumn(new Text("col3"))
+        .addColumn(new Text("col4"));
 
     Configuration conf = new Configuration();
     PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
@@ -159,7 +168,7 @@ public class TestLoader extends TestHdfsBase {
               index,
               (double)index,
               null,
-              "'" + index + "'"
+              String.valueOf(index)
           };
         } else {
           return null;
@@ -175,7 +184,7 @@ public class TestLoader extends TestHdfsBase {
       public Object readContent() {
         throw new AssertionError("should not be at readContent");
       }
-    }, null);
+    }, schema);
     LinkConfiguration linkConf = new LinkConfiguration();
     ToJobConfiguration jobConf = new ToJobConfiguration();
     jobConf.toJobConfig.outputDirectory = outputDirectory;
@@ -189,7 +198,7 @@ public class TestLoader extends TestHdfsBase {
     Assert.assertEquals(1, fs.listStatus(outputPath).length);
 
     for (FileStatus status : fs.listStatus(outputPath)) {
-      verifyOutput(fs, status.getPath(), "%d,%f,\\N,%s");
+      verifyOutput(fs, status.getPath(), "%d,%f,'\\\\N',%s");
     }
 
     loader.load(context, linkConf, jobConf);
@@ -238,7 +247,7 @@ public class TestLoader extends TestHdfsBase {
         BufferedReader textReader = new BufferedReader(in);
 
         for (int i = 1; i <= NUMBER_OF_ROWS_PER_FILE; ++i) {
-          Assert.assertEquals(formatRow(format, i), textReader.readLine());
+          Assert.assertEquals(textReader.readLine(), formatRow(format, i));
         }
         break;
 
@@ -262,11 +271,11 @@ public class TestLoader extends TestHdfsBase {
             break;
         }
 
-        Text line = new Text();
+        org.apache.hadoop.io.Text line = new org.apache.hadoop.io.Text();
         int index = 1;
         while (sequenceReader.next(line)) {
-          Assert.assertEquals(formatRow(format, index++), line.toString());
-          line = new Text();
+          Assert.assertEquals(line.toString(), formatRow(format, index++));
+          line = new org.apache.hadoop.io.Text();
         }
         break;
     }
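
The expected-output change in the null-override test above (%d,%f,\N,%s
becoming %d,%f,'\\N',%s) follows directly from the new write path: the
configured null marker is substituted into the record by
HdfsUtils.formatRecord and then runs through SqoopIDFUtils.toCSV like any
other text value, so its backslash is doubled and the field is quoted.
Roughly (a sketch; the exact config wiring mirrors the test above and the
marker value is inferred from the expected output):

    ToJobConfiguration jobConf = new ToJobConfiguration();
    jobConf.toJobConfig.overrideNullValue = true;
    jobConf.toJobConfig.nullValue = "\\N";   // two characters: backslash, N

    // A null column is first replaced by the marker \N, then CSV-encoded:
    // the backslash doubles and the text is quoted, so the output file
    // contains '\\N' for that field rather than a bare \N.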

http://git-wip-us.apache.org/repos/asf/sqoop/blob/418b9a70/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
index 71641ed..c460f80 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
@@ -22,6 +22,7 @@ import org.apache.sqoop.classification.InterfaceStability;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.error.code.CSVIntermediateDataFormatError;
 import org.apache.sqoop.error.code.IntermediateDataFormatError;
+import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.schema.type.AbstractComplexListType;
 import org.apache.sqoop.schema.type.Column;
 import org.apache.sqoop.schema.type.ColumnType;
@@ -47,6 +48,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.regex.Matcher;
 
 /**
@@ -66,17 +68,29 @@ public class SqoopIDFUtils {
   // implementation.
   public static final String BYTE_FIELD_CHARSET = "ISO-8859-1";
 
-  public static final char[] originals = { 0x5C, 0x00, 0x0A, 0x0D, 0x1A, 0x22, 0x27 };
+  public static final Map<Character, String> ORIGINALS = new TreeMap<Character, String>();
 
   public static final char CSV_SEPARATOR_CHARACTER = ',';
   public static final char ESCAPE_CHARACTER = '\\';
   public static final char QUOTE_CHARACTER = '\'';
 
-  // string related replacements
-  private static final String[] replacements = { new String(new char[] { ESCAPE_CHARACTER, '\\' }),
-      new String(new char[] { ESCAPE_CHARACTER, '0' }), new String(new char[] { ESCAPE_CHARACTER, 'n' }),
-      new String(new char[] { ESCAPE_CHARACTER, 'r' }), new String(new char[] { ESCAPE_CHARACTER, 'Z' }),
-      new String(new char[] { ESCAPE_CHARACTER, '\"' }), new String(new char[] { ESCAPE_CHARACTER, '\'' }) };
+  private static final Map<Character, Character> REPLACEMENTS = new TreeMap<Character, Character>();
+
+  static {
+    ORIGINALS.put(new Character((char)0x00), new String(new char[] { ESCAPE_CHARACTER, '0' }));
+    ORIGINALS.put(new Character((char)0x0A), new String(new char[] { ESCAPE_CHARACTER, 'n' }));
+    ORIGINALS.put(new Character((char)0x0D), new String(new char[] { ESCAPE_CHARACTER, 'r' }));
+    ORIGINALS.put(new Character((char)0x1A), new String(new char[] { ESCAPE_CHARACTER, 'Z' }));
+    ORIGINALS.put(new Character((char)0x22), new String(new char[] { ESCAPE_CHARACTER, '"' }));
+    ORIGINALS.put(new Character((char)0x27), new String(new char[] { ESCAPE_CHARACTER, '\'' }));
+
+    REPLACEMENTS.put('0', new Character((char)0x00));
+    REPLACEMENTS.put('n', new Character((char)0x0A));
+    REPLACEMENTS.put('r', new Character((char)0x0D));
+    REPLACEMENTS.put('Z', new Character((char)0x1A));
+    REPLACEMENTS.put('"', new Character((char)0x22));
+    REPLACEMENTS.put('\'', new Character((char)0x27));
+  }
 
   // http://www.joda.org/joda-time/key_format.html provides details on the
   // formatter token
@@ -420,42 +434,66 @@ public class SqoopIDFUtils {
 
   // ************ TEXT Column Type utils*********
 
-  private static String getRegExp(char character) {
-    return getRegExp(String.valueOf(character));
-  }
+  public static String toCSVString(String string) {
+    StringBuilder sb1 = new StringBuilder();
+    StringBuilder sb2 = new StringBuilder();
+
+    // Escape the escape character
+    for (int i = 0; i < string.length(); ++i) {
+      char c = string.charAt(i);
+      if (c == ESCAPE_CHARACTER) {
+        sb1.append(ESCAPE_CHARACTER);
+      }
 
-  private static String getRegExp(String string) {
-    return string.replaceAll("\\\\", Matcher.quoteReplacement("\\\\"));
-  }
+      sb1.append(c);
+    }
 
-  public static String toCSVString(String string) {
-    int j = 0;
-    String replacement = string;
-    try {
-      for (j = 0; j < replacements.length; j++) {
-        replacement = replacement.replaceAll(getRegExp(originals[j]), Matcher.quoteReplacement(replacements[j]));
+    // Encode characters
+    for (char c : sb1.toString().toCharArray()) {
+      if (ORIGINALS.containsKey(c)) {
+        sb2.append(ORIGINALS.get(c));
+      } else {
+        sb2.append(c);
       }
-    } catch (Exception e) {
-      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0002, string + "  " + replacement
-          + "  " + String.valueOf(j) + "  " + e.getMessage());
     }
-    return encloseWithQuotes(replacement);
+
+    return encloseWithQuotes(sb2.toString());
   }
 
   public static String toText(String string) {
+    boolean escaped = false;
+    StringBuilder sb = new StringBuilder();
+    int i;
+
     // Remove the trailing and starting quotes.
     string = removeQuotes(string);
-    int j = 0;
-    try {
-      for (j = 0; j < replacements.length; j++) {
-        string = string.replaceAll(getRegExp(replacements[j]), Matcher.quoteReplacement(String.valueOf(originals[j])));
+
+    // Decode
+    for (i = 0; i < string.length(); ++i) {
+      char c = string.charAt(i);
+
+      if (escaped) {
+        escaped = false;
+
+        if (REPLACEMENTS.containsKey(c)) {
+          c = REPLACEMENTS.get(c);
+        }
+
+        sb.append(c);
+      } else {
+        switch(c) {
+          case ESCAPE_CHARACTER:
+            escaped = true;
+            break;
+
+          default:
+            sb.append(c);
+            break;
+        }
       }
-    } catch (Exception e) {
-      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0003, string + "  "
-          + String.valueOf(j) + e.getMessage());
     }
 
-    return string;
+    return sb.toString();
   }
 
   // ************ BINARY Column type utils*********
@@ -510,6 +548,85 @@ public class SqoopIDFUtils {
   }
 
   // ******* parse sqoop CSV ********
+
+  /**
+   * Encode to the sqoop prescribed CSV String for every element in the object
+   * array
+   *
+   * @param objectArray
+   */
+  @SuppressWarnings("unchecked")
+  public static String toCSV(Object[] objectArray, Schema schema) {
+    Column[] columns = schema.getColumnsArray();
+
+    StringBuilder csvString = new StringBuilder();
+    for (int i = 0; i < columns.length; i++) {
+      if (objectArray[i] == null && !columns[i].isNullable()) {
+        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
+            columns[i].getName() + " does not support null values");
+      }
+      if (objectArray[i] == null) {
+        csvString.append(NULL_VALUE);
+      } else {
+        switch (columns[i].getType()) {
+          case ARRAY:
+          case SET:
+            csvString.append(toCSVList((Object[]) objectArray[i], (AbstractComplexListType) columns[i]));
+            break;
+          case MAP:
+            csvString.append(toCSVMap((Map<Object, Object>) objectArray[i], columns[i]));
+            break;
+          case ENUM:
+          case TEXT:
+            csvString.append(toCSVString(objectArray[i].toString()));
+            break;
+          case BINARY:
+          case UNKNOWN:
+            csvString.append(toCSVByteArray((byte[]) objectArray[i]));
+            break;
+          case FIXED_POINT:
+            csvString.append(toCSVFixedPoint(objectArray[i], columns[i]));
+            break;
+          case FLOATING_POINT:
+            csvString.append(toCSVFloatingPoint(objectArray[i], columns[i]));
+            break;
+          case DECIMAL:
+            csvString.append(toCSVDecimal(objectArray[i]));
+            break;
+          // stored in JSON as strings in the joda time format
+          case DATE:
+            csvString.append(toCSVDate(objectArray[i]));
+            break;
+          case TIME:
+            csvString.append(toCSVTime(objectArray[i], columns[i]));
+            break;
+          case DATE_TIME:
+            if (objectArray[i] instanceof org.joda.time.DateTime) {
+              org.joda.time.DateTime dateTime = (org.joda.time.DateTime) objectArray[i];
+              // check for fraction and time zone and then use the right formatter
+              csvString.append(toCSVDateTime(dateTime, columns[i]));
+            } else if (objectArray[i] instanceof org.joda.time.LocalDateTime) {
+              org.joda.time.LocalDateTime localDateTime = (org.joda.time.LocalDateTime) objectArray[i];
+              csvString.append(toCSVLocalDateTime(localDateTime, columns[i]));
+            }
+            break;
+          case BIT:
+            csvString.append(toCSVBit(objectArray[i]));
+            break;
+          default:
+            throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
+                "Column type from schema was not recognized for " + columns[i].getType());
+        }
+      }
+      if (i < columns.length - 1) {
+        csvString.append(CSV_SEPARATOR_CHARACTER);
+      }
+
+    }
+
+    return csvString.toString();
+  }
+
   /**
    * Custom CSV Text parser that honors quoting and escaped quotes.
    *
@@ -561,4 +678,87 @@ public class SqoopIDFUtils {
     return parsedData.toArray(new String[parsedData.size()]);
   }
 
+  private static Object toObject(String csvString, Column column) {
+    Object returnValue = null;
+
+    switch (column.getType()) {
+      case ENUM:
+      case TEXT:
+        returnValue = toText(csvString);
+        break;
+      case BINARY:
+        // Unknown is treated as a binary type
+      case UNKNOWN:
+        returnValue = toByteArray(csvString);
+        break;
+      case FIXED_POINT:
+        returnValue = toFixedPoint(csvString, column);
+        break;
+      case FLOATING_POINT:
+        returnValue = toFloatingPoint(csvString, column);
+        break;
+      case DECIMAL:
+        returnValue = toDecimal(csvString, column);
+        break;
+      case DATE:
+        returnValue = toDate(csvString, column);
+        break;
+      case TIME:
+        returnValue = toTime(csvString, column);
+        break;
+      case DATE_TIME:
+        returnValue = toDateTime(csvString, column);
+        break;
+      case BIT:
+        returnValue = toBit(csvString);
+        break;
+      case ARRAY:
+      case SET:
+        returnValue = toList(csvString);
+        break;
+      case MAP:
+        returnValue = toMap(csvString);
+        break;
+      default:
+        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004,
+            "Column type from schema was not recognized for " + column.getType());
+    }
+    return returnValue;
+  }
+
+  /**
+   * Parse CSV text data
+   * @param csvText csv text to parse
+   * @param schema schema to understand data
+   * @return Object[]
+   */
+  public static Object[] fromCSV(String csvText, Schema schema) {
+    String[] csvArray = parseCSVString(csvText);
+
+    if (csvArray == null) {
+      return null;
+    }
+
+    Column[] columns = schema.getColumnsArray();
+
+    if (csvArray.length != columns.length) {
+      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
+          "The data " + csvArray + " has the wrong number of fields.");
+    }
+
+    Object[] objectArray = new Object[csvArray.length];
+    for (int i = 0; i < csvArray.length; i++) {
+      if (csvArray[i].equals(NULL_VALUE) && !columns[i].isNullable()) {
+        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
+            columns[i].getName() + " does not support null values");
+      }
+      if (csvArray[i].equals(NULL_VALUE)) {
+        objectArray[i] = null;
+        continue;
+      }
+      objectArray[i] = toObject(csvArray[i], columns[i]);
+    }
+
+    return objectArray;
+  }
 }
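
The escaping rewrite above is easiest to see on concrete values:
toCSVString first doubles every backslash already present in the input,
then encodes the special characters (NUL, newline, carriage return,
ctrl-Z, double quote, single quote) through the ORIGINALS map, and finally
wraps the result in single quotes; toText walks the string once and
reverses both steps, instead of relying on the old chain of regex
replacements. Two round trips taken straight from the new testEscaping
data (Java string literals; asserts are for illustration only):

    // A one-character newline encodes as backslash-n inside quotes.
    assert SqoopIDFUtils.toCSVString("\n").equals("'\\n'");
    assert SqoopIDFUtils.toText("'\\n'").equals("\n");

    // A literal backslash followed by 'n': the backslash doubles, 'n' stays.
    assert SqoopIDFUtils.toCSVString("\\n").equals("'\\\\n'");
    assert SqoopIDFUtils.toText("'\\\\n'").equals("\\n");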

http://git-wip-us.apache.org/repos/asf/sqoop/blob/418b9a70/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
index c233fb2..76eaa67 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
@@ -18,21 +18,15 @@
  */
 package org.apache.sqoop.connector.idf;
 
-import static org.apache.sqoop.connector.common.SqoopIDFUtils.*;
-
 import org.apache.sqoop.classification.InterfaceAudience;
 import org.apache.sqoop.classification.InterfaceStability;
 import org.apache.log4j.Logger;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.error.code.IntermediateDataFormatError;
+import org.apache.sqoop.connector.common.SqoopIDFUtils;
 import org.apache.sqoop.schema.Schema;
-import org.apache.sqoop.schema.type.AbstractComplexListType;
-import org.apache.sqoop.schema.type.Column;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.Map;
 import java.util.Set;
 
 /**
@@ -80,79 +74,7 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
   @Override
   public Object[] getObjectData() {
     super.validateSchema(schema);
-    String[] csvStringArray = parseCSVString(this.data);
-
-    if (csvStringArray == null) {
-      return null;
-    }
-    Column[] columns = schema.getColumnsArray();
-
-    if (csvStringArray.length != columns.length) {
-      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
-          "The data " + getCSVTextData() + " has the wrong number of fields.");
-    }
-
-    Object[] objectArray = new Object[csvStringArray.length];
-    for (int i = 0; i < csvStringArray.length; i++) {
-      if (csvStringArray[i].equals(NULL_VALUE) && !columns[i].isNullable()) {
-        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
-            columns[i].getName() + " does not support null values");
-      }
-      if (csvStringArray[i].equals(NULL_VALUE)) {
-        objectArray[i] = null;
-        continue;
-      }
-      objectArray[i] = toObject(csvStringArray[i], columns[i]);
-    }
-    return objectArray;
-  }
-
-  private Object toObject(String csvString, Column column) {
-    Object returnValue = null;
-
-    switch (column.getType()) {
-    case ENUM:
-    case TEXT:
-      returnValue = toText(csvString);
-      break;
-    case BINARY:
-      // Unknown is treated as a binary type
-    case UNKNOWN:
-      returnValue = toByteArray(csvString);
-      break;
-    case FIXED_POINT:
-      returnValue = toFixedPoint(csvString, column);
-      break;
-    case FLOATING_POINT:
-      returnValue = toFloatingPoint(csvString, column);
-      break;
-    case DECIMAL:
-      returnValue = toDecimal(csvString, column);
-      break;
-    case DATE:
-      returnValue = toDate(csvString, column);
-      break;
-    case TIME:
-      returnValue = toTime(csvString, column);
-      break;
-    case DATE_TIME:
-      returnValue = toDateTime(csvString, column);
-      break;
-    case BIT:
-      returnValue = toBit(csvString);
-      break;
-    case ARRAY:
-    case SET:
-      returnValue = toList(csvString);
-      break;
-    case MAP:
-      returnValue = toMap(csvString);
-      break;
-    default:
-      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004,
-          "Column type from schema was not recognized for " + column.getType());
-    }
-    return returnValue;
+    return SqoopIDFUtils.fromCSV(data, schema);
   }
 
   /**
@@ -191,75 +113,7 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
    */
   @SuppressWarnings("unchecked")
   private String toCSV(Object[] objectArray) {
-
-    Column[] columns = schema.getColumnsArray();
-
-    StringBuilder csvString = new StringBuilder();
-    for (int i = 0; i < columns.length; i++) {
-      if (objectArray[i] == null && !columns[i].isNullable()) {
-        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
-            columns[i].getName() + " does not support null values");
-      }
-      if (objectArray[i] == null) {
-        csvString.append(NULL_VALUE);
-      } else {
-        switch (columns[i].getType()) {
-        case ARRAY:
-        case SET:
-          csvString.append(toCSVList((Object[]) objectArray[i], (AbstractComplexListType) columns[i]));
-          break;
-        case MAP:
-          csvString.append(toCSVMap((Map<Object, Object>) objectArray[i], columns[i]));
-          break;
-        case ENUM:
-        case TEXT:
-          csvString.append(toCSVString(objectArray[i].toString()));
-          break;
-        case BINARY:
-        case UNKNOWN:
-          csvString.append(toCSVByteArray((byte[]) objectArray[i]));
-          break;
-        case FIXED_POINT:
-          csvString.append(toCSVFixedPoint(objectArray[i], columns[i]));
-          break;
-        case FLOATING_POINT:
-          csvString.append(toCSVFloatingPoint(objectArray[i], columns[i]));
-          break;
-        case DECIMAL:
-          csvString.append(toCSVDecimal(objectArray[i]));
-          break;
-        // stored in JSON as strings in the joda time format
-        case DATE:
-          csvString.append(toCSVDate(objectArray[i]));
-          break;
-        case TIME:
-          csvString.append(toCSVTime(objectArray[i], columns[i]));
-          break;
-        case DATE_TIME:
-          if (objectArray[i] instanceof org.joda.time.DateTime) {
-            org.joda.time.DateTime dateTime = (org.joda.time.DateTime) objectArray[i];
-            // check for fraction and time zone and then use the right formatter
-            csvString.append(toCSVDateTime(dateTime, columns[i]));
-          } else if (objectArray[i] instanceof org.joda.time.LocalDateTime) {
-            org.joda.time.LocalDateTime localDateTime = (org.joda.time.LocalDateTime) objectArray[i];
-            csvString.append(toCSVLocalDateTime(localDateTime, columns[i]));
-          }
-          break;
-        case BIT:
-          csvString.append(toCSVBit(objectArray[i]));
-          break;
-        default:
-          throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
-              "Column type from schema was not recognized for " + columns[i].getType());
-        }
-      }
-      if (i < columns.length - 1) {
-        csvString.append(CSV_SEPARATOR_CHARACTER);
-      }
-
-    }
-
-    return csvString.toString();
+    return SqoopIDFUtils.toCSV(objectArray, schema);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/sqoop/blob/418b9a70/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/common/TestSqoopIDFUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/common/TestSqoopIDFUtils.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/common/TestSqoopIDFUtils.java
index 588ce29..f99d1af 100644
--- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/common/TestSqoopIDFUtils.java
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/common/TestSqoopIDFUtils.java
@@ -320,4 +320,41 @@ public class TestSqoopIDFUtils {
     assertEquals("23.44444444", toCSVDecimal(bd));
   }
 
+  @Test
+  public void testEscaping() {
+    String[][] testData = new String[][]{
+        {"\0", "'\\0'"},
+        {"\\0", "'\\\\0'"},
+        {"\\\0", "'\\\\\\0'"},
+        {"\\\\0", "'\\\\\\\\0'"},
+        {"\n", "'\\n'"},
+        {"\\n", "'\\\\n'"},
+        {"\\\n", "'\\\\\\n'"},
+        {"\\\\n", "'\\\\\\\\n'"},
+        {"\r", "'\\r'"},
+        {"\\r", "'\\\\r'"},
+        {"\\\r", "'\\\\\\r'"},
+        {"\\\\r", "'\\\\\\\\r'"},
+        {Character.toString((char)0x1A), "'\\Z'"},
+        {"\\Z", "'\\\\Z'"},
+        {"\\" + Character.toString((char)0x1A), "'\\\\\\Z'"},
+        {"\\\\Z", "'\\\\\\\\Z'"},
+        {"\"", "'\\\"'"},
+        {"\\\"", "'\\\\\\\"'"},
+        {"\\\\\"", "'\\\\\\\\\\\"'"},
+        {"\\\\\\\"", "'\\\\\\\\\\\\\\\"'"},
+        {"'", "'\\''"},
+        {"\\'", "'\\\\\\''"},
+        {"\\\\'", "'\\\\\\\\\\''"},
+        {"\\\\\\'", "'\\\\\\\\\\\\\\''"}
+    };
+
+    for (String[] testDatum : testData) {
+      String csvData = SqoopIDFUtils.toCSVString(testDatum[0]);
+
+      assertEquals(csvData, testDatum[1]);
+
+      assertEquals(SqoopIDFUtils.toText(csvData), testDatum[0]);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/418b9a70/test/src/main/java/org/apache/sqoop/test/data/ShortStories.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/sqoop/test/data/ShortStories.java b/test/src/main/java/org/apache/sqoop/test/data/ShortStories.java
new file mode 100644
index 0000000..17148bc
--- /dev/null
+++ b/test/src/main/java/org/apache/sqoop/test/data/ShortStories.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.test.data;
+
+import org.apache.sqoop.common.test.db.DatabaseProvider;
+
+/**
+ * Short stories from classic literature.
+ *
+ * Purpose of this set is to exercise long, multi-line text values (varchar columns holding entire paragraphs).
+ */
+public class ShortStories extends DataSet {
+
+  public ShortStories(DatabaseProvider provider, String tableBaseName) {
+    super(provider, tableBaseName);
+  }
+
+  @Override
+  public DataSet createTables() {
+    provider.createTable(
+        tableBaseName,
+        "id",
+        "id", "int",
+        "name", "varchar(64)",
+        "story", "varchar(10000)"
+    );
+
+    return this;
+  }
+
+  @Override
+  public DataSet loadBasicData() {
+    provider.insertRow(tableBaseName,  1, "The Gift of the Magi",
+        "ONE DOLLAR AND EIGHTY-SEVEN CENTS. THAT WAS ALL. AND SIXTY CENTS of it was in pennies. Pennies saved one and two at a time by bulldozing the grocer and the vegetable man and the butcher until ones cheeks burned with the silent imputation of parsimony that such close dealing implied. Three times Della counted it. One dollar and eighty-seven cents. And the next day would be Christmas.\n" +
+        "\n" +
+        "There was clearly nothing left to do but flop down on the shabby little couch and howl. So Della did it. Which instigates the moral reflection that life is made up of sobs, sniffles, and smiles, with sniffles predominating.");
+    provider.insertRow(tableBaseName,  2, "The Little Match Girl",
+        "Most terribly cold it was; it snowed, and was nearly quite dark, and evening-- the last evening of the year. In this cold and darkness there went along the street a poor little girl, bareheaded, and with naked feet. When she left home she had slippers on, it is true; but what was the good of that? They were very large slippers, which her mother had hitherto worn; so large were they; and the poor little thing lost them as she scuffled away across the street, because of two carriages that rolled by dreadfully fast.");
+    provider.insertRow(tableBaseName,  3, "To Build a Fire",
+        "Day had broken cold and grey, exceedingly cold and grey, when the man turned aside from the main Yukon trail and climbed the high earth- bank, where a dim and little-travelled trail led eastward through the fat spruce timberland. It was a steep bank, and he paused for breath at the top, excusing the act to himself by looking at his watch. It was nine oclock. There was no sun nor hint of sun, though there was not a cloud in the sky. It was a clear day, and yet there seemed an intangible pall over the face of things, a subtle gloom that made the day dark, and that was due to the absence of sun. This fact did not worry the man. He was used to the lack of sun. It had been days since he had seen the sun, and he knew that a few more days must pass before that cheerful orb, due south, would just peep above the sky- line and dip immediately from view.");
+
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/418b9a70/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java b/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java
index 398a051..1c5eb10 100644
--- a/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java
+++ b/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java
@@ -37,6 +37,7 @@ import org.apache.sqoop.model.MPersistableEntity;
 import org.apache.sqoop.model.MSubmission;
 import org.apache.sqoop.submission.SubmissionStatus;
 import org.apache.sqoop.test.data.Cities;
+import org.apache.sqoop.test.data.ShortStories;
 import org.apache.sqoop.test.data.UbuntuReleases;
 import org.apache.sqoop.test.hadoop.HadoopMiniClusterRunner;
 import org.apache.sqoop.test.hadoop.HadoopRunnerFactory;
@@ -198,6 +199,20 @@ abstract public class ConnectorTestCase extends TomcatTestCase {
   }
 
   /**
+   * Create table for short stories.
+   */
+  protected void createTableShortStories() {
+    new ShortStories(provider, getTableName()).createTables();
+  }
+
+  /**
+   * Create and load table for short stories.
+   */
+  protected void createAndLoadTableShortStories() {
+    new ShortStories(provider, getTableName()).createTables().loadBasicData();
+  }
+
+  /**
    * Assert row in testing table.
    *
    * @param conditions Conditions in config that are expected by the database provider

http://git-wip-us.apache.org/repos/asf/sqoop/blob/418b9a70/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromHDFSToRDBMSTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromHDFSToRDBMSTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromHDFSToRDBMSTest.java
index 397ce6f..a21e4a1 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromHDFSToRDBMSTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromHDFSToRDBMSTest.java
@@ -23,6 +23,9 @@ import org.apache.sqoop.model.MDriverConfig;
 import org.apache.sqoop.model.MLink;
 import org.apache.sqoop.model.MConfigList;
 import org.apache.sqoop.model.MJob;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import static org.testng.AssertJUnit.assertEquals;
@@ -31,15 +34,23 @@ import static org.testng.AssertJUnit.assertEquals;
  *
  */
 public class FromHDFSToRDBMSTest extends ConnectorTestCase {
+  @BeforeMethod(alwaysRun = true)
+  public void createTable() {
+    createTableCities();
+  }
+
+  @AfterMethod(alwaysRun = true)
+  public void dropTable() {
+    super.dropTable();
+  }
 
   @Test
   public void testBasic() throws Exception {
-    createTableCities();
     createFromFile("input-0001",
-      "1,'USA','2004-10-23','San Francisco'",
-      "2,'USA','2004-10-24','Sunnyvale'",
-      "3,'Czech Republic','2004-10-25','Brno'",
-      "4,'USA','2004-10-26','Palo Alto'"
+        "1,'USA','2004-10-23','San Francisco'",
+        "2,'USA','2004-10-24','Sunnyvale'",
+        "3,'Czech Republic','2004-10-25','Brno'",
+        "4,'USA','2004-10-26','Palo Alto'"
     );
 
     // RDBMS link
@@ -68,14 +79,10 @@ public class FromHDFSToRDBMSTest extends ConnectorTestCase {
 
     executeJob(job);
 
-    assertEquals(4L, provider.rowCount(null, getTableName()));
+    assertEquals(4L, provider.rowCount(getTableName()));
     assertRowInCities(1, "USA", "2004-10-23", "San Francisco");
     assertRowInCities(2, "USA", "2004-10-24", "Sunnyvale");
     assertRowInCities(3, "Czech Republic", "2004-10-25", "Brno");
     assertRowInCities(4, "USA", "2004-10-26", "Palo Alto");
-
-    // Clean up testing table
-    dropTable();
   }
-
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/418b9a70/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java
index ced52cc..5552e04 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java
@@ -35,7 +35,7 @@ import org.testng.annotations.Test;
 public class FromRDBMSToHDFSTest extends ConnectorTestCase {
 
   @Test
-  public void testBasic() throws Exception {
+  public void testCities() throws Exception {
     createAndLoadTableCities();
 
     // RDBMS link
@@ -78,6 +78,51 @@ public class FromRDBMSToHDFSTest extends ConnectorTestCase {
   }
 
   @Test
+  public void testStories() throws Exception {
+    createAndLoadTableShortStories();
+
+    // RDBMS link
+    MLink rdbmsLink = getClient().createLink("generic-jdbc-connector");
+    fillRdbmsLinkConfig(rdbmsLink);
+    saveLink(rdbmsLink);
+
+    // HDFS link
+    MLink hdfsLink = getClient().createLink("hdfs-connector");
+    saveLink(hdfsLink);
+
+    // Job creation
+    MJob job = getClient().createJob(rdbmsLink.getPersistenceId(), hdfsLink.getPersistenceId());
+
+    // Connector values
+    MConfigList configs = job.getJobConfig(Direction.FROM);
+    configs.getStringInput("fromJobConfig.tableName").setValue(provider.escapeTableName(getTableName()));
+    configs.getStringInput("fromJobConfig.partitionColumn").setValue(provider.escapeColumnName("id"));
+    configs.getStringInput("fromJobConfig.columns").setValue(provider.escapeColumnName("id") + "," + provider.escapeColumnName("name") + "," + provider.escapeColumnName("story"));
+    fillHdfsToConfig(job, ToFormat.TEXT_FILE);
+    saveJob(job);
+
+    MSubmission submission = getClient().startJob(job.getPersistenceId());
+    assertTrue(submission.getStatus().isRunning());
+
+    // Wait until the job finish - this active waiting will be removed once
+    // Sqoop client API will get blocking support.
+    do {
+      Thread.sleep(5000);
+      submission = getClient().getJobStatus(job.getPersistenceId());
+    } while(submission.getStatus().isRunning());
+
+    // Assert correct output
+    assertTo(
+        "1,'The Gift of the Magi','ONE DOLLAR AND EIGHTY-SEVEN CENTS. THAT WAS ALL. AND SIXTY CENTS of it was in pennies. Pennies saved one and two at a time by bulldozing the grocer and the vegetable man and the butcher until ones cheeks burned with the silent imputation of parsimony that such close dealing implied. Three times Della counted it. One dollar and eighty-seven cents. And the next day would be Christmas.\\n\\nThere was clearly nothing left to do but flop down on the shabby little couch and howl. So Della did it. Which instigates the moral reflection that life is made up of sobs, sniffles, and smiles, with sniffles predominating.'",
+        "2,'The Little Match Girl','Most terribly cold it was; it snowed, and was nearly quite dark, and evening-- the last evening of the year. In this cold and darkness there went along the street a poor little girl, bareheaded, and with naked feet. When she left home she had slippers on, it is true; but what was the good of that? They were very large slippers, which her mother had hitherto worn; so large were they; and the poor little thing lost them as she scuffled away across the street, because of two carriages that rolled by dreadfully fast.'",
+        "3,'To Build a Fire','Day had broken cold and grey, exceedingly cold and grey, when the man turned aside from the main Yukon trail and climbed the high earth- bank, where a dim and little-travelled trail led eastward through the fat spruce timberland. It was a steep bank, and he paused for breath at the top, excusing the act to himself by looking at his watch. It was nine oclock. There was no sun nor hint of sun, though there was not a cloud in the sky. It was a clear day, and yet there seemed an intangible pall over the face of things, a subtle gloom that made the day dark, and that was due to the absence of sun. This fact did not worry the man. He was used to the lack of sun. It had been days since he had seen the sun, and he knew that a few more days must pass before that cheerful orb, due south, would just peep above the sky- line and dip immediately from view.'"
+    );
+
+    // Clean up testing table
+    dropTable();
+  }
+
+  @Test
   public void testColumns() throws Exception {
     createAndLoadTableCities();
 
