Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 0cd1ec954 -> 34d7066c3


SQOOP-1749: Support List Type in CSV IDF

(Veena Basavaraj via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/34d7066c
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/34d7066c
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/34d7066c

Branch: refs/heads/sqoop2
Commit: 34d7066c3839a6da169d8534f80e91ba1044bd4e
Parents: 0cd1ec9
Author: Jarek Jarcec Cecho <[email protected]>
Authored: Sat Nov 29 07:35:03 2014 -0800
Committer: Jarek Jarcec Cecho <[email protected]>
Committed: Sat Nov 29 07:35:03 2014 -0800

----------------------------------------------------------------------
 .../idf/CSVIntermediateDataFormat.java          | 357 ++++++++++++-------
 .../connector/idf/IntermediateDataFormat.java   |   4 +-
 .../idf/IntermediateDataFormatError.java        |   4 +-
 .../idf/TestCSVIntermediateDataFormat.java      | 279 ++++++++++++++-
 4 files changed, 487 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/34d7066c/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git 
a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
 
b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
index 39a01c1..4f2baf9 100644
--- 
a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
+++ 
b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
@@ -18,17 +18,19 @@
  */
 package org.apache.sqoop.connector.idf;
 
-import com.google.common.annotations.VisibleForTesting;
-
 import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.AbstractComplexListType;
 import org.apache.sqoop.schema.type.Column;
+import org.apache.sqoop.schema.type.ColumnType;
 import org.apache.sqoop.schema.type.FixedPoint;
 import org.apache.sqoop.schema.type.FloatingPoint;
-import org.apache.sqoop.schema.type.ColumnType;
 import org.joda.time.LocalDate;
 import org.joda.time.LocalDateTime;
+import org.json.simple.JSONArray;
+import org.json.simple.parser.JSONParser;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -36,22 +38,36 @@ import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.math.BigDecimal;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.regex.Matcher;
 
+
+/**
+ * A concrete implementation of the {@link IntermediateDataFormat} that
+ * represents each row of the data source as a comma-separated list. Each
+ * element in the CSV represents a specific column value encoded as a string 
using the Sqoop-specified rules.
+ * The methods allow serializing to this string and deserializing the string 
to its
+ * corresponding Java object based on the {@link Schema} and its
+ * {@link Column} types.
+ *
+ */
 public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
 
+  public static final Logger LOG = 
Logger.getLogger(CSVIntermediateDataFormat.class);
+
   public static final char SEPARATOR_CHARACTER = ',';
   public static final char ESCAPE_CHARACTER = '\\';
   public static final char QUOTE_CHARACTER = '\'';
-  public static final String NULL_STRING = "NULL";
 
+  public static final String NULL_STRING = "NULL";
 
   private static final char[] originals = {
     0x5C,0x00,0x0A,0x0D,0x1A,0x22,0x27
   };
 
+
   private static final String[] replacements = {
     new String(new char[] { ESCAPE_CHARACTER, '\\'}),
     new String(new char[] { ESCAPE_CHARACTER, '0'}),
@@ -62,11 +78,14 @@ public class CSVIntermediateDataFormat extends 
IntermediateDataFormat<String> {
     new String(new char[] { ESCAPE_CHARACTER, '\''})
   };
 
-  // ISO-8859-1 is an 8-bit codec that is supported in every java 
implementation.
-  public static final String BYTE_FIELD_CHARSET = "ISO-8859-1";
+  // ISO-8859-1 is an 8-bit codec that is supported in every java
+  // implementation.
+  static final String BYTE_FIELD_CHARSET = "ISO-8859-1";
+
+  private final List<Integer> stringTypeColumnIndices = new 
ArrayList<Integer>();
+  private final List<Integer> byteTypeColumnIndices = new ArrayList<Integer>();
+  private final List<Integer> listTypeColumnIndices = new ArrayList<Integer>();
 
-  private final List<Integer> stringFieldIndices = new ArrayList<Integer>();
-  private final List<Integer> byteFieldIndices = new ArrayList<Integer>();
   private Schema schema;
 
   public CSVIntermediateDataFormat() {
@@ -97,78 +116,80 @@ public class CSVIntermediateDataFormat extends 
IntermediateDataFormat<String> {
    */
   @Override
   public void setSchema(Schema schema) {
-    if(schema == null) {
+    if (schema == null) {
       return;
     }
     this.schema = schema;
     List<Column> columns = schema.getColumns();
     int i = 0;
-    for(Column col : columns) {
-      if(col.getType() == ColumnType.TEXT) {
-        stringFieldIndices.add(i);
-      } else if(col.getType() == ColumnType.BINARY) {
-        byteFieldIndices.add(i);
+    for (Column col : columns) {
+      if (col.getType() == ColumnType.TEXT) {
+        stringTypeColumnIndices.add(i);
+      } else if (col.getType() == ColumnType.BINARY) {
+        byteTypeColumnIndices.add(i);
+      } else if (isColumnListType(col)) {
+        listTypeColumnIndices.add(i);
       }
       i++;
     }
   }
 
   /**
-   * Custom CSV parser that honors quoting and escaped quotes.
-   * All other escaping is handled elsewhere.
+   * Custom CSV parser that honors quoting and escaped quotes. All other
+   * escaping is handled elsewhere.
    *
    * @return String[]
    */
-  private String[] getFields() {
+  private String[] getFieldStringArray() {
     if (data == null) {
       return null;
     }
 
     boolean quoted = false;
     boolean escaped = false;
+    boolean insideJSON = false;
+
     List<String> parsedData = new LinkedList<String>();
-    StringBuffer buffer = new StringBuffer();
+    StringBuilder builder = new StringBuilder();
     for (int i = 0; i < data.length(); ++i) {
       char c = data.charAt(i);
-      switch(c) {
-        case QUOTE_CHARACTER:
-          buffer.append(c);
-          if (escaped) {
-            escaped = false;
-          } else {
-            quoted = !quoted;
-          }
-          break;
-
-        case ESCAPE_CHARACTER:
-          buffer.append(ESCAPE_CHARACTER);
-          escaped = !escaped;
-          break;
-
-        case SEPARATOR_CHARACTER:
-          if (quoted) {
-            buffer.append(c);
-          } else {
-            parsedData.add(buffer.toString());
-            buffer = new StringBuffer();
-          }
-          break;
-
-        default:
-          if (escaped) {
-            escaped = false;
-          }
-          buffer.append(c);
-          break;
+      switch (c) {
+      case QUOTE_CHARACTER:
+        builder.append(c);
+        if (escaped) {
+          escaped = false;
+        } else {
+          quoted = !quoted;
+        }
+        break;
+      case ESCAPE_CHARACTER:
+        builder.append(ESCAPE_CHARACTER);
+        escaped = !escaped;
+        break;
+      case SEPARATOR_CHARACTER:
+        if (quoted || insideJSON) {
+          builder.append(c);
+        } else {
+          parsedData.add(builder.toString());
+          builder = new StringBuilder();
+        }
+        break;
+      default:
+        if (escaped) {
+          escaped = false;
+        }
+        builder.append(c);
+        break;
       }
     }
-    parsedData.add(buffer.toString());
+    parsedData.add(builder.toString());
 
     return parsedData.toArray(new String[parsedData.size()]);
   }
 
   /**
-   * {@inheritDoc}
+   * Converts the CSV string array into an actual object array based on the
+   * corresponding column types. {@inheritDoc}
    */
   @Override
   public Object[] getObjectData() {
@@ -176,84 +197,108 @@ public class CSVIntermediateDataFormat extends 
IntermediateDataFormat<String> {
       throw new 
SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0006);
     }
 
-    String[] fields = getFields();
+    // fieldStringArray represents the csv fields parsed into string array
+    String[] fieldStringArray = getFieldStringArray();
 
-    if (fields == null) {
+    if (fieldStringArray == null) {
       return null;
     }
 
-    if (fields.length != schema.getColumns().size()) {
+    if (fieldStringArray.length != schema.getColumns().size()) {
       throw new 
SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
           "The data " + getTextData() + " has the wrong number of fields.");
     }
 
-    Object[] out = new Object[fields.length];
-    Column[] cols = schema.getColumns().toArray(new Column[fields.length]);
-    for (int i = 0; i < fields.length; i++) {
-      ColumnType colType = cols[i].getType();
-      if (fields[i].equals("NULL")) {
-        out[i] = null;
+    Object[] objectArray = new Object[fieldStringArray.length];
+    Column[] columnArray = schema.getColumns().toArray(new 
Column[fieldStringArray.length]);
+    for (int i = 0; i < fieldStringArray.length; i++) {
+      // check for NULL field and bail out immediately
+      if (fieldStringArray[i].equals("NULL")) {
+        objectArray[i] = null;
         continue;
       }
+      objectArray[i] = parseStringArrayElement(fieldStringArray[i], 
columnArray[i]);
+    }
+    return objectArray;
+  }
 
-      Long byteSize;
-      switch(colType) {
-        case TEXT:
-          out[i] = unescapeStrings(fields[i]);
-          break;
-        case BINARY:
-        // Unknown is treated as a binary type
-        case UNKNOWN:
-          out[i] = unescapeByteArray(fields[i]);
-          break;
-        case FIXED_POINT:
-          byteSize = ((FixedPoint) cols[i]).getByteSize();
-          if (byteSize != null && byteSize <= Integer.SIZE) {
-            out[i] = Integer.valueOf(fields[i]);
-          } else {
-            out[i] = Long.valueOf(fields[i]);
-          }
-          break;
-        case FLOATING_POINT:
-          byteSize = ((FloatingPoint) cols[i]).getByteSize();
-          if (byteSize != null && byteSize <= Float.SIZE) {
-            out[i] = Float.valueOf(fields[i]);
-          } else {
-            out[i] = Double.valueOf(fields[i]);
-          }
-          break;
-        case DECIMAL:
-          out[i] = new BigDecimal(fields[i]);
-          break;
-        case DATE:
-          out[i] = LocalDate.parse(fields[i]);
-          break;
-        case DATE_TIME:
-          // A datetime string with a space as date-time separator will not be
-          // parsed expectedly. The expected separator is "T". See also:
-          // https://github.com/JodaOrg/joda-time/issues/11
-          String iso8601 = fields[i].replace(" ", "T");
-          out[i] = LocalDateTime.parse(iso8601);
-          break;
-        case BIT:
-          out[i] = Boolean.valueOf(fields[i].equals("1")
-              || fields[i].toLowerCase().equals("true"));
-          break;
-        default:
-          throw new 
SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004, 
"Column type from schema was not recognized for " + colType);
+  private Object parseStringArrayElement(String fieldString, Column column) {
+    Object returnValue = null;
+
+    switch (column.getType()) {
+    case TEXT:
+      returnValue = unescapeString(fieldString);
+      break;
+    case BINARY:
+      // Unknown is treated as a binary type
+    case UNKNOWN:
+      returnValue = unescapeByteArray(fieldString);
+      break;
+    case FIXED_POINT:
+      Long byteSize = ((FixedPoint) column).getByteSize();
+      if (byteSize != null && byteSize <= Integer.SIZE) {
+        returnValue = Integer.valueOf(fieldString);
+      } else {
+        returnValue = Long.valueOf(fieldString);
       }
+      break;
+    case FLOATING_POINT:
+      byteSize = ((FloatingPoint) column).getByteSize();
+      if (byteSize != null && byteSize <= Float.SIZE) {
+        returnValue = Float.valueOf(fieldString);
+      } else {
+        returnValue = Double.valueOf(fieldString);
+      }
+      break;
+    case DECIMAL:
+      returnValue = new BigDecimal(fieldString);
+      break;
+    case DATE:
+      returnValue = LocalDate.parse(fieldString);
+      break;
+    case DATE_TIME:
+      // A datetime string with a space as the date-time separator will not
+      // be parsed as expected. The expected separator is "T". See also:
+      // https://github.com/JodaOrg/joda-time/issues/11
+      String iso8601 = fieldString.replace(" ", "T");
+      returnValue = LocalDateTime.parse(iso8601);
+      break;
+    case BIT:
+      returnValue = Boolean.valueOf(fieldString.equals("1")
+          || fieldString.toLowerCase().equals("true"));
+      break;
+    case ARRAY:
+    case SET:
+      returnValue = parseListElementFromJSON(fieldString);
+      break;
+    default:
+      throw new 
SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004,
+          "Column type from schema was not recognized for " + 
column.getType());
     }
-    return out;
+    return returnValue;
   }
 
+  private Object[] parseListElementFromJSON(String fieldString) {
+
+    JSONArray array = null;
+    try {
+      array = (JSONArray) new JSONParser().parse(removeQuotes(fieldString));
+    } catch (org.json.simple.parser.ParseException e) {
+      throw new 
SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0008, e);
+    }
+    if (array != null) {
+     return array.toArray();
+    }
+    return null;
+  }
 
   /**
-   * {@inheritDoc}
+   * Serializes the given Java objects into the CSV string. {@inheritDoc}
    */
-  @VisibleForTesting
   @Override
   public void setObjectData(Object[] data) {
-    escapeArray(data);
+    Column[] columnArray = schema.getColumns().toArray(new 
Column[data.length]);
+    encodeCSVStringElements(data, columnArray);
     this.data = StringUtils.join(data, SEPARATOR_CHARACTER);
   }
 
@@ -278,51 +323,75 @@ public class CSVIntermediateDataFormat extends 
IntermediateDataFormat<String> {
    */
   @Override
   public boolean equals(Object other) {
-    if(this == other) {
+    if (this == other) {
       return true;
     }
-    if(other == null || !(other instanceof CSVIntermediateDataFormat)) {
+    if (other == null || !(other instanceof CSVIntermediateDataFormat)) {
       return false;
     }
-    return data.equals(((CSVIntermediateDataFormat)other).data);
+    return data.equals(((CSVIntermediateDataFormat) other).data);
   }
 
   public int compareTo(IntermediateDataFormat<?> o) {
-    if(this == o) {
+    if (this == o) {
       return 0;
     }
-    if(this.equals(o)) {
+    if (this.equals(o)) {
       return 0;
     }
-    if(!(o instanceof CSVIntermediateDataFormat)) {
-      throw new IllegalStateException("Expected Data to be instance of " +
-        "CSVIntermediateFormat, but was an instance of " + o.getClass()
-        .getName());
+    if (!(o instanceof CSVIntermediateDataFormat)) {
+      throw new IllegalStateException("Expected Data to be instance of "
+          + "CSVIntermediateFormat, but was an instance of " + 
o.getClass().getName());
     }
     return data.compareTo(o.getTextData());
   }
 
   /**
-   * If the incoming data is an array, parse it and return the CSV-ised version
+   * Sanitize every element of the CSV string based on the column type
    *
-   * @param array
+   * @param stringArray
    */
-  private void escapeArray(Object[] array) {
-    for (int i : stringFieldIndices) {
-      array[i] = escapeStrings((String) array[i]);
+  private void encodeCSVStringElements(Object[] stringArray, Column[] 
columnArray) {
+    for (int i : stringTypeColumnIndices) {
+      stringArray[i] = escapeString((String) stringArray[i]);
+    }
+    for (int i : byteTypeColumnIndices) {
+      stringArray[i] = escapeByteArrays((byte[]) stringArray[i]);
     }
-    for (int i : byteFieldIndices) {
-      array[i] = escapeByteArrays((byte[]) array[i]);
+    for (int i : listTypeColumnIndices) {
+      stringArray[i] = encodeList((Object[]) stringArray[i], columnArray[i]);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private String encodeList(Object[] list, Column column) {
+    List<Object> elementList = new ArrayList<Object>();
+    for (int n = 0; n < list.length; n++) {
+      Column listType = ((AbstractComplexListType) column).getListType();
+      if (isColumnListType(listType)) {
+        Object[] listElements = (Object[]) list[n];
+        elementList.add((Arrays.deepToString(listElements)));
+      } else {
+        elementList.add(list[n]);
+      }
     }
+    JSONArray array = new JSONArray();
+    array.addAll(elementList);
+    return encloseWithQuote(array.toJSONString());
+  }
+
+  private boolean isColumnListType(Column listType) {
+    return listType.getType().equals(ColumnType.ARRAY) || 
listType.getType().equals(ColumnType.SET);
   }
 
   private String escapeByteArrays(byte[] bytes) {
     try {
-      return escapeStrings(new String(bytes, BYTE_FIELD_CHARSET));
+      return escapeString(new String(bytes, BYTE_FIELD_CHARSET));
     } catch (UnsupportedEncodingException e) {
       // We should never hit this case.
       // This character set should be distributed with Java.
-      throw new 
SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The 
character set " + BYTE_FIELD_CHARSET + " is not available.");
+      throw new 
SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
+          "The character set " + BYTE_FIELD_CHARSET + " is not available.");
     }
   }
 
@@ -334,7 +403,7 @@ public class CSVIntermediateDataFormat extends 
IntermediateDataFormat<String> {
     return orig.replaceAll("\\\\", Matcher.quoteReplacement("\\\\"));
   }
 
-  private String escapeStrings(String orig) {
+  private String escapeString(String orig) {
     if (orig == null) {
       return NULL_STRING;
     }
@@ -343,40 +412,52 @@ public class CSVIntermediateDataFormat extends 
IntermediateDataFormat<String> {
     String replacement = orig;
     try {
       for (j = 0; j < replacements.length; j++) {
-        replacement = replacement.replaceAll(getRegExp(originals[j]), 
Matcher.quoteReplacement(replacements[j]));
+        replacement = replacement.replaceAll(getRegExp(originals[j]),
+            Matcher.quoteReplacement(replacements[j]));
       }
     } catch (Exception e) {
-      throw new 
SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0002, orig 
+ "  " + replacement + "  " + String.valueOf(j) + "  " + e.getMessage());
+      throw new 
SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0002, orig
+          + "  " + replacement + "  " + String.valueOf(j) + "  " + 
e.getMessage());
     }
-    StringBuilder  builder = new StringBuilder();
-    
builder.append(QUOTE_CHARACTER).append(replacement).append(QUOTE_CHARACTER);
+    return encloseWithQuote(replacement);
+  }
+
+  private String encloseWithQuote(String string) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(QUOTE_CHARACTER).append(string).append(QUOTE_CHARACTER);
     return builder.toString();
   }
 
-  private String unescapeStrings(String orig) {
-    //Remove the trailing and starting quotes.
-    orig = orig.substring(1, orig.length() - 1);
+  private String unescapeString(String orig) {
+    // Remove the trailing and starting quotes.
+    orig = removeQuotes(orig);
     int j = 0;
     try {
       for (j = 0; j < replacements.length; j++) {
         orig = orig.replaceAll(getRegExp(replacements[j]),
-          Matcher.quoteReplacement(String.valueOf(originals[j])));
+            Matcher.quoteReplacement(String.valueOf(originals[j])));
       }
     } catch (Exception e) {
-      throw new 
SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0003, orig 
+ "  " + String.valueOf(j) + e.getMessage());
+      throw new 
SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0003, orig
+          + "  " + String.valueOf(j) + e.getMessage());
     }
 
     return orig;
   }
 
+  private String removeQuotes(String string) {
+    return string.substring(1, string.length() - 1);
+  }
+
   private byte[] unescapeByteArray(String orig) {
     // Always encoded in BYTE_FIELD_CHARSET.
     try {
-      return unescapeStrings(orig).getBytes(BYTE_FIELD_CHARSET);
+      return unescapeString(orig).getBytes(BYTE_FIELD_CHARSET);
     } catch (UnsupportedEncodingException e) {
       // Should never hit this case.
       // This character set should be distributed with Java.
-      throw new 
SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The 
character set " + BYTE_FIELD_CHARSET + " is not available.");
+      throw new 
SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
+          "The character set " + BYTE_FIELD_CHARSET + " is not available.");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/34d7066c/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
----------------------------------------------------------------------
diff --git 
a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
 
b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
index 5ef6fc6..253dfba 100644
--- 
a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
+++ 
b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
@@ -107,9 +107,9 @@ public abstract class IntermediateDataFormat<T> {
   public abstract void setObjectData(Object[] data);
 
   /**
-   * Set the schema for reading data.
+   * Set the schema for serializing/de-serializing data.
    *
-   * @param schema - the schema used for reading data
+   * @param schema - the schema used for serializing/de-serializing data
    */
   public abstract void setSchema(Schema schema);
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/34d7066c/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
----------------------------------------------------------------------
diff --git 
a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
 
b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
index 4d41679..665418d 100644
--- 
a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
+++ 
b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
@@ -39,7 +39,9 @@ public enum IntermediateDataFormatError implements ErrorCode {
   /** Number of fields. */
   INTERMEDIATE_DATA_FORMAT_0005("Wrong number of fields."),
 
-  INTERMEDIATE_DATA_FORMAT_0006("Schema missing.")
+  INTERMEDIATE_DATA_FORMAT_0006("Schema missing."),
+
+  INTERMEDIATE_DATA_FORMAT_0008("JSON parse internal error."),
 
   ;
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/34d7066c/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git 
a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
 
b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
index fcf6c3c..b629897 100644
--- 
a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
+++ 
b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
@@ -24,7 +24,11 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.schema.Schema;
@@ -57,16 +61,10 @@ public class TestCSVIntermediateDataFormat {
     }
   }
 
-  @Test
-  public void testStringInStringOut() {
-    String testData = "10,34,'54','random data'," + getByteFieldString(new 
byte[] { (byte) -112, (byte) 54})
-      + ",'" + String.valueOf(0x0A) + "'";
-    dataFormat.setTextData(testData);
-    assertEquals(testData, dataFormat.getTextData());
-  }
+  //**************test cases for null and empty input*******************
 
   @Test
-  public void testNullStringInObjectOut() {
+  public void testNullInputAsCSVTextInObjectArrayOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new FixedPoint("1"))
         .addColumn(new FixedPoint("2"))
@@ -76,14 +74,12 @@ public class TestCSVIntermediateDataFormat {
         .addColumn(new Text("6"));
     dataFormat.setSchema(schema);
     dataFormat.setTextData(null);
-
     Object[] out = dataFormat.getObjectData();
-
     assertNull(out);
   }
 
   @Test(expected=SqoopException.class)
-  public void testEmptyStringInObjectOut() {
+  public void testEmptyInputAsCSVTextInObjectArrayOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new FixedPoint("1"))
         .addColumn(new FixedPoint("2"))
@@ -93,12 +89,21 @@ public class TestCSVIntermediateDataFormat {
         .addColumn(new Text("6"));
     dataFormat.setSchema(schema);
     dataFormat.setTextData("");
-
     dataFormat.getObjectData();
   }
 
+  //**************test cases for primitive types (text, number, 
byte array)*******************
+
+  @Test
+  public void testInputAsCSVTextInCSVTextOut() {
+    String testData = "10,34,'54','random data'," + getByteFieldString(new 
byte[] { (byte) -112, (byte) 54})
+      + ",'" + String.valueOf(0x0A) + "'";
+    dataFormat.setTextData(testData);
+    assertEquals(testData, dataFormat.getTextData());
+  }
+
   @Test
-  public void testStringInObjectOut() {
+  public void testInputAsCSVTextInObjectOut() {
 
     //byte[0] = -112, byte[1] = 54 - 2's complements
     String testData = "10,34,'54','random data'," + getByteFieldString(new 
byte[] { (byte) -112, (byte) 54})
@@ -126,7 +131,7 @@ public class TestCSVIntermediateDataFormat {
   }
 
   @Test
-  public void testObjectInStringOut() {
+  public void testInputAsObjectArayInCSVTextOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new FixedPoint("1"))
         .addColumn(new FixedPoint("2"))
@@ -154,7 +159,7 @@ public class TestCSVIntermediateDataFormat {
   }
 
   @Test
-  public void testObjectInObjectOut() {
+  public void testObjectArrayInObjectArrayOut() {
     //Test escapable sequences too.
     //byte[0] = -112, byte[1] = 54 - 2's complements
     Schema schema = new Schema("test");
@@ -183,7 +188,7 @@ public class TestCSVIntermediateDataFormat {
   }
 
   @Test
-  public void testObjectWithNullInStringOut() {
+  public void testObjectArrayWithNullInCSVTextOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new FixedPoint("1"))
         .addColumn(new FixedPoint("2"))
@@ -254,6 +259,8 @@ public class TestCSVIntermediateDataFormat {
     assertTrue(Arrays.deepEquals(inCopy, dataFormat.getObjectData()));
   }
 
+  //**************test cases for date/datetime*******************
+
   @Test
   public void testDate() {
     Schema schema = new Schema("test");
@@ -321,6 +328,246 @@ public class TestCSVIntermediateDataFormat {
     }
   }
 
+  //**************test cases for arrays*******************
+  // Array-of-text column: object array in, object array out.
+  @Test
+  public void testArrayOfStringWithObjectArrayInObjectArrayOut() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new org.apache.sqoop.schema.type.Array("1", new Text("text")))
+        .addColumn(new org.apache.sqoop.schema.type.Text("2"));
+    dataFormat.setSchema(schema);
+
+    // The first column value is itself an array nested inside the row.
+    Object[] letters = { "A", "B" };
+    Object[] row = { letters, "text" };
+    dataFormat.setObjectData(row);
+
+    // Compared through Arrays.toString, i.e. by printed element values.
+    Object[] actualLetters = (Object[]) dataFormat.getObjectData()[0];
+    assertEquals(Arrays.toString(letters), Arrays.toString(actualLetters));
+    assertEquals("text", dataFormat.getObjectData()[1]);
+  }
+
+  // Array-of-text column: CSV text in, object array out.
+  @Test
+  public void testArrayOfStringWithCSVTextInObjectArrayOut() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new org.apache.sqoop.schema.type.Array("1", new Text("text")));
+    schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
+    dataFormat.setSchema(schema);
+
+    // Only the CSV text is fed in; givenArray is the value expected back for column 0.
+    Object[] givenArray = { "A", "B" };
+    String testData = "'[\"A\",\"B\"]','text'";
+    dataFormat.setTextData(testData);
+
+    // Compared through Arrays.toString, i.e. by printed element values.
+    Object[] actualArray = (Object[]) dataFormat.getObjectData()[0];
+    assertEquals(Arrays.toString(givenArray), Arrays.toString(actualArray));
+    assertEquals("text", dataFormat.getObjectData()[1]);
+  }
+
+  // Array-of-text column: object array in, CSV text out.
+  @Test
+  public void testArrayOfStringWithObjectArrayInCSVTextOut() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new org.apache.sqoop.schema.type.Array("1", new Text("text")))
+        .addColumn(new org.apache.sqoop.schema.type.Text("2"));
+    dataFormat.setSchema(schema);
+
+    // The first column value is itself an array nested inside the row.
+    Object[] letters = { "A", "B" };
+    Object[] row = { letters, "text" };
+    dataFormat.setObjectData(row);
+
+    String expectedCsv = "'[\"A\",\"B\"]','text'";
+    assertEquals(expectedCsv, dataFormat.getTextData());
+  }
+
+  // Array-of-text column: CSV text in, the same CSV text out.
+  @Test
+  public void testArrayOfStringWithCSVTextInCSVTextOut() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new org.apache.sqoop.schema.type.Array("1", new Text("text")))
+        .addColumn(new org.apache.sqoop.schema.type.Text("2"));
+    dataFormat.setSchema(schema);
+
+    String csvRow = "'[\"A\",\"B\"]','text'";
+    dataFormat.setTextData(csvRow);
+
+    assertEquals(csvRow, dataFormat.getTextData());
+  }
+
+  // Array-of-text column whose elements contain single quotes, a double quote and '#'.
+  @Test
+  public void testArrayOfComplexStrings() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new org.apache.sqoop.schema.type.Array("1", new Text("text")))
+        .addColumn(new org.apache.sqoop.schema.type.Text("2"));
+    dataFormat.setSchema(schema);
+
+    // The first column value is itself an array nested inside the row.
+    Object[] trickyStrings = { "A''\"ssss", "Bss###''" };
+    Object[] row = { trickyStrings, "tex''t" };
+    dataFormat.setObjectData(row);
+
+    // Compared through Arrays.toString, i.e. by printed element values.
+    Object[] actualStrings = (Object[]) dataFormat.getObjectData()[0];
+    assertEquals(Arrays.toString(trickyStrings), Arrays.toString(actualStrings));
+    assertEquals("tex''t", dataFormat.getObjectData()[1]);
+  }
+
+  // Array-of-FixedPoint column: object array in, object array out.
+  @Test
+  public void testArrayOfIntegers() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new org.apache.sqoop.schema.type.Array("1", new FixedPoint("fn")))
+        .addColumn(new org.apache.sqoop.schema.type.Text("2"));
+    dataFormat.setSchema(schema);
+
+    // The first column value is itself an array nested inside the row.
+    Object[] numbers = { 1, 2 };
+    Object[] row = { numbers, "text" };
+    dataFormat.setObjectData(row);
+
+    // Compared through Arrays.toString, so only the printed values must match.
+    Object[] actualNumbers = (Object[]) dataFormat.getObjectData()[0];
+    assertEquals(Arrays.toString(numbers), Arrays.toString(actualNumbers));
+    assertEquals("text", dataFormat.getObjectData()[1]);
+  }
+
+  // Array-of-FixedPoint column fed from a java.util.List converted with toArray().
+  @Test
+  public void testListOfIntegers() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new org.apache.sqoop.schema.type.Array("1", new FixedPoint("fn")))
+        .addColumn(new org.apache.sqoop.schema.type.Text("2"));
+    dataFormat.setSchema(schema);
+
+    // The list holds the value 1 twice.
+    List<Integer> numbers = new ArrayList<Integer>();
+    numbers.add(1);
+    numbers.add(1);
+
+    // The list is handed over as an array nested inside the row.
+    Object[] row = { numbers.toArray(), "text" };
+    dataFormat.setObjectData(row);
+
+    // List.toString() and Arrays.toString() print elements the same way.
+    Object[] actualNumbers = (Object[]) dataFormat.getObjectData()[0];
+    assertEquals(numbers.toString(), Arrays.toString(actualNumbers));
+    assertEquals("text", dataFormat.getObjectData()[1]);
+  }
+
+  // Set-of-FixedPoint column fed from a java.util.Set converted with toArray().
+  // Annotated with @Test so that JUnit 4 actually executes it.
+  @Test
+  public void testSetOfIntegers() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new org.apache.sqoop.schema.type.Set("1", new FixedPoint("fn")));
+    schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
+    dataFormat.setSchema(schema);
+    Set<Integer> givenSet = new HashSet<Integer>();
+    givenSet.add(1);
+    givenSet.add(3);
+    // the set is handed over as an array nested inside the object array
+    Object[] data = new Object[2];
+    data[0] = givenSet.toArray();
+    data[1] = "text";
+    dataFormat.setObjectData(data);
+    // NOTE(review): the comparison relies on HashSet iteration order being the same for
+    // toString() and toArray(); confirm this is acceptable.
+    Object[] actualArray = (Object[]) dataFormat.getObjectData()[0];
+    assertEquals(givenSet.toString(), Arrays.toString(actualArray));
+    assertEquals("text", dataFormat.getObjectData()[1]);
+  }
+
+  // Array-of-Decimal column: object array in, object array out.
+  @Test
+  public void testArrayOfDecimals() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new org.apache.sqoop.schema.type.Array("1",
+        new org.apache.sqoop.schema.type.Decimal("deci")))
+        .addColumn(new org.apache.sqoop.schema.type.Text("2"));
+    dataFormat.setSchema(schema);
+
+    // The first column value is itself an array nested inside the row.
+    Object[] decimals = { 1.22, 2.444 };
+    Object[] row = { decimals, "text" };
+    dataFormat.setObjectData(row);
+
+    // Compared through Arrays.toString, so only the printed values must match.
+    Object[] actualDecimals = (Object[]) dataFormat.getObjectData()[0];
+    assertEquals(Arrays.toString(decimals), Arrays.toString(actualDecimals));
+    assertEquals("text", dataFormat.getObjectData()[1]);
+  }
+
+  // Array-of-arrays column: object array in, object array out.
+  @Test
+  public void testArrayOfObjectsWithObjectArrayInObjectArrayOut() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new org.apache.sqoop.schema.type.Array("1",
+        new org.apache.sqoop.schema.type.Array("array", new FixedPoint("ft"))))
+        .addColumn(new org.apache.sqoop.schema.type.Text("2"));
+    dataFormat.setSchema(schema);
+
+    // Two inner arrays wrapped in an outer array, which becomes the first column value.
+    Object[] firstInner = { 11, 12 };
+    Object[] secondInner = { 14, 15 };
+    Object[] nested = { firstInner, secondInner };
+    Object[] row = { nested, "text" };
+    dataFormat.setObjectData(row);
+
+    // deepToString prints the inner arrays' contents as well.
+    Object[] actualNested = (Object[]) dataFormat.getObjectData()[0];
+    assertEquals(2, actualNested.length);
+    assertEquals(Arrays.deepToString(nested), Arrays.deepToString(actualNested));
+    assertEquals("text", dataFormat.getObjectData()[1]);
+  }
+
+  // Array-of-arrays column: CSV text in, object array out.
+  @Test
+  public void testArrayOfObjectsWithCSVTextInObjectArrayOut() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new org.apache.sqoop.schema.type.Array("1",
+        new org.apache.sqoop.schema.type.Array("array", new FixedPoint("ft"))));
+    schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
+    dataFormat.setSchema(schema);
+
+    // Expected value of column 0; only the CSV text below is fed in.
+    Object[] givenArrayOne = { 11, 12 };
+    Object[] givenArrayTwo = { 14, 15 };
+    Object[] arrayOfArrays = new Object[2];
+    arrayOfArrays[0] = givenArrayOne;
+    arrayOfArrays[1] = givenArrayTwo;
+
+    dataFormat.setTextData("'[\"[11, 12]\",\"[14, 15]\"]','text'");
+
+    // deepToString prints the inner arrays' contents as well.
+    Object[] actualArray = (Object[]) dataFormat.getObjectData()[0];
+    assertEquals(2, actualArray.length);
+    assertEquals(Arrays.deepToString(arrayOfArrays), Arrays.deepToString(actualArray));
+    assertEquals("text", dataFormat.getObjectData()[1]);
+  }
+
+  // Array-of-arrays column: CSV text in, the same CSV text out.
+  @Test
+  public void testArrayOfObjectsWithCSVTextInCSVTextOut() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new org.apache.sqoop.schema.type.Array("1",
+        new org.apache.sqoop.schema.type.Array("array", new FixedPoint("ft"))))
+        .addColumn(new org.apache.sqoop.schema.type.Text("2"));
+    dataFormat.setSchema(schema);
+
+    String csvRow = "'[\"[11, 12]\",\"[14, 15]\"]','text'";
+    dataFormat.setTextData(csvRow);
+
+    // The outer array must hold two elements, and the text must come back unchanged.
+    Object[] actualNested = (Object[]) dataFormat.getObjectData()[0];
+    assertEquals(2, actualNested.length);
+    assertEquals(csvRow, dataFormat.getTextData());
+  }
+
+  // Array-of-arrays column: object array in, CSV text out.
+  @Test
+  public void testArrayOfObjectsWithObjectArrayInCSVTextOut() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new org.apache.sqoop.schema.type.Array("1",
+        new org.apache.sqoop.schema.type.Array("array", new FixedPoint("ft"))))
+        .addColumn(new org.apache.sqoop.schema.type.Text("2"));
+    dataFormat.setSchema(schema);
+
+    // Two inner arrays wrapped in an outer array, which becomes the first column value.
+    Object[] firstInner = { 11, 12 };
+    Object[] secondInner = { 14, 15 };
+    Object[] nested = { firstInner, secondInner };
+    Object[] row = { nested, "text" };
+    dataFormat.setObjectData(row);
+
+    String expectedCsv = "'[\"[11, 12]\",\"[14, 15]\"]','text'";
+    assertEquals(expectedCsv, dataFormat.getTextData());
+  }
+
+  //**************test cases for schema*******************
   @Test(expected=SqoopException.class)
   public void testEmptySchema() {
     String testData = "10,34,'54','random data'," + getByteFieldString(new 
byte[] { (byte) -112, (byte) 54})

Reply via email to