Repository: sqoop Updated Branches: refs/heads/sqoop2 ec30437f5 -> 2d54e26a0
SQOOP-1995: Sqoop2: Allow nulls only if the column for that field has IsNullable to be true (Veena Basavaraj via Abraham Elmahrek) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/2d54e26a Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/2d54e26a Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/2d54e26a Branch: refs/heads/sqoop2 Commit: 2d54e26a086b0407afd9f22ae26e1af32b631bfe Parents: ec30437 Author: Abraham Elmahrek <[email protected]> Authored: Mon Jan 12 19:03:28 2015 -0800 Committer: Abraham Elmahrek <[email protected]> Committed: Mon Jan 12 19:03:28 2015 -0800 ---------------------------------------------------------------------- .../idf/AVROIntermediateDataFormat.java | 231 +++++++++------- .../idf/AVROIntermediateDataFormatError.java | 2 - .../idf/CSVIntermediateDataFormat.java | 58 ++-- .../connector/idf/IntermediateDataFormat.java | 2 + .../idf/IntermediateDataFormatError.java | 5 +- .../idf/JSONIntermediateDataFormat.java | 209 +++++++------- .../idf/JSONIntermediateDataFormatError.java | 2 - .../idf/TestAVROIntermediateDataFormat.java | 269 +++++++++++++++++-- .../idf/TestCSVIntermediateDataFormat.java | 52 +++- .../idf/TestJSONIntermediateDataFormat.java | 254 +++++++++++++++-- 10 files changed, 812 insertions(+), 272 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/2d54e26a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java index 67b47e7..4d68ea0 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java @@ -73,6 +73,7 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe */ @Override public void setCSVTextData(String text) { + super.validateSchema(schema); // convert the CSV text to avro this.data = toAVRO(text); } @@ -82,6 +83,7 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe */ @Override public String getCSVTextData() { + super.validateSchema(schema); // convert avro to sqoop CSV return toCSV(data); } @@ -91,6 +93,7 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe */ @Override public void setObjectData(Object[] data) { + super.validateSchema(schema); // convert the object array to avro this.data = toAVRO(data); } @@ -100,6 +103,7 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe */ @Override public Object[] getObjectData() { + super.validateSchema(schema); // convert avro to object array return toObject(data); } @@ -143,21 +147,22 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe if (csvStringArray == null) { return null; } - - if (csvStringArray.length != schema.getColumnsArray().length) { - throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The data " + csv - + " has the wrong number of fields."); + Column[] columns = schema.getColumnsArray(); + if (csvStringArray.length != columns.length) { + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, + "The data " + csv + " has the wrong number of fields."); } GenericRecord avroObject = new GenericData.Record(avroSchema); - Column[] columnArray = schema.getColumnsArray(); for (int i = 0; i < csvStringArray.length; i++) { - // check for NULL field and assume this field is nullable as per the sqoop - // schema - if (csvStringArray[i].equals(NULL_VALUE) && columnArray[i].isNullable()) { - avroObject.put(columnArray[i].getName(), null); + if (csvStringArray[i].equals(NULL_VALUE) && !columns[i].isNullable()) { + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005, + columns[i].getName() + " does not support null values"); + } + if (csvStringArray[i].equals(NULL_VALUE)) { + avroObject.put(columns[i].getName(), null); continue; } - avroObject.put(columnArray[i].getName(), toAVRO(csvStringArray[i], columnArray[i])); + avroObject.put(columns[i].getName(), toAVRO(csvStringArray[i], columns[i])); } return avroObject; } @@ -219,66 +224,80 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe return returnValue; } - private GenericRecord toAVRO(Object[] data) { + private GenericRecord toAVRO(Object[] objectArray) { - if (data == null) { + if (objectArray == null) { return null; } + Column[] columns = schema.getColumnsArray(); - if (data.length != schema.getColumnsArray().length) { - throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The data " + data.toString() - + " has the wrong number of fields."); + if (objectArray.length != columns.length) { + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, + "The data " + objectArray.toString() + " has the wrong number of fields."); } // get avro schema from sqoop schema GenericRecord avroObject = new GenericData.Record(avroSchema); - Column[] cols = schema.getColumnsArray(); - for (int i = 0; i < data.length; i++) { - switch (cols[i].getType()) { + for (int i = 0; i < objectArray.length; i++) { + if (objectArray[i] == null && !columns[i].isNullable()) { + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005, + columns[i].getName() + " does not support null values"); + } + if (objectArray[i] == null) { + avroObject.put(columns[i].getName(), null); + continue; + } + + switch (columns[i].getType()) { case ARRAY: case SET: - avroObject.put(cols[i].getName(), toList((Object[]) data[i])); + avroObject.put(columns[i].getName(), toList((Object[]) objectArray[i])); break; case MAP: - avroObject.put(cols[i].getName(), data[i]); + avroObject.put(columns[i].getName(), objectArray[i]); break; case ENUM: - GenericData.EnumSymbol enumValue = new GenericData.EnumSymbol(createEnumSchema(cols[i]), (String) data[i]); - avroObject.put(cols[i].getName(), enumValue); + GenericData.EnumSymbol enumValue = new GenericData.EnumSymbol(createEnumSchema(columns[i]), + (String) objectArray[i]); + avroObject.put(columns[i].getName(), enumValue); break; case TEXT: - avroObject.put(cols[i].getName(), new Utf8((String) data[i])); + avroObject.put(columns[i].getName(), new Utf8((String) objectArray[i])); break; case BINARY: case UNKNOWN: - avroObject.put(cols[i].getName(), ByteBuffer.wrap((byte[]) data[i])); + avroObject.put(columns[i].getName(), ByteBuffer.wrap((byte[]) objectArray[i])); break; case FIXED_POINT: case FLOATING_POINT: - avroObject.put(cols[i].getName(), data[i]); + avroObject.put(columns[i].getName(), objectArray[i]); break; case DECIMAL: // TODO: store as FIXED in SQOOP-16161 - avroObject.put(cols[i].getName(), ((BigDecimal) data[i]).toPlainString()); + avroObject.put(columns[i].getName(), ((BigDecimal) objectArray[i]).toPlainString()); break; case DATE_TIME: - if (data[i] instanceof org.joda.time.DateTime) { - avroObject.put(cols[i].getName(), ((org.joda.time.DateTime) data[i]).toDate().getTime()); - } else if (data[i] instanceof org.joda.time.LocalDateTime) { - avroObject.put(cols[i].getName(), ((org.joda.time.LocalDateTime) data[i]).toDate().getTime()); + if (objectArray[i] instanceof org.joda.time.DateTime) { + avroObject.put(columns[i].getName(), ((org.joda.time.DateTime) objectArray[i]).toDate() + .getTime()); + } else if (objectArray[i] instanceof org.joda.time.LocalDateTime) { + avroObject.put(columns[i].getName(), ((org.joda.time.LocalDateTime) objectArray[i]) + .toDate().getTime()); } break; case TIME: - avroObject.put(cols[i].getName(), ((org.joda.time.LocalTime) data[i]).toDateTimeToday().getMillis()); + avroObject.put(columns[i].getName(), ((org.joda.time.LocalTime) objectArray[i]) + .toDateTimeToday().getMillis()); break; case DATE: - avroObject.put(cols[i].getName(), ((org.joda.time.LocalDate) data[i]).toDate().getTime()); + avroObject.put(columns[i].getName(), ((org.joda.time.LocalDate) objectArray[i]).toDate() + .getTime()); break; case BIT: - avroObject.put(cols[i].getName(), Boolean.valueOf((Boolean) data[i])); + avroObject.put(columns[i].getName(), Boolean.valueOf((Boolean) objectArray[i])); break; default: throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, - "Column type from schema was not recognized for " + cols[i].getType()); + "Column type from schema was not recognized for " + columns[i].getType()); } } @@ -287,68 +306,72 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe @SuppressWarnings("unchecked") private String toCSV(GenericRecord record) { - Column[] cols = this.schema.getColumnsArray(); + Column[] columns = this.schema.getColumnsArray(); StringBuilder csvString = new StringBuilder(); - for (int i = 0; i < cols.length; i++) { + for (int i = 0; i < columns.length; i++) { - Object obj = record.get(cols[i].getName()); - - if (obj == null) { - throw new SqoopException(AVROIntermediateDataFormatError.AVRO_INTERMEDIATE_DATA_FORMAT_0001, " for " + cols[i].getName()); + Object obj = record.get(columns[i].getName()); + if (obj == null && !columns[i].isNullable()) { + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005, + columns[i].getName() + " does not support null values"); } - - switch (cols[i].getType()) { - case ARRAY: - case SET: - List<Object> objList = (List<Object>) obj; - csvString.append(toCSVList(toObjectArray(objList), cols[i])); - break; - case MAP: - Map<Object, Object> objMap = (Map<Object, Object>) obj; - csvString.append(toCSVMap(objMap, cols[i])); - break; - case ENUM: - case TEXT: - csvString.append(toCSVString(obj.toString())); - break; - case BINARY: - case UNKNOWN: - csvString.append(toCSVByteArray(getBytesFromByteBuffer(obj))); - break; - case FIXED_POINT: - csvString.append(toCSVFixedPoint(obj, cols[i])); - break; - case FLOATING_POINT: - csvString.append(toCSVFloatingPoint(obj, cols[i])); - break; - case DECIMAL: - // stored as string - csvString.append(toCSVDecimal(obj)); - break; - case DATE: - // stored as long - Long dateInMillis = (Long) obj; - csvString.append(toCSVDate(new org.joda.time.LocalDate(dateInMillis))); - break; - case TIME: - // stored as long - Long timeInMillis = (Long) obj; - csvString.append(toCSVTime(new org.joda.time.LocalTime(timeInMillis), cols[i])); - break; - case DATE_TIME: - // stored as long - Long dateTimeInMillis = (Long) obj; - csvString.append(toCSVDateTime(new org.joda.time.DateTime(dateTimeInMillis), cols[i])); - break; - case BIT: - csvString.append(toCSVBit(obj)); - break; - default: - throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, - "Column type from schema was not recognized for " + cols[i].getType()); + if (obj == null) { + csvString.append(NULL_VALUE); + } else { + + switch (columns[i].getType()) { + case ARRAY: + case SET: + List<Object> objList = (List<Object>) obj; + csvString.append(toCSVList(toObjectArray(objList), columns[i])); + break; + case MAP: + Map<Object, Object> objMap = (Map<Object, Object>) obj; + csvString.append(toCSVMap(objMap, columns[i])); + break; + case ENUM: + case TEXT: + csvString.append(toCSVString(obj.toString())); + break; + case BINARY: + case UNKNOWN: + csvString.append(toCSVByteArray(getBytesFromByteBuffer(obj))); + break; + case FIXED_POINT: + csvString.append(toCSVFixedPoint(obj, columns[i])); + break; + case FLOATING_POINT: + csvString.append(toCSVFloatingPoint(obj, columns[i])); + break; + case DECIMAL: + // stored as string + csvString.append(toCSVDecimal(obj)); + break; + case DATE: + // stored as long + Long dateInMillis = (Long) obj; + csvString.append(toCSVDate(new org.joda.time.LocalDate(dateInMillis))); + break; + case TIME: + // stored as long + Long timeInMillis = (Long) obj; + csvString.append(toCSVTime(new org.joda.time.LocalTime(timeInMillis), columns[i])); + break; + case DATE_TIME: + // stored as long + Long dateTimeInMillis = (Long) obj; + csvString.append(toCSVDateTime(new org.joda.time.DateTime(dateTimeInMillis), columns[i])); + break; + case BIT: + csvString.append(toCSVBit(obj)); + break; + default: + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, + "Column type from schema was not recognized for " + columns[i].getType()); + } } - if (i < cols.length - 1) { + if (i < columns.length - 1) { csvString.append(CSV_SEPARATOR_CHARACTER); } @@ -360,19 +383,25 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe @SuppressWarnings("unchecked") private Object[] toObject(GenericRecord record) { - if (data == null) { + if (record == null) { return null; } - Column[] cols = schema.getColumnsArray(); - Object[] object = new Object[cols.length]; - - for (int i = 0; i < cols.length; i++) { - Object obj = record.get(cols[i].getName()); + Column[] columns = schema.getColumnsArray(); + Object[] object = new Object[columns.length]; + + for (int i = 0; i < columns.length; i++) { + Object obj = record.get(columns[i].getName()); + Integer nameIndex = schema.getColumnNameIndex(columns[i].getName()); + Column column = columns[nameIndex]; + // null is a possible value + if (obj == null && !column.isNullable()) { + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005, + column.getName() + " does not support null values"); + } if (obj == null) { - throw new SqoopException(AVROIntermediateDataFormatError.AVRO_INTERMEDIATE_DATA_FORMAT_0001, " for " + cols[i].getName()); + object[nameIndex] = null; + continue; } - Integer nameIndex = schema.getColumnNameIndex(cols[i].getName()); - Column column = cols[nameIndex]; switch (column.getType()) { case ARRAY: case SET: @@ -422,7 +451,7 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe break; default: throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, - "Column type from schema was not recognized for " + cols[i].getType()); + "Column type from schema was not recognized for " + column.getType()); } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/2d54e26a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormatError.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormatError.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormatError.java index 6af21a3..3dcbf4a 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormatError.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormatError.java @@ -25,8 +25,6 @@ public enum AVROIntermediateDataFormatError implements ErrorCode { /** An unknown error has occurred. */ AVRO_INTERMEDIATE_DATA_FORMAT_0000("An unknown error has occurred."), - AVRO_INTERMEDIATE_DATA_FORMAT_0001("Missing key in the AVRO object.") - ; private final String message; http://git-wip-us.apache.org/repos/asf/sqoop/blob/2d54e26a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java index 856a4bb..2af6acd 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java @@ -58,7 +58,8 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> { */ @Override public String getCSVTextData() { - return data; + // TODO:SQOOP-1936 to enable schema validation after we use compareTo + return this.data; } /** @@ -66,7 +67,7 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> { */ @Override public void setCSVTextData(String csvText) { - this.data = csvText; + super.setData(csvText); } /** @@ -87,13 +88,17 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> { } Object[] objectArray = new Object[csvStringArray.length]; - Column[] columnArray = schema.getColumnsArray(); + Column[] columns = schema.getColumnsArray(); for (int i = 0; i < csvStringArray.length; i++) { + if (csvStringArray[i].equals(NULL_VALUE) && !columns[i].isNullable()) { + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005, + columns[i].getName() + " does not support null values"); + } if (csvStringArray[i].equals(NULL_VALUE)) { objectArray[i] = null; continue; } - objectArray[i] = toObject(csvStringArray[i], columnArray[i]); + objectArray[i] = toObject(csvStringArray[i], columns[i]); } return objectArray; } @@ -183,65 +188,68 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> { @SuppressWarnings("unchecked") private String toCSV(Object[] objectArray) { - Column[] columnArray = schema.getColumnsArray(); + Column[] columns = schema.getColumnsArray(); StringBuilder csvString = new StringBuilder(); - for (int i = 0; i < columnArray.length; i++) { - Object obj = objectArray[i]; - if (obj == null) { + for (int i = 0; i < columns.length; i++) { + if (objectArray[i] == null && !columns[i].isNullable()) { + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005, + columns[i].getName() + " does not support null values"); + } + if (objectArray[i] == null) { csvString.append(NULL_VALUE); } else { - switch (columnArray[i].getType()) { + switch (columns[i].getType()) { case ARRAY: case SET: - csvString.append(toCSVList((Object[]) obj, (AbstractComplexListType) columnArray[i])); + csvString.append(toCSVList((Object[]) objectArray[i], (AbstractComplexListType) columns[i])); break; case MAP: - csvString.append(toCSVMap((Map<Object, Object>) obj, columnArray[i])); + csvString.append(toCSVMap((Map<Object, Object>) objectArray[i], columns[i])); break; case ENUM: case TEXT: - csvString.append(toCSVString(obj.toString())); + csvString.append(toCSVString(objectArray[i].toString())); break; case BINARY: case UNKNOWN: - csvString.append(toCSVByteArray((byte[]) obj)); + csvString.append(toCSVByteArray((byte[]) objectArray[i])); break; case FIXED_POINT: - csvString.append(toCSVFixedPoint(obj, columnArray[i])); + csvString.append(toCSVFixedPoint(objectArray[i], columns[i])); break; case FLOATING_POINT: - csvString.append(toCSVFloatingPoint(obj, columnArray[i])); + csvString.append(toCSVFloatingPoint(objectArray[i], columns[i])); break; case DECIMAL: - csvString.append(toCSVDecimal(obj)); + csvString.append(toCSVDecimal(objectArray[i])); break; // stored in JSON as strings in the joda time format case DATE: - csvString.append(toCSVDate(obj)); + csvString.append(toCSVDate(objectArray[i])); break; case TIME: - csvString.append(toCSVTime(obj, columnArray[i])); + csvString.append(toCSVTime(objectArray[i], columns[i])); break; case DATE_TIME: if (objectArray[i] instanceof org.joda.time.DateTime) { - org.joda.time.DateTime dateTime = (org.joda.time.DateTime) obj; + org.joda.time.DateTime dateTime = (org.joda.time.DateTime) objectArray[i]; // check for fraction and time zone and then use the right formatter - csvString.append(toCSVDateTime(dateTime, columnArray[i])); + csvString.append(toCSVDateTime(dateTime, columns[i])); } else if (objectArray[i] instanceof org.joda.time.LocalDateTime) { - org.joda.time.LocalDateTime localDateTime = (org.joda.time.LocalDateTime) obj; - csvString.append(toCSVLocalDateTime(localDateTime, columnArray[i])); + org.joda.time.LocalDateTime localDateTime = (org.joda.time.LocalDateTime) objectArray[i]; + csvString.append(toCSVLocalDateTime(localDateTime, columns[i])); } break; case BIT: - csvString.append(toCSVBit(obj)); + csvString.append(toCSVBit(objectArray[i])); break; default: throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, - "Column type from schema was not recognized for " + columnArray[i].getType()); + "Column type from schema was not recognized for " + columns[i].getType()); } } - if (i < columnArray.length - 1) { + if (i < columns.length - 1) { csvString.append(CSV_SEPARATOR_CHARACTER); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/2d54e26a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java index 261a462..6063320 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java @@ -60,6 +60,7 @@ public abstract class IntermediateDataFormat<T> { * the intermediate data format implementation. */ public T getData() { + validateSchema(schema); return data; } @@ -70,6 +71,7 @@ public abstract class IntermediateDataFormat<T> { * @param obj - A single row of data to be moved. */ public void setData(T obj) { + validateSchema(schema); this.data = obj; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/2d54e26a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java index 1f583b2..f4e1fb7 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java @@ -28,13 +28,16 @@ public enum IntermediateDataFormatError implements ErrorCode { INTERMEDIATE_DATA_FORMAT_0001("Wrong number of columns."), /** Schema is missing in the IDF. */ - INTERMEDIATE_DATA_FORMAT_0002("Schema missing."), + INTERMEDIATE_DATA_FORMAT_0002("Schema is null."), INTERMEDIATE_DATA_FORMAT_0003("JSON parse error"), /** Column type isn't known by Intermediate Data Format. */ INTERMEDIATE_DATA_FORMAT_0004("Unknown column type."), + /** Column value cannot be null. */ + INTERMEDIATE_DATA_FORMAT_0005("Column value cannot be null"), + ; private final String message; http://git-wip-us.apache.org/repos/asf/sqoop/blob/2d54e26a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java index b937d87..3cfd356 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java @@ -55,6 +55,7 @@ public class JSONIntermediateDataFormat extends IntermediateDataFormat<JSONObjec */ @Override public void setCSVTextData(String text) { + super.validateSchema(schema); // convert the CSV text to JSON this.data = toJSON(text); } @@ -64,6 +65,7 @@ public class JSONIntermediateDataFormat extends IntermediateDataFormat<JSONObjec */ @Override public String getCSVTextData() { + super.validateSchema(schema); // convert JSON to sqoop CSV return toCSV(data); } @@ -73,6 +75,7 @@ public class JSONIntermediateDataFormat extends IntermediateDataFormat<JSONObjec */ @Override public void setObjectData(Object[] data) { + super.validateSchema(schema); // convert the object Array to JSON this.data = toJSON(data); } @@ -82,6 +85,7 @@ public class JSONIntermediateDataFormat extends IntermediateDataFormat<JSONObjec */ @Override public Object[] getObjectData() { + super.validateSchema(schema); // convert JSON to object array return toObject(data); } @@ -126,20 +130,24 @@ public class JSONIntermediateDataFormat extends IntermediateDataFormat<JSONObjec if (csvStringArray == null) { return null; } + Column[] columns = schema.getColumnsArray(); - if (csvStringArray.length != schema.getColumnsArray().length) { + if (csvStringArray.length != columns.length) { throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The data " + csv + " has the wrong number of fields."); } JSONObject object = new JSONObject(); - Column[] columnArray = schema.getColumnsArray(); for (int i = 0; i < csvStringArray.length; i++) { + if (csvStringArray[i].equals(NULL_VALUE) && !columns[i].isNullable()) { + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005, + columns[i].getName() + " does not support null values"); + } // check for NULL field and bail out immediately if (csvStringArray[i].equals(NULL_VALUE)) { - object.put(columnArray[i].getName(), null); + object.put(columns[i].getName(), null); continue; } - object.put(columnArray[i].getName(), toJSON(csvStringArray[i], columnArray[i])); + object.put(columns[i].getName(), toJSON(csvStringArray[i], columns[i])); } return object; @@ -161,7 +169,7 @@ public class JSONIntermediateDataFormat extends IntermediateDataFormat<JSONObjec try { returnValue = (JSONObject) new JSONParser().parse(removeQuotes(csvString)); } catch (ParseException e) { - throw new SqoopException(JSONIntermediateDataFormatError.JSON_INTERMEDIATE_DATA_FORMAT_0003, e); + throw new SqoopException(JSONIntermediateDataFormatError.JSON_INTERMEDIATE_DATA_FORMAT_0002, e); } break; case ENUM: @@ -201,128 +209,138 @@ public class JSONIntermediateDataFormat extends IntermediateDataFormat<JSONObjec } @SuppressWarnings("unchecked") - private JSONObject toJSON(Object[] data) { + private JSONObject toJSON(Object[] objectArray) { - if (data == null) { + if (objectArray == null) { return null; } + Column[] columns = schema.getColumnsArray(); - if (data.length != schema.getColumnsArray().length) { - throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The data " + data.toString() + if (objectArray.length != columns.length) { + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The data " + objectArray.toString() + " has the wrong number of fields."); } - JSONObject object = new JSONObject(); - Column[] cols = schema.getColumnsArray(); - for (int i = 0; i < data.length; i++) { - switch (cols[i].getType()) { + JSONObject json = new JSONObject(); + for (int i = 0; i < objectArray.length; i++) { + if (objectArray[i] == null && !columns[i].isNullable()) { + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005, + columns[i].getName() + " does not support null values"); + } + if (objectArray[i] == null) { + json.put(columns[i].getName(), null); + continue; + } + switch (columns[i].getType()) { case ARRAY: case SET: // store as JSON array - Object[] objArray = (Object[]) data[i]; + Object[] objArray = (Object[]) objectArray[i]; JSONArray jsonArray = toJSONArray(objArray); - object.put(cols[i].getName(), jsonArray); + json.put(columns[i].getName(), jsonArray); break; case MAP: // store as JSON object - Map<Object, Object> map = (Map<Object, Object>) data[i]; + Map<Object, Object> map = (Map<Object, Object>) objectArray[i]; JSONObject jsonObject = new JSONObject(); jsonObject.putAll(map); - object.put(cols[i].getName(), jsonObject); + json.put(columns[i].getName(), jsonObject); break; case ENUM: case TEXT: - object.put(cols[i].getName(), data[i]); + json.put(columns[i].getName(), objectArray[i]); break; case BINARY: case UNKNOWN: - object.put(cols[i].getName(), Base64.encodeBase64String((byte[]) data[i])); + json.put(columns[i].getName(), Base64.encodeBase64String((byte[]) objectArray[i])); break; case FIXED_POINT: case FLOATING_POINT: case DECIMAL: // store a object - object.put(cols[i].getName(), data[i]); + json.put(columns[i].getName(), objectArray[i]); break; // stored in JSON as the same format as csv strings in the joda time // format case DATE_TIME: - object.put(cols[i].getName(), removeQuotes(toCSVDateTime(data[i], cols[i]))); + json.put(columns[i].getName(), removeQuotes(toCSVDateTime(objectArray[i], columns[i]))); break; case TIME: - object.put(cols[i].getName(), removeQuotes(toCSVTime(data[i], cols[i]))); + json.put(columns[i].getName(), removeQuotes(toCSVTime(objectArray[i], columns[i]))); break; case DATE: - object.put(cols[i].getName(), removeQuotes(toCSVDate(data[i]))); + json.put(columns[i].getName(), removeQuotes(toCSVDate(objectArray[i]))); break; case BIT: - object.put(cols[i].getName(), Boolean.valueOf(toCSVBit(data[i]))); + json.put(columns[i].getName(), Boolean.valueOf(toCSVBit(objectArray[i]))); break; default: throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, - "Column type from schema was not recognized for " + cols[i].getType()); + "Column type from schema was not recognized for " + columns[i].getType()); } } - return object; + return json; } private String toCSV(JSONObject json) { - Column[] cols = this.schema.getColumnsArray(); + Column[] columns = this.schema.getColumnsArray(); StringBuilder csvString = new StringBuilder(); - for (int i = 0; i < cols.length; i++) { - - // or we can to json.entrySet(); - Object obj = json.get(cols[i].getName()); - if (obj == null) { - throw new SqoopException(JSONIntermediateDataFormatError.JSON_INTERMEDIATE_DATA_FORMAT_0003, " for " + cols[i].getName()); + for (int i = 0; i < columns.length; i++) { + Object obj = json.get(columns[i].getName()); + if (obj == null && !columns[i].isNullable()) { + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005, + columns[i].getName() + " does not support null values"); } - - switch (cols[i].getType()) { - case ARRAY: - case SET: - // stored as JSON array - JSONArray array = (JSONArray) obj; - csvString.append(encloseWithQuotes(array.toJSONString())); - break; - case MAP: - // stored as JSON object - csvString.append(encloseWithQuotes((((JSONObject) obj).toJSONString()))); - break; - case ENUM: - case TEXT: - csvString.append(toCSVString(obj.toString())); - break; - case BINARY: - case UNKNOWN: - csvString.append(toCSVByteArray(Base64.decodeBase64(obj.toString()))); - break; - case FIXED_POINT: - csvString.append(toCSVFixedPoint(obj, cols[i])); - break; - case FLOATING_POINT: - csvString.append(toCSVFloatingPoint(obj, cols[i])); - break; - case DECIMAL: - csvString.append(toCSVDecimal(obj)); - break; - // stored in JSON as strings in the joda time format - case DATE: - case TIME: - case DATE_TIME: - csvString.append(encloseWithQuotes(obj.toString())); - break; - // 0/1 will be stored as they are in JSON, even though valid values in - // JSON - // are true/false - case BIT: - csvString.append(toCSVBit(obj)); - break; - default: - throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, - "Column type from schema was not recognized for " + cols[i].getType()); + if (obj == null) { + csvString.append(NULL_VALUE); + } else { + switch (columns[i].getType()) { + case ARRAY: + case SET: + // stored as JSON array + JSONArray array = (JSONArray) obj; + csvString.append(encloseWithQuotes(array.toJSONString())); + break; + case MAP: + // stored as JSON object + csvString.append(encloseWithQuotes((((JSONObject) obj).toJSONString()))); + break; + case ENUM: + case TEXT: + csvString.append(toCSVString(obj.toString())); + break; + case BINARY: + case UNKNOWN: + csvString.append(toCSVByteArray(Base64.decodeBase64(obj.toString()))); + break; + case FIXED_POINT: + csvString.append(toCSVFixedPoint(obj, columns[i])); + break; + case FLOATING_POINT: + csvString.append(toCSVFloatingPoint(obj, columns[i])); + break; + case DECIMAL: + csvString.append(toCSVDecimal(obj)); + break; + // stored in JSON as strings in the joda time format + case DATE: + case TIME: + case DATE_TIME: + csvString.append(encloseWithQuotes(obj.toString())); + break; + // 0/1 will be stored as they are in JSON, even though valid values in + // JSON + // are true/false + case BIT: + csvString.append(toCSVBit(obj)); + break; + default: + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, + "Column type from schema was not recognized for " + columns[i].getType()); + } } - if (i < cols.length - 1) { + if (i < columns.length - 1) { csvString.append(CSV_SEPARATOR_CHARACTER); } @@ -332,21 +350,29 @@ public class JSONIntermediateDataFormat extends IntermediateDataFormat<JSONObjec } @SuppressWarnings("unchecked") - private Object[] toObject(JSONObject data) { + private Object[] toObject(JSONObject json) { - if (data == null) { + if (json == null) { return null; } - Column[] cols = schema.getColumnsArray(); - Object[] object = new Object[cols.length]; - - Set<String> keys = data.keySet(); - int i = 0; - for (String key : keys) { - - Integer nameIndex = schema.getColumnNameIndex(key); - Object obj = data.get(key); - Column column = cols[nameIndex]; + Column[] columns = schema.getColumnsArray(); + Object[] object = new Object[columns.length]; + + Set<String> jsonKeyNames = json.keySet(); + for (String name : jsonKeyNames) { + Integer nameIndex = schema.getColumnNameIndex(name); + Column column = columns[nameIndex]; + + Object obj = json.get(name); + // null is a possible value + if (obj == null && !column.isNullable()) { + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005, + column.getName() + " does not support null values"); + } + if (obj == null) { + object[nameIndex] = null; + continue; + } switch (column.getType()) { case ARRAY: case SET: @@ -387,9 +413,8 @@ public class JSONIntermediateDataFormat extends IntermediateDataFormat<JSONObjec break; default: throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, - "Column type from schema was not recognized for " + cols[i].getType()); + "Column type from schema was not recognized for " + column.getType()); } - } return object; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/2d54e26a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormatError.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormatError.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormatError.java index 86c71b6..72d9d87 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormatError.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormatError.java @@ -29,8 +29,6 @@ public enum JSONIntermediateDataFormatError implements ErrorCode { JSON_INTERMEDIATE_DATA_FORMAT_0002("JSON object parse error."), - JSON_INTERMEDIATE_DATA_FORMAT_0003("Missing key in the JSON object."), - ; private final String message; http://git-wip-us.apache.org/repos/asf/sqoop/blob/2d54e26a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java index 816cc71..5c7b444 100644 --- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java +++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java @@ -19,12 +19,16 @@ package org.apache.sqoop.connector.idf; import static org.apache.sqoop.connector.common.SqoopAvroUtils.createEnumSchema; +import static org.apache.sqoop.connector.common.SqoopIDFUtils.NULL_VALUE; import static org.apache.sqoop.connector.common.TestSqoopIDFUtils.getByteFieldString; +import static org.testng.Assert.assertNull; import static org.testng.AssertJUnit.assertEquals; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.util.Utf8; +import org.apache.commons.lang.StringUtils; +import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.common.SqoopAvroUtils; import org.apache.sqoop.schema.Schema; import org.apache.sqoop.schema.type.Array; @@ -58,7 +62,8 @@ public class TestAVROIntermediateDataFormat { private final static String csvTime = "'12:59:59'"; private Column enumCol; // no time zone - private final static LocalDateTime dateTime = new org.joda.time.LocalDateTime(2014, 10, 01, 12, 0, 0); + private final static LocalDateTime dateTime = new org.joda.time.LocalDateTime(2014, 10, 01, 12, + 0, 0); private final static org.joda.time.LocalTime time = new org.joda.time.LocalTime(12, 59, 59); private final static org.joda.time.LocalDate date = new org.joda.time.LocalDate(2014, 10, 01); @@ -73,15 +78,23 @@ public class TestAVROIntermediateDataFormat { options.add("ENUM"); options.add("NUME"); enumCol = new org.apache.sqoop.schema.type.Enum("seven").setOptions(options); - sqoopSchema.addColumn(new FixedPoint("one")).addColumn(new FixedPoint("two", 2L, false)).addColumn(new Text("three")) - .addColumn(new Text("four")).addColumn(new Binary("five")).addColumn(new Text("six")).addColumn(enumCol) + sqoopSchema + .addColumn(new FixedPoint("one")) + .addColumn(new FixedPoint("two", 2L, false)) + .addColumn(new Text("three")) + .addColumn(new Text("four")) + .addColumn(new Binary("five")) + .addColumn(new Text("six")) + .addColumn(enumCol) .addColumn(new Array("eight", new Array("array", new FixedPoint("ft")))) - .addColumn(new org.apache.sqoop.schema.type.Map("nine", new Text("t1"), new Text("t2"))).addColumn(new Bit("ten")) + .addColumn(new org.apache.sqoop.schema.type.Map("nine", new Text("t1"), new Text("t2"))) + .addColumn(new Bit("ten")) .addColumn(new org.apache.sqoop.schema.type.DateTime("eleven", true, false)) .addColumn(new org.apache.sqoop.schema.type.Time("twelve", false)) .addColumn(new org.apache.sqoop.schema.type.Date("thirteen")) .addColumn(new org.apache.sqoop.schema.type.FloatingPoint("fourteen")) - .addColumn(new org.apache.sqoop.schema.type.Set("fifteen", new Array("set", new FixedPoint("ftw")))); + .addColumn( + new org.apache.sqoop.schema.type.Set("fifteen", new Array("set", new FixedPoint("ftw")))); dataFormat = new AVROIntermediateDataFormat(sqoopSchema); avroSchema = SqoopAvroUtils.createAvroSchema(sqoopSchema); } @@ -92,9 +105,10 @@ public class TestAVROIntermediateDataFormat { @Test public void testInputAsCSVTextInAndDataOut() { - String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" - + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + csvDate - + ",13.44," + csvSet; + String csvText = "10,34,'54','random data'," + + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" + String.valueOf(0x0A) + + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + + csvDate + ",13.44," + csvSet; dataFormat.setCSVTextData(csvText); GenericRecord avroObject = createAvroGenericRecord(); assertEquals(avroObject.toString(), dataFormat.getData().toString()); @@ -102,9 +116,10 @@ public class TestAVROIntermediateDataFormat { @Test public void testInputAsCSVTextInAndObjectArrayOut() { - String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" - + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + csvDate - + ",13.44," + csvSet; + String csvText = "10,34,'54','random data'," + + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" + String.valueOf(0x0A) + + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + + csvDate + ",13.44," + csvSet; dataFormat.setCSVTextData(csvText); assertEquals(dataFormat.getObjectData().length, 15); assertObjectArray(); @@ -158,9 +173,10 @@ public class TestAVROIntermediateDataFormat { @Test public void testInputAsCSVTextInCSVTextOut() { - String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" - + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + csvDate - + ",13.44," + csvSet; + String csvText = "10,34,'54','random data'," + + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" + String.valueOf(0x0A) + + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + + csvDate + ",13.44," + csvSet; dataFormat.setCSVTextData(csvText); assertEquals(csvText, dataFormat.getCSVTextData()); } @@ -219,9 +235,10 @@ public class TestAVROIntermediateDataFormat { @Test public void testInputAsDataInAndCSVOut() { - String csvExpected = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" - + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + csvDate - + ",13.44," + csvSet; + String csvExpected = "10,34,'54','random data'," + + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" + String.valueOf(0x0A) + + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + + csvDate + ",13.44," + csvSet; dataFormat.setData(createAvroGenericRecord()); assertEquals(csvExpected, dataFormat.getCSVTextData()); } @@ -295,7 +312,8 @@ public class TestAVROIntermediateDataFormat { Object[] out = createObjectArray(); dataFormat.setObjectData(out); GenericRecord avroObject = createAvroGenericRecord(); - // SQOOP-SQOOP-1975: direct object compare will fail unless we use the Avro complex types + // SQOOP-SQOOP-1975: direct object compare will fail unless we use the Avro + // complex types assertEquals(avroObject.toString(), dataFormat.getData().toString()); } @@ -304,9 +322,10 @@ public class TestAVROIntermediateDataFormat { public void testInputAsObjectArrayInAndCSVOut() { Object[] out = createObjectArray(); dataFormat.setObjectData(out); - String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" - + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + csvDate - + ",13.44," + csvSet; + String csvText = "10,34,'54','random data'," + + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" + String.valueOf(0x0A) + + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + + csvDate + ",13.44," + csvSet; assertEquals(csvText, dataFormat.getCSVTextData()); } @@ -316,4 +335,212 @@ public class TestAVROIntermediateDataFormat { dataFormat.setObjectData(out); assertObjectArray(); } + + // **************test cases for empty and null schema******************* + @Test(expectedExceptions = SqoopException.class) + public void testEmptySchema() { + String testData = "10,34,'54','random data'," + + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'\\n'"; + // no coumns + Schema schema = new Schema("Test"); + dataFormat = new AVROIntermediateDataFormat(schema); + dataFormat.setCSVTextData(testData); + + @SuppressWarnings("unused") + Object[] out = dataFormat.getObjectData(); + } + + @Test(expectedExceptions = SqoopException.class) + public void testNullSchema() { + dataFormat = new AVROIntermediateDataFormat(null); + dataFormat.getObjectData(); + } + + @Test(expectedExceptions = SqoopException.class) + public void testNotSettingSchemaAndGetObjectData() { + dataFormat = new AVROIntermediateDataFormat(); + dataFormat.getObjectData(); + } + + @Test(expectedExceptions = SqoopException.class) + public void testNotSettingSchemaAndGetData() { + dataFormat = new AVROIntermediateDataFormat(); + dataFormat.getData(); + } + + @Test(expectedExceptions = SqoopException.class) + public void testNotSettingSchemaAndGetCSVData() { + dataFormat = new AVROIntermediateDataFormat(); + dataFormat.getCSVTextData(); + } + + @Test(expectedExceptions = SqoopException.class) + public void testNotSettingSchemaAndSetObjectData() { + dataFormat = new AVROIntermediateDataFormat(); + dataFormat.setObjectData(null); + } + + @Test(expectedExceptions = SqoopException.class) + public void testNotSettingSchemaAndSetData() { + dataFormat = new AVROIntermediateDataFormat(); + dataFormat.setData(null); + } + + @Test(expectedExceptions = SqoopException.class) + public void testNotSettingSchemaAndSetCSVData() { + dataFormat = new AVROIntermediateDataFormat(); + dataFormat.setCSVTextData(null); + } + + // **************test cases for null and empty input******************* + + @Test + public void testNullInputAsCSVTextInObjectArrayOut() { + + dataFormat.setCSVTextData(null); + Object[] out = dataFormat.getObjectData(); + assertNull(out); + } + + @Test(expectedExceptions = SqoopException.class) + public void testEmptyInputAsCSVTextInObjectArrayOut() { + dataFormat.setCSVTextData(""); + dataFormat.getObjectData(); + } + + @Test + public void testNullValueAsObjectArrayInAndCSVTextOut() { + + Object[] in = { null, null, null, null, null, null, null, null, null, null, null, null, null, + null, null }; + dataFormat.setObjectData(in); + + String csvText = dataFormat.getCSVTextData(); + String[] textValues = csvText.split(","); + assertEquals(15, textValues.length); + for (String text : textValues) { + assertEquals(text, NULL_VALUE); + } + } + + @Test + public void testNullValueAsObjectArrayInAndObjectArrayOut() { + Object[] in = { null, null, null, null, null, null, null, null, null, null, null, null, null, + null, null }; + dataFormat.setObjectData(in); + + Object[] out = dataFormat.getObjectData(); + assertEquals(15, out.length); + for (Object obj : out) { + assertEquals(obj, null); + } + } + + @Test + public void testNullValueAsCSVTextInAndObjectArrayOut() { + String[] test = { "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", + "NULL", "NULL", "NULL", "NULL", "NULL", "NULL" }; + dataFormat.setCSVTextData(StringUtils.join(test, ",")); + Object[] out = dataFormat.getObjectData(); + assertEquals(15, out.length); + for (Object obj : out) { + assertEquals(obj, null); + } + } + + @Test + public void testNullValueAsCSVTextInAndCSVTextOut() { + + String[] test = { "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", + "NULL", "NULL", "NULL", "NULL", "NULL", "NULL" }; + dataFormat.setCSVTextData(StringUtils.join(test, ",")); + + String csvText = dataFormat.getCSVTextData(); + String[] textValues = csvText.split(","); + assertEquals(15, textValues.length); + for (String text : textValues) { + assertEquals(text, NULL_VALUE); + } + } + + @Test + public void testNullValueAsDataInAndCSVTextOut() { + + GenericRecord avroObject = new GenericData.Record(avroSchema); + avroObject = setAvroRecordWithNulls(); + dataFormat.setData(avroObject); + + String csvText = dataFormat.getCSVTextData(); + String[] textValues = csvText.split(","); + assertEquals(15, textValues.length); + for (String text : textValues) { + assertEquals(text, NULL_VALUE); + } + } + + @Test + public void testNullValueAsDataInAndObjectArrayOut() { + GenericRecord avroObject = new GenericData.Record(avroSchema); + avroObject = setAvroRecordWithNulls(); + dataFormat.setData(avroObject); + + Object[] out = dataFormat.getObjectData(); + assertEquals(15, out.length); + for (Object obj : out) { + assertEquals(obj, null); + } + + } + + private GenericRecord setAvroRecordWithNulls() { + GenericRecord avroObject = new GenericData.Record(avroSchema); + avroObject.put("one", null); + avroObject.put("two", null); + avroObject.put("three", null); + avroObject.put("four", null); + avroObject.put("five", null); + avroObject.put("six", null); + avroObject.put("seven", null); + + avroObject.put("eight", null); + avroObject.put("nine", null); + avroObject.put("ten", null); + + // expect dates as strings + avroObject.put("eleven", null); + avroObject.put("twelve", null); + avroObject.put("thirteen", null); + avroObject.put("fourteen", null); + + avroObject.put("fifteen", null); + return avroObject; + } + @Test(expectedExceptions = SqoopException.class) + public void testSchemaNotNullableWithObjectArray() { + Schema overrideSchema = new Schema("Test").addColumn(new Text("t").setNullable(false)); + AVROIntermediateDataFormat dataFormat = new AVROIntermediateDataFormat(overrideSchema); + Object[] out = new Object[1]; + out[0] = null; + dataFormat.setObjectData(out); + } + + @Test(expectedExceptions = SqoopException.class) + public void testSchemaNotNullableWithCSV() { + Schema overrideSchema = new Schema("Test").addColumn(new Text("one").setNullable(false)); + AVROIntermediateDataFormat dataFormat = new AVROIntermediateDataFormat(overrideSchema); + dataFormat.setCSVTextData(NULL_VALUE); + } + + // no validation happens when the setAvro and getAvro is used + @Test + public void testSchemaNotNullableWithAvro() { + Schema overrideSchema = new Schema("Test").addColumn(new Text("one").setNullable(false)); + AVROIntermediateDataFormat dataFormat = new AVROIntermediateDataFormat(overrideSchema); + org.apache.avro.Schema avroSchema = SqoopAvroUtils.createAvroSchema(overrideSchema); + GenericRecord avroObject = new GenericData.Record(avroSchema); + avroObject.put("one", null); + dataFormat.setData(avroObject); + dataFormat.getData(); + } + } http://git-wip-us.apache.org/repos/asf/sqoop/blob/2d54e26a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java index fca410f..861d34e 100644 --- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java +++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java @@ -58,7 +58,7 @@ public class TestCSVIntermediateDataFormat { } - //**************test cases for null and empty input******************* + //**************test cases for null input******************* @Test public void testNullInputAsCSVTextInObjectArrayOut() { @@ -1112,7 +1112,7 @@ public class TestCSVIntermediateDataFormat { dataFormat.setCSVTextData(testData); assertEquals(testData, dataFormat.getCSVTextData()); } - //**************test cases for schema******************* + //**************test cases for null and empty schema******************* @Test(expectedExceptions=SqoopException.class) public void testEmptySchema() { String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54}) @@ -1128,14 +1128,52 @@ public class TestCSVIntermediateDataFormat { @Test(expectedExceptions = SqoopException.class) public void testNullSchema() { dataFormat = new CSVIntermediateDataFormat(null); - @SuppressWarnings("unused") - Object[] out = dataFormat.getObjectData(); + dataFormat.getObjectData(); } @Test(expectedExceptions = SqoopException.class) - public void testNotSettingSchema() { + public void testNotSettingSchemaAndGetObjectData() { dataFormat = new CSVIntermediateDataFormat(); - @SuppressWarnings("unused") - Object[] out = dataFormat.getObjectData(); + dataFormat.getObjectData(); + } + + @Test(expectedExceptions = SqoopException.class) + public void testNotSettingSchemaAndGetData() { + dataFormat = new CSVIntermediateDataFormat(); + dataFormat.getData(); + } + + //SQOOP-1936 to enable schema validation after we use compareTo + @Test + public void testNotSettingSchemaAndGetCSVData() { + dataFormat = new CSVIntermediateDataFormat(); + dataFormat.getCSVTextData(); + } + + @Test(expectedExceptions = SqoopException.class) + public void testNotSettingSchemaAndSetObjectData() { + dataFormat = new CSVIntermediateDataFormat(); + dataFormat.setObjectData(null); + } + + @Test(expectedExceptions = SqoopException.class) + public void testNotSettingSchemaAndSetData() { + dataFormat = new CSVIntermediateDataFormat(); + dataFormat.setData(null); + } + + @Test(expectedExceptions = SqoopException.class) + public void testNotSettingSchemaAndSetCSVData() { + dataFormat = new CSVIntermediateDataFormat(); + dataFormat.setCSVTextData(null); + } + + @Test(expectedExceptions = SqoopException.class) + public void testSchemaNotNullable() { + dataFormat = new CSVIntermediateDataFormat(); + dataFormat.setSchema(new Schema("Test").addColumn(new Text("t").setNullable(false))); + Object[] out = new Object[1]; + out[0] = null; + dataFormat.setObjectData(out); } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/2d54e26a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestJSONIntermediateDataFormat.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestJSONIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestJSONIntermediateDataFormat.java index 44ea4fd..8a87c65 100644 --- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestJSONIntermediateDataFormat.java +++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestJSONIntermediateDataFormat.java @@ -18,9 +18,13 @@ */ package org.apache.sqoop.connector.idf; +import static org.apache.sqoop.connector.common.SqoopIDFUtils.NULL_VALUE; import static org.apache.sqoop.connector.common.TestSqoopIDFUtils.getByteFieldString; +import static org.testng.Assert.assertNull; import static org.testng.AssertJUnit.assertEquals; +import org.apache.commons.lang.StringUtils; +import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.common.SqoopIDFUtils; import org.apache.sqoop.schema.Schema; import org.apache.sqoop.schema.type.Array; @@ -55,15 +59,23 @@ public class TestJSONIntermediateDataFormat { private void createJSONIDF() { Schema schema = new Schema("test"); - schema.addColumn(new FixedPoint("1")).addColumn(new FixedPoint("2", 2L, false)).addColumn(new Text("3")) - .addColumn(new Text("4")).addColumn(new Binary("5")).addColumn(new Text("6")) + schema + .addColumn(new FixedPoint("1")) + .addColumn(new FixedPoint("2", 2L, false)) + .addColumn(new Text("3")) + .addColumn(new Text("4")) + .addColumn(new Binary("5")) + .addColumn(new Text("6")) .addColumn(new org.apache.sqoop.schema.type.Enum("7")) .addColumn(new Array("8", new Array("array", new FixedPoint("ft")))) - .addColumn(new org.apache.sqoop.schema.type.Map("9", new Text("t1"), new Text("t2"))).addColumn(new Bit("10")) + .addColumn(new org.apache.sqoop.schema.type.Map("9", new Text("t1"), new Text("t2"))) + .addColumn(new Bit("10")) .addColumn(new org.apache.sqoop.schema.type.DateTime("11", true, false)) - .addColumn(new org.apache.sqoop.schema.type.Time("12", false)).addColumn(new org.apache.sqoop.schema.type.Date("13")) + .addColumn(new org.apache.sqoop.schema.type.Time("12", false)) + .addColumn(new org.apache.sqoop.schema.type.Date("13")) .addColumn(new org.apache.sqoop.schema.type.FloatingPoint("14")) - .addColumn(new org.apache.sqoop.schema.type.Set("15", new Array("set", new FixedPoint("ftw")))); + .addColumn( + new org.apache.sqoop.schema.type.Set("15", new Array("set", new FixedPoint("ftw")))); dataFormat = new JSONIntermediateDataFormat(schema); } @@ -73,9 +85,10 @@ public class TestJSONIntermediateDataFormat { @Test public void testInputAsCSVTextInAndDataOut() { - String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" - + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + dateTime + "," + time + "," + date + ",13.44," - + csvSet; + String csvText = "10,34,'54','random data'," + + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" + String.valueOf(0x0A) + + "','ENUM'," + csvArray + "," + map + ",true," + dateTime + "," + time + "," + date + + ",13.44," + csvSet; dataFormat.setCSVTextData(csvText); String jsonExpected = "{\"15\":[[11,12],[14,15]],\"13\":\"2014-10-01\",\"14\":13.44,\"11\":\"2014-10-01 12:00:00.000\"," + "\"12\":\"12:59:59\",\"3\":\"54\",\"2\":34,\"1\":10,\"10\":true,\"7\":\"ENUM\",\"6\":\"10\",\"5\":\"kDY=\",\"4\":\"random data\"," @@ -85,9 +98,10 @@ public class TestJSONIntermediateDataFormat { @Test public void testInputAsCSVTextInAndObjectArrayOut() { - String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" - + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + dateTime + "," + time + "," + date + ",13.44," - + csvSet; + String csvText = "10,34,'54','random data'," + + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" + String.valueOf(0x0A) + + "','ENUM'," + csvArray + "," + map + ",true," + dateTime + "," + time + "," + date + + ",13.44," + csvSet; dataFormat.setCSVTextData(csvText); assertEquals(dataFormat.getObjectData().length, 15); assertObjectArray(); @@ -145,9 +159,10 @@ public class TestJSONIntermediateDataFormat { @Test public void testInputAsCSVTextInCSVTextOut() { - String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" - + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + dateTime + "," + time + "," + date + ",13.44," - + csvSet; + String csvText = "10,34,'54','random data'," + + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" + String.valueOf(0x0A) + + "','ENUM'," + csvArray + "," + map + ",true," + dateTime + "," + time + "," + date + + ",13.44," + csvSet; dataFormat.setCSVTextData(csvText); assertEquals(csvText, dataFormat.getCSVTextData()); } @@ -159,7 +174,10 @@ public class TestJSONIntermediateDataFormat { json.put("2", 34); json.put("3", "54"); json.put("4", "random data"); - json.put("5", org.apache.commons.codec.binary.Base64.encodeBase64String(new byte[] { (byte) -112, (byte) 54 })); + json.put( + "5", + org.apache.commons.codec.binary.Base64.encodeBase64String(new byte[] { (byte) -112, + (byte) 54 })); json.put("6", String.valueOf(0x0A)); json.put("7", "ENUM"); JSONArray givenArrayOne = new JSONArray(); @@ -206,9 +224,10 @@ public class TestJSONIntermediateDataFormat { @Test public void testInputAsDataInAndCSVOut() { - String csvExpected = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" - + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + dateTime + "," + time + "," + date + ",13.44," - + csvSet; + String csvExpected = "10,34,'54','random data'," + + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" + String.valueOf(0x0A) + + "','ENUM'," + csvArray + "," + map + ",true," + dateTime + "," + time + "," + date + + ",13.44," + csvSet; dataFormat.setData(createJSONObject()); assertEquals(csvExpected, dataFormat.getCSVTextData()); } @@ -294,9 +313,10 @@ public class TestJSONIntermediateDataFormat { public void testInputAsObjectArrayInAndCSVOut() { Object[] out = createObjectArray(); dataFormat.setObjectData(out); - String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" - + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + dateTime + "," + time + "," + date + ",13.44," - + csvSet; + String csvText = "10,34,'54','random data'," + + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" + String.valueOf(0x0A) + + "','ENUM'," + csvArray + "," + map + ",true," + dateTime + "," + time + "," + date + + ",13.44," + csvSet; assertEquals(csvText, dataFormat.getCSVTextData()); } @@ -306,4 +326,196 @@ public class TestJSONIntermediateDataFormat { dataFormat.setObjectData(out); assertObjectArray(); } + + // **************test cases for empty and null schema******************* + @Test(expectedExceptions = SqoopException.class) + public void testEmptySchema() { + String testData = "10,34,'54','random data'," + + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'\\n'"; + // no coumns + Schema schema = new Schema("Test"); + dataFormat = new JSONIntermediateDataFormat(schema); + dataFormat.setCSVTextData(testData); + + @SuppressWarnings("unused") + Object[] out = dataFormat.getObjectData(); + } + + @Test(expectedExceptions = SqoopException.class) + public void testNullSchema() { + dataFormat = new JSONIntermediateDataFormat(null); + dataFormat.getObjectData(); + } + + @Test(expectedExceptions = SqoopException.class) + public void testNotSettingSchemaAndGetObjectData() { + dataFormat = new JSONIntermediateDataFormat(); + dataFormat.getObjectData(); + } + + @Test(expectedExceptions = SqoopException.class) + public void testNotSettingSchemaAndGetData() { + dataFormat = new JSONIntermediateDataFormat(); + dataFormat.getData(); + } + + @Test(expectedExceptions = SqoopException.class) + public void testNotSettingSchemaAndGetCSVData() { + dataFormat = new JSONIntermediateDataFormat(); + dataFormat.getCSVTextData(); + } + + @Test(expectedExceptions = SqoopException.class) + public void testNotSettingSchemaAndSetObjectData() { + dataFormat = new JSONIntermediateDataFormat(); + dataFormat.setObjectData(null); + } + + @Test(expectedExceptions = SqoopException.class) + public void testNotSettingSchemaAndSetData() { + dataFormat = new JSONIntermediateDataFormat(); + dataFormat.setData(null); + } + + @Test(expectedExceptions = SqoopException.class) + public void testNotSettingSchemaAndSetCSVData() { + dataFormat = new JSONIntermediateDataFormat(); + dataFormat.setCSVTextData(null); + } + + // **************test cases for null input******************* + + @Test + public void testNullInputAsCSVTextInObjectArrayOut() { + + dataFormat.setCSVTextData(null); + Object[] out = dataFormat.getObjectData(); + assertNull(out); + } + + @Test(expectedExceptions = SqoopException.class) + public void testEmptyInputAsCSVTextInObjectArrayOut() { + dataFormat.setCSVTextData(""); + dataFormat.getObjectData(); + } + + @Test + public void testNullValueAsObjectArrayInAndCSVTextOut() { + + Object[] in = { null, null, null, null, null, null, null, null, null, null, null, null, null, + null, null }; + dataFormat.setObjectData(in); + + String csvText = dataFormat.getCSVTextData(); + String[] textValues = csvText.split(","); + assertEquals(15, textValues.length); + for (String text : textValues) { + assertEquals(text, NULL_VALUE); + } + } + + @Test + public void testNullValueAsObjectArrayInAndObjectArrayOut() { + Object[] in = { null, null, null, null, null, null, null, null, null, null, null, null, null, + null, null }; + dataFormat.setObjectData(in); + + Object[] out = dataFormat.getObjectData(); + assertEquals(15, out.length); + for (Object obj : out) { + assertEquals(obj, null); + } + } + + @Test + public void testNullValueAsCSVTextInAndObjectArrayOut() { + String[] test = { "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", + "NULL", "NULL", "NULL", "NULL", "NULL", "NULL" }; + dataFormat.setCSVTextData(StringUtils.join(test, ",")); + Object[] out = dataFormat.getObjectData(); + assertEquals(15, out.length); + for (Object obj : out) { + assertEquals(obj, null); + } + } + + @Test + public void testNullValueAsCSVTextInAndCSVTextOut() { + + String[] test = { "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", + "NULL", "NULL", "NULL", "NULL", "NULL", "NULL" }; + dataFormat.setCSVTextData(StringUtils.join(test, ",")); + + String csvText = dataFormat.getCSVTextData(); + String[] textValues = csvText.split(","); + assertEquals(15, textValues.length); + for (String text : textValues) { + assertEquals(text, NULL_VALUE); + } + } + + @SuppressWarnings("unchecked") + @Test + public void testNullValueAsDataInAndCSVTextOut() { + + JSONObject json = new JSONObject(); + for (int i = 1; i <= 15; i++) { + json.put(String.valueOf(i), null); + } + dataFormat.setData(json); + + String csvText = dataFormat.getCSVTextData(); + String[] textValues = csvText.split(","); + assertEquals(15, textValues.length); + for (String text : textValues) { + assertEquals(text, NULL_VALUE); + } + } + + @SuppressWarnings("unchecked") + @Test + public void testNullValueAsDataInAndObjectArrayOut() { + + JSONObject json = new JSONObject(); + for (int i = 1; i <= 15; i++) { + json.put(String.valueOf(i), null); + } + dataFormat.setData(json); + + Object[] out = dataFormat.getObjectData(); + assertEquals(15, out.length); + for (Object obj : out) { + assertEquals(obj, null); + } + + } + + @Test(expectedExceptions = SqoopException.class) + public void testSchemaNotNullableWithObjectArray() { + dataFormat = new JSONIntermediateDataFormat(); + dataFormat.setSchema(new Schema("Test").addColumn(new Text("t").setNullable(false))); + Object[] out = new Object[1]; + out[0] = null; + dataFormat.setObjectData(out); + } + + @Test(expectedExceptions = SqoopException.class) + public void testSchemaNotNullableWithCSV() { + dataFormat = new JSONIntermediateDataFormat(); + dataFormat.setSchema(new Schema("Test").addColumn(new Text("t").setNullable(false))); + dataFormat.setCSVTextData(NULL_VALUE); + } + + @SuppressWarnings("unchecked") + // no validation happens when the setJSON and getJSON is used + @Test + public void testSchemaNotNullableWithJSON() { + dataFormat = new JSONIntermediateDataFormat(); + dataFormat.setSchema(new Schema("Test").addColumn(new Text("t").setNullable(false))); + JSONObject json = new JSONObject(); + json.put("test", null); + dataFormat.setData(json); + dataFormat.getData(); + } + }
