Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 546f86152 -> 1fc658922


SQOOP-1825: Sqoop2: Handle NULLs for all types in CSV Intermediate Data Format

(Veena Basavaraj via Abraham Elmahrek)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/1fc65892
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/1fc65892
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/1fc65892

Branch: refs/heads/sqoop2
Commit: 1fc658922129f6a02aeeb4cfe6399f2f39becfb6
Parents: 546f861
Author: Abraham Elmahrek <[email protected]>
Authored: Fri Dec 12 11:51:00 2014 -0600
Committer: Abraham Elmahrek <[email protected]>
Committed: Fri Dec 12 11:52:26 2014 -0600

----------------------------------------------------------------------
 .../idf/CSVIntermediateDataFormat.java          |  87 ++++++++++------
 .../idf/TestCSVIntermediateDataFormat.java      | 104 +++++++++++++++++--
 2 files changed, 151 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/1fc65892/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
index fe4cdd7..e4a83b1 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
@@ -67,7 +67,7 @@ public class CSVIntermediateDataFormat extends 
IntermediateDataFormat<String> {
   public static final char ESCAPE_CHARACTER = '\\';
   public static final char QUOTE_CHARACTER = '\'';
 
-  public static final String NULL_STRING = "NULL";
+  public static final String NULL_VALUE = "NULL";
 
   private static final char[] originals = {
     0x5C,0x00,0x0A,0x0D,0x1A,0x22,0x27
@@ -249,7 +249,7 @@ public class CSVIntermediateDataFormat extends 
IntermediateDataFormat<String> {
     Column[] columnArray = schema.getColumns().toArray(new 
Column[fieldStringArray.length]);
     for (int i = 0; i < fieldStringArray.length; i++) {
       // check for NULL field and bail out immediately
-      if (fieldStringArray[i].equals("NULL")) {
+      if (fieldStringArray[i].equals(NULL_VALUE)) {
         objectArray[i] = null;
         continue;
       }
@@ -410,8 +410,17 @@ public class CSVIntermediateDataFormat extends 
IntermediateDataFormat<String> {
    */
   @Override
   public void setObjectData(Object[] data) {
+   Set<Integer> nullValueIndices = new HashSet<Integer>();
     Column[] columnArray = schema.getColumns().toArray(new 
Column[data.length]);
-    encodeCSVStringElements(data, columnArray);
+    // check for null
+    for (int i = 0; i < data.length; i++) {
+      if (data[i] == null) {
+        nullValueIndices.add(i);
+        data[i] = NULL_VALUE;
+      }
+    }
+    // ignore the null values while encoding the object array into csv string
+    encodeCSVStringElements(data, columnArray, nullValueIndices);
     this.data = StringUtils.join(data, SEPARATOR_CHARACTER);
   }
 
@@ -465,49 +474,66 @@ public class CSVIntermediateDataFormat extends 
IntermediateDataFormat<String> {
    * @param objectArray
    */
   @SuppressWarnings("unchecked")
-  private void encodeCSVStringElements(Object[] objectArray, Column[] 
columnArray) {
+  private void encodeCSVStringElements(Object[] objectArray, Column[] 
columnArray, Set<Integer> nullValueIndices) {
     for (int i : bitTypeColumnIndices) {
-      String bitStringValue = objectArray[i].toString();
-      if ((TRUE_BIT_SET.contains(bitStringValue)) || 
(FALSE_BIT_SET.contains(bitStringValue))) {
-        objectArray[i] = bitStringValue;
-      } else {
-        throw new 
SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0009,
 " given bit value: " + objectArray[i]);
+      if (!nullValueIndices.contains(i)) {
+        String bitStringValue = objectArray[i].toString();
+        if ((TRUE_BIT_SET.contains(bitStringValue)) || 
(FALSE_BIT_SET.contains(bitStringValue))) {
+          objectArray[i] = bitStringValue;
+        } else {
+          throw new 
SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0009,
 " given bit value: "
+              + objectArray[i]);
+        }
       }
     }
     for (int i : stringTypeColumnIndices) {
-      objectArray[i] = escapeString((String) objectArray[i]);
+      if (!nullValueIndices.contains(i)) {
+        objectArray[i] = escapeString((String) objectArray[i]);
+      }
     }
     for (int i : dateTimeTypeColumnIndices) {
-      Column col = columnArray[i];
-      if (objectArray[i] instanceof org.joda.time.DateTime) {
-        org.joda.time.DateTime dateTime = (org.joda.time.DateTime) 
objectArray[i];
-        // check for fraction and time zone and then use the right formatter
-        formatDateTime(objectArray, i, col, dateTime);
-      } else if (objectArray[i] instanceof org.joda.time.LocalDateTime) {
-        org.joda.time.LocalDateTime localDateTime = 
(org.joda.time.LocalDateTime) objectArray[i];
-        formatLocalDateTime(objectArray, i, col, localDateTime);
+      if (!nullValueIndices.contains(i)) {
+        Column col = columnArray[i];
+        if (objectArray[i] instanceof org.joda.time.DateTime) {
+          org.joda.time.DateTime dateTime = (org.joda.time.DateTime) 
objectArray[i];
+          // check for fraction and time zone and then use the right formatter
+          formatDateTime(objectArray, i, col, dateTime);
+        } else if (objectArray[i] instanceof org.joda.time.LocalDateTime) {
+          org.joda.time.LocalDateTime localDateTime = 
(org.joda.time.LocalDateTime) objectArray[i];
+          formatLocalDateTime(objectArray, i, col, localDateTime);
+        }
       }
     }
     for (int i : dateTypeColumnIndices) {
-      org.joda.time.LocalDate date = (org.joda.time.LocalDate) objectArray[i];
-      objectArray[i] = encloseWithQuote(df.print(date));
+      if (!nullValueIndices.contains(i)) {
+        org.joda.time.LocalDate date = (org.joda.time.LocalDate) 
objectArray[i];
+        objectArray[i] = encloseWithQuote(df.print(date));
+      }
     }
     for (int i : timeColumnIndices) {
       Column col = columnArray[i];
-      if (((org.apache.sqoop.schema.type.Time) col).hasFraction()) {
-        objectArray[i] = 
encloseWithQuote(tfWithFraction.print((org.joda.time.LocalTime) 
objectArray[i]));
-      } else {
-        objectArray[i] = 
encloseWithQuote(tfWithNoFraction.print((org.joda.time.LocalTime) 
objectArray[i]));
+      if (!nullValueIndices.contains(i)) {
+        if (((org.apache.sqoop.schema.type.Time) col).hasFraction()) {
+          objectArray[i] = 
encloseWithQuote(tfWithFraction.print((org.joda.time.LocalTime) 
objectArray[i]));
+        } else {
+          objectArray[i] = 
encloseWithQuote(tfWithNoFraction.print((org.joda.time.LocalTime) 
objectArray[i]));
+        }
       }
     }
     for (int i : byteTypeColumnIndices) {
-      objectArray[i] = escapeByteArrays((byte[]) objectArray[i]);
+      if (!nullValueIndices.contains(i)) {
+        objectArray[i] = escapeByteArrays((byte[]) objectArray[i]);
+      }
     }
     for (int i : listTypeColumnIndices) {
-      objectArray[i] = encodeList((Object[]) objectArray[i], columnArray[i]);
+      if (!nullValueIndices.contains(i)) {
+        objectArray[i] = encodeList((Object[]) objectArray[i], columnArray[i]);
+      }
     }
     for (int i : mapTypeColumnIndices) {
-      objectArray[i] = encodeMap((Map<Object, Object>) objectArray[i], 
columnArray[i]);
+      if (!nullValueIndices.contains(i)) {
+        objectArray[i] = encodeMap((Map<Object, Object>) objectArray[i], 
columnArray[i]);
+      }
     }
   }
 
@@ -562,8 +588,7 @@ public class CSVIntermediateDataFormat extends 
IntermediateDataFormat<String> {
   }
 
   private boolean isColumnStringType(Column stringType) {
-    return stringType.getType().equals(ColumnType.TEXT)
-        || stringType.getType().equals(ColumnType.ENUM);
+    return stringType.getType().equals(ColumnType.TEXT) || 
stringType.getType().equals(ColumnType.ENUM);
   }
 
   private String escapeByteArrays(byte[] bytes) {
@@ -586,10 +611,6 @@ public class CSVIntermediateDataFormat extends 
IntermediateDataFormat<String> {
   }
 
   private String escapeString(String orig) {
-    if (orig == null) {
-      return NULL_STRING;
-    }
-
     int j = 0;
     String replacement = orig;
     try {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/1fc65892/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
index 2376d4a..1a2a96f 100644
--- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.schema.type.Array;
@@ -39,6 +40,7 @@ import org.apache.sqoop.schema.type.Binary;
 import org.apache.sqoop.schema.type.Bit;
 import org.apache.sqoop.schema.type.Date;
 import org.apache.sqoop.schema.type.DateTime;
+import org.apache.sqoop.schema.type.Decimal;
 import org.apache.sqoop.schema.type.FixedPoint;
 import org.apache.sqoop.schema.type.Text;
 import org.apache.sqoop.schema.type.Time;
@@ -83,20 +85,108 @@ public class TestCSVIntermediateDataFormat {
     assertNull(out);
   }
 
-  @Test(expected=SqoopException.class)
+  @Test(expected = SqoopException.class)
   public void testEmptyInputAsCSVTextInObjectArrayOut() {
     Schema schema = new Schema("test");
-    schema.addColumn(new FixedPoint("1"))
-        .addColumn(new FixedPoint("2"))
-        .addColumn(new Text("3"))
-        .addColumn(new Text("4"))
-        .addColumn(new Binary("5"))
-        .addColumn(new Text("6"));
+    schema.addColumn(new FixedPoint("1")).addColumn(new 
FixedPoint("2")).addColumn(new Text("3")).addColumn(new Text("4"))
+        .addColumn(new Binary("5")).addColumn(new Text("6"));
     dataFormat.setSchema(schema);
     dataFormat.setCSVTextData("");
     dataFormat.getObjectData();
   }
 
+  @Test
+  public void testNullValueAsObjectArrayInAndCSVTextOut() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new FixedPoint("1")).addColumn(new 
Decimal("2")).addColumn(new Text("3"))
+        .addColumn(new Array("4", new Text("t"))).addColumn(new Binary("5"))
+        .addColumn(new org.apache.sqoop.schema.type.Map("6", new Text("t1"), 
new Text("t2"))).addColumn(new Bit("7"))
+        .addColumn(new org.apache.sqoop.schema.type.DateTime("8", false, 
false))
+        .addColumn(new org.apache.sqoop.schema.type.Time("9", 
false)).addColumn(new org.apache.sqoop.schema.type.Date("10"))
+        .addColumn(new org.apache.sqoop.schema.type.FloatingPoint("11"))
+        .addColumn(new org.apache.sqoop.schema.type.Set("12", new Text("t4")))
+        .addColumn(new org.apache.sqoop.schema.type.Enum("13")).addColumn(new 
org.apache.sqoop.schema.type.Unknown("14"));
+
+    dataFormat.setSchema(schema);
+    Object[] in = { null, null, null, null, null, null, null, null, null, 
null, null, null, null, null };
+    dataFormat.setObjectData(in);
+
+    String csvText = dataFormat.getCSVTextData();
+    String[] textValues = csvText.split(",");
+    for (String text : textValues) {
+      assertEquals(text, CSVIntermediateDataFormat.NULL_VALUE);
+    }
+  }
+
+  @Test
+  public void testNullValueAsObjectArrayInAndObjectArrayOut() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new FixedPoint("1")).addColumn(new 
Decimal("2")).addColumn(new Text("3"))
+        .addColumn(new Array("4", new Text("t"))).addColumn(new Binary("5"))
+        .addColumn(new org.apache.sqoop.schema.type.Map("6", new Text("t1"), 
new Text("t2"))).addColumn(new Bit("7"))
+        .addColumn(new org.apache.sqoop.schema.type.DateTime("8", false, 
false))
+        .addColumn(new org.apache.sqoop.schema.type.Time("9", 
false)).addColumn(new org.apache.sqoop.schema.type.Date("10"))
+        .addColumn(new org.apache.sqoop.schema.type.FloatingPoint("11"))
+        .addColumn(new org.apache.sqoop.schema.type.Set("12", new Text("t4")))
+        .addColumn(new org.apache.sqoop.schema.type.Enum("13")).addColumn(new 
org.apache.sqoop.schema.type.Unknown("14"));
+
+    dataFormat.setSchema(schema);
+    Object[] in = { null, null, null, null, null, null, null, null, null, 
null, null, null, null, null };
+    dataFormat.setObjectData(in);
+
+    Object[] out = dataFormat.getObjectData();
+    for (Object obj : out) {
+      assertEquals(obj, null);
+    }
+  }
+
+  @Test
+  public void testNullValueAsCSVTextInAndObjectArrayOut() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new FixedPoint("1")).addColumn(new 
Decimal("2")).addColumn(new Text("3"))
+        .addColumn(new Array("4", new Text("t"))).addColumn(new Binary("5"))
+        .addColumn(new org.apache.sqoop.schema.type.Map("6", new Text("t1"), 
new Text("t2"))).addColumn(new Bit("7"))
+        .addColumn(new org.apache.sqoop.schema.type.DateTime("8", false, 
false))
+        .addColumn(new org.apache.sqoop.schema.type.Time("9", 
false)).addColumn(new org.apache.sqoop.schema.type.Date("10"))
+        .addColumn(new org.apache.sqoop.schema.type.FloatingPoint("11"))
+        .addColumn(new org.apache.sqoop.schema.type.Set("12", new Text("t4")))
+        .addColumn(new org.apache.sqoop.schema.type.Enum("13")).addColumn(new 
org.apache.sqoop.schema.type.Unknown("14"));
+
+    dataFormat.setSchema(schema);
+    String[] test = { "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", 
"NULL", "NULL", "NULL", "NULL", "NULL", "NULL",
+        "NULL" };
+    dataFormat.setCSVTextData(StringUtils.join(test, ","));
+
+    Object[] out = dataFormat.getObjectData();
+    for (Object obj : out) {
+      assertEquals(obj, null);
+    }
+  }
+
+  @Test
+  public void testNullValueAsCSVTextInAndCSVTextOut() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new FixedPoint("1")).addColumn(new 
Decimal("2")).addColumn(new Text("3"))
+        .addColumn(new Array("4", new Text("t"))).addColumn(new Binary("5"))
+        .addColumn(new org.apache.sqoop.schema.type.Map("6", new Text("t1"), 
new Text("t2"))).addColumn(new Bit("7"))
+        .addColumn(new org.apache.sqoop.schema.type.DateTime("8", false, 
false))
+        .addColumn(new org.apache.sqoop.schema.type.Time("9", 
false)).addColumn(new org.apache.sqoop.schema.type.Date("10"))
+        .addColumn(new org.apache.sqoop.schema.type.FloatingPoint("11"))
+        .addColumn(new org.apache.sqoop.schema.type.Set("12", new Text("t4")))
+        .addColumn(new org.apache.sqoop.schema.type.Enum("13")).addColumn(new 
org.apache.sqoop.schema.type.Unknown("14"));
+
+    dataFormat.setSchema(schema);
+    String[] test = { "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", 
"NULL", "NULL", "NULL", "NULL", "NULL", "NULL",
+        "NULL" };
+    dataFormat.setCSVTextData(StringUtils.join(test, ","));
+
+    String csvText = dataFormat.getCSVTextData();
+    String[] textValues = csvText.split(",");
+    for (String text : textValues) {
+      assertEquals(text, CSVIntermediateDataFormat.NULL_VALUE);
+    }
+  }
+
   //**************test cases for primitive types( text, number, 
bytearray)*******************
 
   @Test

Reply via email to