SQOOP-1348: Sqoop2: Remove Data class (Veena Basavaraj via Abraham Elmahrek)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/ace22237 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/ace22237 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/ace22237 Branch: refs/heads/sqoop2 Commit: ace222374b9f70915e16eaa9af8360940951bfd2 Parents: 49a7431 Author: Abraham Elmahrek <[email protected]> Authored: Mon Nov 10 13:18:38 2014 -0800 Committer: Abraham Elmahrek <[email protected]> Committed: Mon Nov 10 13:18:38 2014 -0800 ---------------------------------------------------------------------- SQOOP-1348.patch | 1844 ++++++++++++++++++ .../java/org/apache/sqoop/schema/Schema.java | 7 +- .../idf/CSVIntermediateDataFormat.java | 10 +- .../idf/TestCSVIntermediateDataFormat.java | 106 +- .../main/java/org/apache/sqoop/job/io/Data.java | 529 ----- .../java/org/apache/sqoop/job/JobUtils.java | 93 - .../org/apache/sqoop/job/TestMapReduce.java | 132 +- .../java/org/apache/sqoop/job/TestMatching.java | 21 +- .../apache/sqoop/job/io/SqoopWritableTest.java | 95 - .../java/org/apache/sqoop/job/io/TestData.java | 117 -- .../apache/sqoop/job/io/TestSqoopWritable.java | 89 + .../mr/TestSqoopOutputFormatLoadExecutor.java | 33 +- .../apache/sqoop/job/util/MRJobTestUtil.java | 114 ++ 13 files changed, 2201 insertions(+), 989 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/ace22237/SQOOP-1348.patch ---------------------------------------------------------------------- diff --git a/SQOOP-1348.patch b/SQOOP-1348.patch new file mode 100644 index 0000000..7834a3f --- /dev/null +++ b/SQOOP-1348.patch @@ -0,0 +1,1844 @@ +diff --git a/common/src/main/java/org/apache/sqoop/schema/Schema.java b/common/src/main/java/org/apache/sqoop/schema/Schema.java +index 40c362c..3aa3aea 100644 +--- a/common/src/main/java/org/apache/sqoop/schema/Schema.java ++++ b/common/src/main/java/org/apache/sqoop/schema/Schema.java +@@ -122,12 +122,7 @@ public Schema setCreationDate(Date creationDate) { + } + + public boolean isEmpty() { +- if (columns.size()==0) { +- return true; +- } else { +- return false; +- } +- ++ return columns.size() == 0; + } + + public String toString() { +diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java +index e0e4061..e65edd9 100644 +--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java ++++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java +@@ -67,9 +67,15 @@ + + private final List<Integer> stringFieldIndices = new ArrayList<Integer>(); + private final List<Integer> byteFieldIndices = new ArrayList<Integer>(); +- + private Schema schema; + ++ public CSVIntermediateDataFormat() { ++ } ++ ++ public CSVIntermediateDataFormat(Schema schema) { ++ setSchema(schema); ++ } ++ + /** + * {@inheritDoc} + */ +@@ -166,7 +172,7 @@ public void setSchema(Schema schema) { + */ + @Override + public Object[] getObjectData() { +- if (schema.isEmpty()) { ++ if (schema == null || schema.isEmpty()) { + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0006); + } + +diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java +index 72e95ed..fcf6c3c 100644 +--- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java ++++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java +@@ -41,11 +41,11 @@ + + private final String BYTE_FIELD_ENCODING = "ISO-8859-1"; + +- private IntermediateDataFormat<?> data; ++ private IntermediateDataFormat<?> dataFormat; + + @Before + public void setUp() { +- data = new CSVIntermediateDataFormat(); ++ dataFormat = new CSVIntermediateDataFormat(); + } + + private String getByteFieldString(byte[] byteFieldData) { +@@ -61,8 +61,8 @@ private String getByteFieldString(byte[] byteFieldData) { + public void testStringInStringOut() { + String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54}) + + ",'" + String.valueOf(0x0A) + "'"; +- data.setTextData(testData); +- assertEquals(testData, data.getTextData()); ++ dataFormat.setTextData(testData); ++ assertEquals(testData, dataFormat.getTextData()); + } + + @Test +@@ -74,10 +74,10 @@ public void testNullStringInObjectOut() { + .addColumn(new Text("4")) + .addColumn(new Binary("5")) + .addColumn(new Text("6")); +- data.setSchema(schema); +- data.setTextData(null); ++ dataFormat.setSchema(schema); ++ dataFormat.setTextData(null); + +- Object[] out = data.getObjectData(); ++ Object[] out = dataFormat.getObjectData(); + + assertNull(out); + } +@@ -91,10 +91,10 @@ public void testEmptyStringInObjectOut() { + .addColumn(new Text("4")) + .addColumn(new Binary("5")) + .addColumn(new Text("6")); +- data.setSchema(schema); +- data.setTextData(""); ++ dataFormat.setSchema(schema); ++ dataFormat.setTextData(""); + +- data.getObjectData(); ++ dataFormat.getObjectData(); + } + + @Test +@@ -111,10 +111,10 @@ public void testStringInObjectOut() { + .addColumn(new Binary("5")) + .addColumn(new Text("6")); + +- data.setSchema(schema); +- data.setTextData(testData); ++ dataFormat.setSchema(schema); ++ dataFormat.setTextData(testData); + +- Object[] out = data.getObjectData(); ++ Object[] out = dataFormat.getObjectData(); + + assertEquals(new Long(10),out[0]); + assertEquals(new Long(34),out[1]); +@@ -134,7 +134,7 @@ public void testObjectInStringOut() { + .addColumn(new Text("4")) + .addColumn(new Binary("5")) + .addColumn(new Text("6")); +- data.setSchema(schema); ++ dataFormat.setSchema(schema); + + byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54}; + Object[] in = new Object[6]; +@@ -145,12 +145,12 @@ public void testObjectInStringOut() { + in[4] = byteFieldData; + in[5] = new String(new char[] { 0x0A }); + +- data.setObjectData(in); ++ dataFormat.setObjectData(in); + + //byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements + String testData = "10,34,'54','random data'," + + getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n'"; +- assertEquals(testData, data.getTextData()); ++ assertEquals(testData, dataFormat.getTextData()); + } + + @Test +@@ -164,7 +164,7 @@ public void testObjectInObjectOut() { + .addColumn(new Text("4")) + .addColumn(new Binary("5")) + .addColumn(new Text("6")); +- data.setSchema(schema); ++ dataFormat.setSchema(schema); + + Object[] in = new Object[6]; + in[0] = new Long(10); +@@ -177,9 +177,9 @@ public void testObjectInObjectOut() { + System.arraycopy(in,0,inCopy,0,in.length); + + // Modifies the input array, so we use the copy to confirm +- data.setObjectData(in); ++ dataFormat.setObjectData(in); + +- assertTrue(Arrays.deepEquals(inCopy, data.getObjectData())); ++ assertTrue(Arrays.deepEquals(inCopy, dataFormat.getObjectData())); + } + + @Test +@@ -191,7 +191,7 @@ public void testObjectWithNullInStringOut() { + .addColumn(new Text("4")) + .addColumn(new Binary("5")) + .addColumn(new Text("6")); +- data.setSchema(schema); ++ dataFormat.setSchema(schema); + + byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54}; + Object[] in = new Object[6]; +@@ -202,12 +202,12 @@ public void testObjectWithNullInStringOut() { + in[4] = byteFieldData; + in[5] = new String(new char[] { 0x0A }); + +- data.setObjectData(in); ++ dataFormat.setObjectData(in); + + //byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements + String testData = "10,34,NULL,'random data'," + + getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n'"; +- assertEquals(testData, data.getTextData()); ++ assertEquals(testData, dataFormat.getTextData()); + } + + @Test +@@ -215,7 +215,7 @@ public void testStringFullRangeOfCharacters() { + Schema schema = new Schema("test"); + schema.addColumn(new Text("1")); + +- data.setSchema(schema); ++ dataFormat.setSchema(schema); + + char[] allCharArr = new char[256]; + for(int i = 0; i < allCharArr.length; ++i) { +@@ -228,17 +228,17 @@ public void testStringFullRangeOfCharacters() { + System.arraycopy(in, 0, inCopy, 0, in.length); + + // Modifies the input array, so we use the copy to confirm +- data.setObjectData(in); ++ dataFormat.setObjectData(in); + +- assertEquals(strData, data.getObjectData()[0]); +- assertTrue(Arrays.deepEquals(inCopy, data.getObjectData())); ++ assertEquals(strData, dataFormat.getObjectData()[0]); ++ assertTrue(Arrays.deepEquals(inCopy, dataFormat.getObjectData())); + } + + @Test + public void testByteArrayFullRangeOfCharacters() { + Schema schema = new Schema("test"); + schema.addColumn(new Binary("1")); +- data.setSchema(schema); ++ dataFormat.setSchema(schema); + + byte[] allCharByteArr = new byte[256]; + for (int i = 0; i < allCharByteArr.length; ++i) { +@@ -250,32 +250,32 @@ public void testByteArrayFullRangeOfCharacters() { + System.arraycopy(in, 0, inCopy, 0, in.length); + + // Modifies the input array, so we use the copy to confirm +- data.setObjectData(in); +- assertTrue(Arrays.deepEquals(inCopy, data.getObjectData())); ++ dataFormat.setObjectData(in); ++ assertTrue(Arrays.deepEquals(inCopy, dataFormat.getObjectData())); + } + + @Test + public void testDate() { + Schema schema = new Schema("test"); + schema.addColumn(new Date("1")); +- data.setSchema(schema); ++ dataFormat.setSchema(schema); + +- data.setTextData("2014-10-01"); +- assertEquals("2014-10-01", data.getObjectData()[0].toString()); ++ dataFormat.setTextData("2014-10-01"); ++ assertEquals("2014-10-01", dataFormat.getObjectData()[0].toString()); + } + + @Test + public void testDateTime() { + Schema schema = new Schema("test"); + schema.addColumn(new DateTime("1")); +- data.setSchema(schema); ++ dataFormat.setSchema(schema); + + for (String dateTime : new String[]{ + "2014-10-01T12:00:00", + "2014-10-01T12:00:00.000" + }) { +- data.setTextData(dateTime); +- assertEquals("2014-10-01T12:00:00.000", data.getObjectData()[0].toString()); ++ dataFormat.setTextData(dateTime); ++ assertEquals("2014-10-01T12:00:00.000", dataFormat.getObjectData()[0].toString()); + } + } + +@@ -289,14 +289,14 @@ public void testDateTime() { + public void testDateTimeISO8601Alternative() { + Schema schema = new Schema("test"); + schema.addColumn(new DateTime("1")); +- data.setSchema(schema); ++ dataFormat.setSchema(schema); + + for (String dateTime : new String[]{ + "2014-10-01 12:00:00", + "2014-10-01 12:00:00.000" + }) { +- data.setTextData(dateTime); +- assertEquals("2014-10-01T12:00:00.000", data.getObjectData()[0].toString()); ++ dataFormat.setTextData(dateTime); ++ assertEquals("2014-10-01T12:00:00.000", dataFormat.getObjectData()[0].toString()); + } + } + +@@ -304,20 +304,20 @@ public void testDateTimeISO8601Alternative() { + public void testBit() { + Schema schema = new Schema("test"); + schema.addColumn(new Bit("1")); +- data.setSchema(schema); ++ dataFormat.setSchema(schema); + + for (String trueBit : new String[]{ + "true", "TRUE", "1" + }) { +- data.setTextData(trueBit); +- assertTrue((Boolean) data.getObjectData()[0]); ++ dataFormat.setTextData(trueBit); ++ assertTrue((Boolean) dataFormat.getObjectData()[0]); + } + + for (String falseBit : new String[]{ + "false", "FALSE", "0" + }) { +- data.setTextData(falseBit); +- assertFalse((Boolean) data.getObjectData()[0]); ++ dataFormat.setTextData(falseBit); ++ assertFalse((Boolean) dataFormat.getObjectData()[0]); + } + } + +@@ -326,9 +326,23 @@ public void testEmptySchema() { + String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54}) + + ",'\\n'"; + Schema schema = new Schema("Test"); +- data.setSchema(schema); +- data.setTextData(testData); ++ dataFormat.setSchema(schema); ++ dataFormat.setTextData(testData); + +- Object[] out = data.getObjectData(); ++ @SuppressWarnings("unused") ++ Object[] out = dataFormat.getObjectData(); ++ } ++ ++ @Test(expected = SqoopException.class) ++ public void testNullSchema() { ++ dataFormat.setSchema(null); ++ @SuppressWarnings("unused") ++ Object[] out = dataFormat.getObjectData(); ++ } ++ ++ @Test(expected = SqoopException.class) ++ public void testNotSettingSchema() { ++ @SuppressWarnings("unused") ++ Object[] out = dataFormat.getObjectData(); + } + } +diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java +deleted file mode 100644 +index 139883e..0000000 +--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java ++++ /dev/null +@@ -1,529 +0,0 @@ +-/** +- * Licensed to the Apache Software Foundation (ASF) under one +- * or more contributor license agreements. See the NOTICE file +- * distributed with this work for additional information +- * regarding copyright ownership. The ASF licenses this file +- * to you under the Apache License, Version 2.0 (the +- * "License"); you may not use this file except in compliance +- * with the License. You may obtain a copy of the License at +- * +- * http://www.apache.org/licenses/LICENSE-2.0 +- * +- * Unless required by applicable law or agreed to in writing, software +- * distributed under the License is distributed on an "AS IS" BASIS, +- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- * See the License for the specific language governing permissions and +- * limitations under the License. +- */ +-package org.apache.sqoop.job.io; +- +-import java.io.DataInput; +-import java.io.DataOutput; +-import java.io.IOException; +-import java.nio.charset.Charset; +-import java.util.ArrayList; +-import java.util.Arrays; +-import java.util.regex.Matcher; +- +-import org.apache.hadoop.io.WritableComparable; +-import org.apache.hadoop.io.WritableComparator; +-import org.apache.hadoop.io.WritableUtils; +-import org.apache.sqoop.common.SqoopException; +-import org.apache.sqoop.job.MRExecutionError; +- +-public class Data implements WritableComparable<Data> { +- +- // The content is an Object to accommodate different kinds of data. +- // For example, it can be: +- // - Object[] for an array of object record +- // - String for a text of CSV record +- private volatile Object content = null; +- +- public static final int EMPTY_DATA = 0; +- public static final int CSV_RECORD = 1; +- public static final int ARRAY_RECORD = 2; +- private int type = EMPTY_DATA; +- +- public static final String CHARSET_NAME = "UTF-8"; +- +- public static final char DEFAULT_RECORD_DELIMITER = '\n'; +- public static final char DEFAULT_FIELD_DELIMITER = ','; +- public static final char DEFAULT_STRING_DELIMITER = '\''; +- public static final char DEFAULT_STRING_ESCAPE = '\\'; +- private char fieldDelimiter = DEFAULT_FIELD_DELIMITER; +- private char stringDelimiter = DEFAULT_STRING_DELIMITER; +- private char stringEscape = DEFAULT_STRING_ESCAPE; +- private String escapedStringDelimiter = String.valueOf(new char[] { +- stringEscape, stringDelimiter +- }); +- +- private int[] fieldTypes = null; +- +- public void setFieldDelimiter(char fieldDelimiter) { +- this.fieldDelimiter = fieldDelimiter; +- } +- +- public void setFieldTypes(int[] fieldTypes) { +- this.fieldTypes = fieldTypes; +- } +- +- public void setContent(Object content, int type) { +- switch (type) { +- case EMPTY_DATA: +- case CSV_RECORD: +- case ARRAY_RECORD: +- this.type = type; +- this.content = content; +- break; +- default: +- throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); +- } +- } +- +- public Object getContent(int targetType) { +- switch (targetType) { +- case CSV_RECORD: +- return format(); +- case ARRAY_RECORD: +- return parse(); +- default: +- throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(targetType)); +- } +- } +- +- public int getType() { +- return type; +- } +- +- public boolean isEmpty() { +- return (type == EMPTY_DATA); +- } +- +- @Override +- public String toString() { +- return (String)getContent(CSV_RECORD); +- } +- +- @Override +- public int compareTo(Data other) { +- byte[] myBytes = toString().getBytes(Charset.forName(CHARSET_NAME)); +- byte[] otherBytes = other.toString().getBytes( +- Charset.forName(CHARSET_NAME)); +- return WritableComparator.compareBytes( +- myBytes, 0, myBytes.length, otherBytes, 0, otherBytes.length); +- } +- +- @Override +- public boolean equals(Object other) { +- if (!(other instanceof Data)) { +- return false; +- } +- +- Data data = (Data)other; +- if (type != data.getType()) { +- return false; +- } +- +- return toString().equals(data.toString()); +- } +- +- @Override +- public int hashCode() { +- int result = super.hashCode(); +- switch (type) { +- case CSV_RECORD: +- result += 31 * content.hashCode(); +- return result; +- case ARRAY_RECORD: +- Object[] array = (Object[])content; +- for (int i = 0; i < array.length; i++) { +- result += 31 * array[i].hashCode(); +- } +- return result; +- default: +- throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); +- } +- } +- +- @Override +- public void readFields(DataInput in) throws IOException { +- type = readType(in); +- switch (type) { +- case CSV_RECORD: +- readCsv(in); +- break; +- case ARRAY_RECORD: +- readArray(in); +- break; +- default: +- throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); +- } +- } +- +- @Override +- public void write(DataOutput out) throws IOException { +- writeType(out, type); +- switch (type) { +- case CSV_RECORD: +- writeCsv(out); +- break; +- case ARRAY_RECORD: +- writeArray(out); +- break; +- default: +- throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); +- } +- } +- +- private int readType(DataInput in) throws IOException { +- return WritableUtils.readVInt(in); +- } +- +- private void writeType(DataOutput out, int type) throws IOException { +- WritableUtils.writeVInt(out, type); +- } +- +- private void readCsv(DataInput in) throws IOException { +- content = in.readUTF(); +- } +- +- private void writeCsv(DataOutput out) throws IOException { +- out.writeUTF((String)content); +- } +- +- private void readArray(DataInput in) throws IOException { +- // read number of columns +- int columns = in.readInt(); +- content = new Object[columns]; +- Object[] array = (Object[])content; +- // read each column +- for (int i = 0; i < array.length; i++) { +- int type = readType(in); +- switch (type) { +- case FieldTypes.UTF: +- array[i] = in.readUTF(); +- break; +- +- case FieldTypes.BIN: +- int length = in.readInt(); +- byte[] bytes = new byte[length]; +- in.readFully(bytes); +- array[i] = bytes; +- break; +- +- case FieldTypes.DOUBLE: +- array[i] = in.readDouble(); +- break; +- +- case FieldTypes.FLOAT: +- array[i] = in.readFloat(); +- break; +- +- case FieldTypes.LONG: +- array[i] = in.readLong(); +- break; +- +- case FieldTypes.INT: +- array[i] = in.readInt(); +- break; +- +- case FieldTypes.SHORT: +- array[i] = in.readShort(); +- break; +- +- case FieldTypes.CHAR: +- array[i] = in.readChar(); +- break; +- +- case FieldTypes.BYTE: +- array[i] = in.readByte(); +- break; +- +- case FieldTypes.BOOLEAN: +- array[i] = in.readBoolean(); +- break; +- +- case FieldTypes.NULL: +- array[i] = null; +- break; +- +- default: +- throw new IOException( +- new SqoopException(MRExecutionError.MAPRED_EXEC_0012, Integer.toString(type)) +- ); +- } +- } +- } +- +- private void writeArray(DataOutput out) throws IOException { +- Object[] array = (Object[])content; +- // write number of columns +- out.writeInt(array.length); +- // write each column +- for (int i = 0; i < array.length; i++) { +- if (array[i] instanceof String) { +- writeType(out, FieldTypes.UTF); +- out.writeUTF((String)array[i]); +- +- } else if (array[i] instanceof byte[]) { +- writeType(out, FieldTypes.BIN); +- out.writeInt(((byte[])array[i]).length); +- out.write((byte[])array[i]); +- +- } else if (array[i] instanceof Double) { +- writeType(out, FieldTypes.DOUBLE); +- out.writeDouble((Double)array[i]); +- +- } else if (array[i] instanceof Float) { +- writeType(out, FieldTypes.FLOAT); +- out.writeFloat((Float)array[i]); +- +- } else if (array[i] instanceof Long) { +- writeType(out, FieldTypes.LONG); +- out.writeLong((Long)array[i]); +- +- } else if (array[i] instanceof Integer) { +- writeType(out, FieldTypes.INT); +- out.writeInt((Integer)array[i]); +- +- } else if (array[i] instanceof Short) { +- writeType(out, FieldTypes.SHORT); +- out.writeShort((Short)array[i]); +- +- } else if (array[i] instanceof Character) { +- writeType(out, FieldTypes.CHAR); +- out.writeChar((Character)array[i]); +- +- } else if (array[i] instanceof Byte) { +- writeType(out, FieldTypes.BYTE); +- out.writeByte((Byte)array[i]); +- +- } else if (array[i] instanceof Boolean) { +- writeType(out, FieldTypes.BOOLEAN); +- out.writeBoolean((Boolean)array[i]); +- +- } else if (array[i] == null) { +- writeType(out, FieldTypes.NULL); +- +- } else { +- throw new IOException( +- new SqoopException(MRExecutionError.MAPRED_EXEC_0012, +- array[i].getClass().getName() +- ) +- ); +- } +- } +- } +- +- private String format() { +- switch (type) { +- case EMPTY_DATA: +- return null; +- +- case CSV_RECORD: +- if (fieldDelimiter == DEFAULT_FIELD_DELIMITER) { +- return (String)content; +- } else { +- // TODO: need to exclude the case where comma is part of a string. +- return ((String)content).replaceAll( +- String.valueOf(DEFAULT_FIELD_DELIMITER), +- String.valueOf(fieldDelimiter)); +- } +- +- case ARRAY_RECORD: +- StringBuilder sb = new StringBuilder(); +- Object[] array = (Object[])content; +- for (int i = 0; i < array.length; i++) { +- if (i != 0) { +- sb.append(fieldDelimiter); +- } +- +- if (array[i] instanceof String) { +- sb.append(stringDelimiter); +- sb.append(escape((String)array[i])); +- sb.append(stringDelimiter); +- } else if (array[i] instanceof byte[]) { +- sb.append(Arrays.toString((byte[])array[i])); +- } else { +- sb.append(String.valueOf(array[i])); +- } +- } +- return sb.toString(); +- +- default: +- throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); +- } +- } +- +- private Object[] parse() { +- switch (type) { +- case EMPTY_DATA: +- return null; +- +- case CSV_RECORD: +- ArrayList<Object> list = new ArrayList<Object>(); +- char[] record = ((String)content).toCharArray(); +- int start = 0; +- int position = start; +- boolean stringDelimited = false; +- boolean arrayDelimited = false; +- int index = 0; +- while (position < record.length) { +- if (record[position] == fieldDelimiter) { +- if (!stringDelimited && !arrayDelimited) { +- index = parseField(list, record, start, position, index); +- start = position + 1; +- } +- } else if (record[position] == stringDelimiter) { +- if (!stringDelimited) { +- stringDelimited = true; +- } +- else if (position > 0 && record[position-1] != stringEscape) { +- stringDelimited = false; +- } +- } else if (record[position] == '[') { +- if (!stringDelimited) { +- arrayDelimited = true; +- } +- } else if (record[position] == ']') { +- if (!stringDelimited) { +- arrayDelimited = false; +- } +- } +- position++; +- } +- parseField(list, record, start, position, index); +- return list.toArray(); +- +- case ARRAY_RECORD: +- return (Object[])content; +- +- default: +- throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); +- } +- } +- +- private int parseField(ArrayList<Object> list, char[] record, +- int start, int end, int index) { +- String field = String.valueOf(record, start, end-start).trim(); +- +- int fieldType; +- if (fieldTypes == null) { +- fieldType = guessType(field); +- } else { +- fieldType = fieldTypes[index]; +- } +- +- switch (fieldType) { +- case FieldTypes.UTF: +- if (field.charAt(0) != stringDelimiter || +- field.charAt(field.length()-1) != stringDelimiter) { +- throw new SqoopException(MRExecutionError.MAPRED_EXEC_0022); +- } +- list.add(index, unescape(field.substring(1, field.length()-1))); +- break; +- +- case FieldTypes.BIN: +- if (field.charAt(0) != '[' || +- field.charAt(field.length()-1) != ']') { +- throw new SqoopException(MRExecutionError.MAPRED_EXEC_0022); +- } +- String[] splits = +- field.substring(1, field.length()-1).split(String.valueOf(',')); +- byte[] bytes = new byte[splits.length]; +- for (int i=0; i<bytes.length; i++) { +- bytes[i] = Byte.parseByte(splits[i].trim()); +- } +- list.add(index, bytes); +- break; +- +- case FieldTypes.DOUBLE: +- list.add(index, Double.parseDouble(field)); +- break; +- +- case FieldTypes.FLOAT: +- list.add(index, Float.parseFloat(field)); +- break; +- +- case FieldTypes.LONG: +- list.add(index, Long.parseLong(field)); +- break; +- +- case FieldTypes.INT: +- list.add(index, Integer.parseInt(field)); +- break; +- +- case FieldTypes.SHORT: +- list.add(index, Short.parseShort(field)); +- break; +- +- case FieldTypes.CHAR: +- list.add(index, Character.valueOf(field.charAt(0))); +- break; +- +- case FieldTypes.BYTE: +- list.add(index, Byte.parseByte(field)); +- break; +- +- case FieldTypes.BOOLEAN: +- list.add(index, Boolean.parseBoolean(field)); +- break; +- +- case FieldTypes.NULL: +- list.add(index, null); +- break; +- +- default: +- throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(fieldType)); +- } +- +- return ++index; +- } +- +- private int guessType(String field) { +- char[] value = field.toCharArray(); +- +- if (value[0] == stringDelimiter) { +- return FieldTypes.UTF; +- } +- +- switch (value[0]) { +- case 'n': +- case 'N': +- return FieldTypes.NULL; +- case '[': +- return FieldTypes.BIN; +- case 't': +- case 'f': +- case 'T': +- case 'F': +- return FieldTypes.BOOLEAN; +- } +- +- int position = 1; +- while (position < value.length) { +- switch (value[position++]) { +- case '.': +- return FieldTypes.DOUBLE; +- } +- } +- +- return FieldTypes.LONG; +- } +- +- private String escape(String string) { +- // TODO: Also need to escape those special characters as documented in: +- // https://cwiki.apache.org/confluence/display/SQOOP/Sqoop2+Intermediate+representation#Sqoop2Intermediaterepresentation-Intermediateformatrepresentationproposal +- String regex = String.valueOf(stringDelimiter); +- String replacement = Matcher.quoteReplacement(escapedStringDelimiter); +- return string.replaceAll(regex, replacement); +- } +- +- private String unescape(String string) { +- // TODO: Also need to unescape those special characters as documented in: +- // https://cwiki.apache.org/confluence/display/SQOOP/Sqoop2+Intermediate+representation#Sqoop2Intermediaterepresentation-Intermediateformatrepresentationproposal +- String regex = Matcher.quoteReplacement(escapedStringDelimiter); +- String replacement = String.valueOf(stringDelimiter); +- return string.replaceAll(regex, replacement); +- } +-} +\ No newline at end of file +diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java +deleted file mode 100644 +index dafdeb4..0000000 +--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java ++++ /dev/null +@@ -1,93 +0,0 @@ +-/** +- * Licensed to the Apache Software Foundation (ASF) under one +- * or more contributor license agreements. See the NOTICE file +- * distributed with this work for additional information +- * regarding copyright ownership. The ASF licenses this file +- * to you under the Apache License, Version 2.0 (the +- * "License"); you may not use this file except in compliance +- * with the License. You may obtain a copy of the License at +- * +- * http://www.apache.org/licenses/LICENSE-2.0 +- * +- * Unless required by applicable law or agreed to in writing, software +- * distributed under the License is distributed on an "AS IS" BASIS, +- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- * See the License for the specific language governing permissions and +- * limitations under the License. +- */ +-package org.apache.sqoop.job; +- +-import java.io.IOException; +- +-import org.apache.hadoop.conf.Configuration; +-import org.apache.hadoop.io.NullWritable; +-import org.apache.hadoop.mapreduce.InputFormat; +-import org.apache.hadoop.mapreduce.Job; +-import org.apache.hadoop.mapreduce.JobContext; +-import org.apache.hadoop.mapreduce.Mapper; +-import org.apache.hadoop.mapreduce.OutputCommitter; +-import org.apache.hadoop.mapreduce.OutputFormat; +-import org.apache.sqoop.job.io.SqoopWritable; +-import org.apache.sqoop.job.mr.SqoopSplit; +-import org.apache.sqoop.utils.ClassUtils; +- +-import static org.mockito.Mockito.mock; +-import static org.mockito.Mockito.when; +- +-public class JobUtils { +- +- public static boolean runJob(Configuration conf, +- Class<? extends InputFormat<SqoopSplit, NullWritable>> input, +- Class<? extends Mapper<SqoopSplit, NullWritable, SqoopWritable, NullWritable>> mapper, +- Class<? extends OutputFormat<SqoopWritable, NullWritable>> output) +- throws IOException, InterruptedException, ClassNotFoundException { +- Job job = new Job(conf); +- job.setInputFormatClass(input); +- job.setMapperClass(mapper); +- job.setMapOutputKeyClass(SqoopWritable.class); +- job.setMapOutputValueClass(NullWritable.class); +- job.setOutputFormatClass(output); +- job.setOutputKeyClass(SqoopWritable.class); +- job.setOutputValueClass(NullWritable.class); +- +- boolean ret = job.waitForCompletion(true); +- +- // Hadoop 1.0 (and 0.20) have nasty bug when job committer is not called in LocalJobRuner +- if (isHadoop1()) { +- callOutputCommitter(job, output); +- } +- +- return ret; +- } +- +- /** +- * Call output format on given job manually. +- */ +- private static void callOutputCommitter(Job job, Class<? extends OutputFormat<SqoopWritable, NullWritable>> outputFormat) throws IOException, InterruptedException { +- OutputCommitter committer = ((OutputFormat)ClassUtils.instantiate(outputFormat)).getOutputCommitter(null); +- +- JobContext jobContext = mock(JobContext.class); +- when(jobContext.getConfiguration()).thenReturn(job.getConfiguration()); +- +- committer.commitJob(jobContext); +- } +- +- /** +- * Detect Hadoop 1.0 installation +- * +- * @return True if and only if this is Hadoop 1 and below +- */ +- public static boolean isHadoop1() { +- String version = org.apache.hadoop.util.VersionInfo.getVersion(); +- if (version.matches("\\b0\\.20\\..+\\b") +- || version.matches("\\b1\\.\\d\\.\\d")) { +- return true; +- } +- return false; +- } +- +- private JobUtils() { +- // Disable explicit object creation +- } +- +-} +diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java +index 78ae4ec..bbac7d2 100644 +--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java ++++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java +@@ -37,6 +37,7 @@ + import org.apache.sqoop.common.Direction; + import org.apache.sqoop.connector.common.EmptyConfiguration; + import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; ++import org.apache.sqoop.connector.idf.IntermediateDataFormat; + import org.apache.sqoop.job.etl.Destroyer; + import org.apache.sqoop.job.etl.DestroyerContext; + import org.apache.sqoop.job.etl.Extractor; +@@ -46,17 +47,13 @@ + import org.apache.sqoop.job.etl.Partition; + import org.apache.sqoop.job.etl.Partitioner; + import org.apache.sqoop.job.etl.PartitionerContext; +-import org.apache.sqoop.job.io.Data; + import org.apache.sqoop.job.io.SqoopWritable; + import org.apache.sqoop.job.mr.MRConfigurationUtils; + import org.apache.sqoop.job.mr.SqoopInputFormat; + import org.apache.sqoop.job.mr.SqoopMapper; + import org.apache.sqoop.job.mr.SqoopNullOutputFormat; + import org.apache.sqoop.job.mr.SqoopSplit; +-import org.apache.sqoop.schema.Schema; +-import org.apache.sqoop.schema.type.FixedPoint; +-import org.apache.sqoop.schema.type.FloatingPoint; +-import org.apache.sqoop.schema.type.Text; ++import org.apache.sqoop.job.util.MRJobTestUtil; + import org.junit.Assert; + import org.junit.Test; + +@@ -67,11 +64,10 @@ + private static final int NUMBER_OF_ROWS_PER_PARTITION = 10; + + @Test +- public void testInputFormat() throws Exception { ++ public void testSqoopInputFormat() throws Exception { + Configuration conf = new Configuration(); + conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); +- conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, +- CSVIntermediateDataFormat.class.getName()); ++ conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName()); + Job job = new Job(conf); + + SqoopInputFormat inputformat = new SqoopInputFormat(); +@@ -79,51 +75,47 @@ public void testInputFormat() throws Exception { + assertEquals(9, splits.size()); + + for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) { +- SqoopSplit split = (SqoopSplit)splits.get(id-1); +- DummyPartition partition = (DummyPartition)split.getPartition(); ++ SqoopSplit split = (SqoopSplit) splits.get(id - 1); ++ DummyPartition partition = (DummyPartition) split.getPartition(); + assertEquals(id, partition.getId()); + } + } + + @Test +- public void testMapper() throws Exception { ++ public void testSqoopMapper() throws Exception { + Configuration conf = new Configuration(); + conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); + conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); +- conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, +- CSVIntermediateDataFormat.class.getName()); +- Schema schema = new Schema("Test"); +- schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")) +- .addColumn(new org.apache.sqoop.schema.type.Text("3")); +- ++ conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName()); + Job job = new Job(conf); +- MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema); +- MRConfigurationUtils.setConnectorSchema(Direction.TO, job, schema); +- boolean success = JobUtils.runJob(job.getConfiguration(), +- SqoopInputFormat.class, SqoopMapper.class, DummyOutputFormat.class); ++ // from and to have the same schema in this test case ++ MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, MRJobTestUtil.getTestSchema()); ++ MRConfigurationUtils.setConnectorSchema(Direction.TO, job, MRJobTestUtil.getTestSchema()); ++ boolean success = MRJobTestUtil.runJob(job.getConfiguration(), ++ SqoopInputFormat.class, ++ SqoopMapper.class, ++ DummyOutputFormat.class); + Assert.assertEquals("Job failed!", true, success); + } + + @Test +- public void testOutputFormat() throws Exception { ++ public void testNullOutputFormat() throws Exception { + Configuration conf = new Configuration(); + conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); + conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); + conf.set(MRJobConstants.JOB_ETL_LOADER, DummyLoader.class.getName()); + conf.set(MRJobConstants.JOB_ETL_FROM_DESTROYER, DummyFromDestroyer.class.getName()); + conf.set(MRJobConstants.JOB_ETL_TO_DESTROYER, DummyToDestroyer.class.getName()); +- conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, +- CSVIntermediateDataFormat.class.getName()); +- Schema schema = new Schema("Test"); +- schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")) +- .addColumn(new Text("3")); ++ conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName()); + + Job job = new Job(conf); +- MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema); +- MRConfigurationUtils.setConnectorSchema(Direction.TO, job, schema); +- boolean success = JobUtils.runJob(job.getConfiguration(), +- SqoopInputFormat.class, SqoopMapper.class, +- SqoopNullOutputFormat.class); ++ // from and to have the same schema in this test case ++ MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, MRJobTestUtil.getTestSchema()); ++ MRConfigurationUtils.setConnectorSchema(Direction.TO, job, MRJobTestUtil.getTestSchema()); ++ boolean success = MRJobTestUtil.runJob(job.getConfiguration(), ++ SqoopInputFormat.class, ++ SqoopMapper.class, ++ SqoopNullOutputFormat.class); + Assert.assertEquals("Job failed!", true, success); + + // Make sure both destroyers get called. +@@ -171,15 +163,17 @@ public String toString() { + } + } + +- public static class DummyExtractor extends Extractor<EmptyConfiguration, EmptyConfiguration, DummyPartition> { ++ public static class DummyExtractor extends ++ Extractor<EmptyConfiguration, EmptyConfiguration, DummyPartition> { + @Override +- public void extract(ExtractorContext context, EmptyConfiguration oc, EmptyConfiguration oj, DummyPartition partition) { +- int id = ((DummyPartition)partition).getId(); ++ public void extract(ExtractorContext context, EmptyConfiguration oc, EmptyConfiguration oj, ++ DummyPartition partition) { ++ int id = ((DummyPartition) partition).getId(); + for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) { +- context.getDataWriter().writeArrayRecord(new Object[] { +- id * NUMBER_OF_ROWS_PER_PARTITION + row, +- (double) (id * NUMBER_OF_ROWS_PER_PARTITION + row), +- String.valueOf(id*NUMBER_OF_ROWS_PER_PARTITION+row)}); ++ context.getDataWriter().writeArrayRecord( ++ new Object[] { id * NUMBER_OF_ROWS_PER_PARTITION + row, ++ (double) (id * NUMBER_OF_ROWS_PER_PARTITION + row), ++ String.valueOf(id * NUMBER_OF_ROWS_PER_PARTITION + row) }); + } + } + +@@ -189,16 +183,14 @@ public long getRowsRead() { + } + } + +- public static class DummyOutputFormat +- extends OutputFormat<SqoopWritable, NullWritable> { ++ public static class DummyOutputFormat extends OutputFormat<SqoopWritable, NullWritable> { + @Override + public void checkOutputSpecs(JobContext context) { + // do nothing + } + + @Override +- public RecordWriter<SqoopWritable, NullWritable> getRecordWriter( +- TaskAttemptContext context) { ++ public RecordWriter<SqoopWritable, NullWritable> getRecordWriter(TaskAttemptContext context) { + return new DummyRecordWriter(); + } + +@@ -207,22 +199,17 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) { + return new DummyOutputCommitter(); + } + +- public static class DummyRecordWriter +- extends RecordWriter<SqoopWritable, NullWritable> { +- private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION; +- private Data data = new Data(); ++ public static class DummyRecordWriter extends RecordWriter<SqoopWritable, NullWritable> { ++ private int index = START_PARTITION * NUMBER_OF_ROWS_PER_PARTITION; ++ // should I use a dummy IDF for testing? ++ private IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF(); + + @Override + public void write(SqoopWritable key, NullWritable value) { +- +- data.setContent(new Object[] { +- index, +- (double) index, +- String.valueOf(index)}, +- Data.ARRAY_RECORD); ++ String testData = "" + index + "," + (double) index + ",'" + String.valueOf(index) + "'"; ++ dataFormat.setTextData(testData); + index++; +- +- assertEquals(data.toString(), key.toString()); ++ assertEquals(dataFormat.getTextData().toString(), key.toString()); + } + + @Override +@@ -233,16 +220,20 @@ public void close(TaskAttemptContext context) { + + public static class DummyOutputCommitter extends OutputCommitter { + @Override +- public void setupJob(JobContext jobContext) { } ++ public void setupJob(JobContext jobContext) { ++ } + + @Override +- public void setupTask(TaskAttemptContext taskContext) { } ++ public void setupTask(TaskAttemptContext taskContext) { ++ } + + @Override +- public void commitTask(TaskAttemptContext taskContext) { } ++ public void commitTask(TaskAttemptContext taskContext) { ++ } + + @Override +- public void abortTask(TaskAttemptContext taskContext) { } ++ public void abortTask(TaskAttemptContext taskContext) { ++ } + + @Override + public boolean needsTaskCommit(TaskAttemptContext taskContext) { +@@ -251,39 +242,34 @@ public boolean needsTaskCommit(TaskAttemptContext taskContext) { + } + } + ++ // it is writing to the target. + public static class DummyLoader extends Loader<EmptyConfiguration, EmptyConfiguration> { +- private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION; +- private Data expected = new Data(); ++ private int index = START_PARTITION * NUMBER_OF_ROWS_PER_PARTITION; ++ private IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF(); + + @Override +- public void load(LoaderContext context, EmptyConfiguration oc, EmptyConfiguration oj) throws Exception{ ++ public void load(LoaderContext context, EmptyConfiguration oc, EmptyConfiguration oj) ++ throws Exception { + String data; + while ((data = context.getDataReader().readTextRecord()) != null) { +- expected.setContent(new Object[] { +- index, +- (double) index, +- String.valueOf(index)}, +- Data.ARRAY_RECORD); ++ String testData = "" + index + "," + (double) index + ",'" + String.valueOf(index) + "'"; ++ dataFormat.setTextData(testData); + index++; +- assertEquals(expected.toString(), data); ++ assertEquals(dataFormat.getTextData().toString(), data); + } + } + } + + public static class DummyFromDestroyer extends Destroyer<EmptyConfiguration, EmptyConfiguration> { +- + public static int count = 0; +- + @Override + public void destroy(DestroyerContext context, EmptyConfiguration o, EmptyConfiguration o2) { + count++; + } + } + +- public static class DummyToDestroyer extends Destroyer<EmptyConfiguration,EmptyConfiguration> { +- ++ public static class DummyToDestroyer extends Destroyer<EmptyConfiguration, EmptyConfiguration> { + public static int count = 0; +- + @Override + public void destroy(DestroyerContext context, EmptyConfiguration o, EmptyConfiguration o2) { + count++; +diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java +index 04fb692..a64a4a6 100644 +--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java ++++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java +@@ -38,16 +38,17 @@ + import org.apache.sqoop.common.Direction; + import org.apache.sqoop.connector.common.EmptyConfiguration; + import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; ++import org.apache.sqoop.connector.idf.IntermediateDataFormat; + import org.apache.sqoop.job.etl.Extractor; + import org.apache.sqoop.job.etl.ExtractorContext; + import org.apache.sqoop.job.etl.Partition; + import org.apache.sqoop.job.etl.Partitioner; + import org.apache.sqoop.job.etl.PartitionerContext; +-import org.apache.sqoop.job.io.Data; + import org.apache.sqoop.job.io.SqoopWritable; + import org.apache.sqoop.job.mr.MRConfigurationUtils; + import org.apache.sqoop.job.mr.SqoopInputFormat; + import org.apache.sqoop.job.mr.SqoopMapper; ++import org.apache.sqoop.job.util.MRJobTestUtil; + import org.apache.sqoop.schema.Schema; + import org.apache.sqoop.schema.type.FixedPoint; + import org.apache.sqoop.schema.type.FloatingPoint; +@@ -121,6 +122,7 @@ public TestMatching(Schema from, + return parameters; + } + ++ @SuppressWarnings("deprecation") + @Test + public void testSchemaMatching() throws Exception { + Configuration conf = new Configuration(); +@@ -132,9 +134,9 @@ public void testSchemaMatching() throws Exception { + Job job = new Job(conf); + MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, from); + MRConfigurationUtils.setConnectorSchema(Direction.TO, job, to); +- JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class, ++ MRJobTestUtil.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class, + DummyOutputFormat.class); +- boolean success = JobUtils.runJob(job.getConfiguration(), ++ boolean success = MRJobTestUtil.runJob(job.getConfiguration(), + SqoopInputFormat.class, SqoopMapper.class, + DummyOutputFormat.class); + if (from.getName().split("-")[1].equals("EMPTY")) { +@@ -233,19 +235,14 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) { + public static class DummyRecordWriter + extends RecordWriter<SqoopWritable, NullWritable> { + private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION; +- private Data data = new Data(); ++ private IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF(); + + @Override + public void write(SqoopWritable key, NullWritable value) { +- +- data.setContent(new Object[] { +- index, +- (double) index, +- String.valueOf(index)}, +- Data.ARRAY_RECORD); ++ String testData = "" + index + "," + (double) index + ",'" + String.valueOf(index) + "'"; ++ dataFormat.setTextData(testData); + index++; +- +- assertEquals(data.toString(), key.toString()); ++ assertEquals(dataFormat.getTextData().toString(), key.toString()); + } + + @Override +diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java +deleted file mode 100644 +index 68ce5ed..0000000 +--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java ++++ /dev/null +@@ -1,95 +0,0 @@ +-/* +- * Licensed to the Apache Software Foundation (ASF) under one +- * or more contributor license agreements. See the NOTICE file +- * distributed with this work for additional information +- * regarding copyright ownership. The ASF licenses this file +- * to you under the Apache License, Version 2.0 (the +- * "License"); you may not use this file except in compliance +- * with the License. You may obtain a copy of the License at +- * +- * http://www.apache.org/licenses/LICENSE-2.0 +- * +- * Unless required by applicable law or agreed to in writing, +- * software distributed under the License is distributed on an +- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +- * KIND, either express or implied. See the License for the +- * specific language governing permissions and limitations +- * under the License. +- */ +-package org.apache.sqoop.job.io; +- +-import com.google.common.base.Charsets; +- +-import java.io.ByteArrayInputStream; +-import java.io.ByteArrayOutputStream; +-import java.io.DataInput; +-import java.io.DataInputStream; +-import java.io.DataOutput; +-import java.io.DataOutputStream; +-import java.io.IOException; +-import java.io.InputStream; +- +-import org.apache.hadoop.conf.Configuration; +-import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; +-import org.apache.sqoop.job.MRJobConstants; +-import org.junit.Assert; +-import org.junit.Test; +- +-public class SqoopWritableTest { +- +- private final SqoopWritable writable = new SqoopWritable(); +- +- @Test +- public void testStringInStringOut() { +- String testData = "Live Long and prosper"; +- writable.setString(testData); +- Assert.assertEquals(testData,writable.getString()); +- } +- +- @Test +- public void testDataWritten() throws IOException { +- String testData = "One ring to rule them all"; +- byte[] testDataBytes = testData.getBytes(Charsets.UTF_8); +- writable.setString(testData); +- ByteArrayOutputStream ostream = new ByteArrayOutputStream(); +- DataOutput out = new DataOutputStream(ostream); +- writable.write(out); +- byte[] written = ostream.toByteArray(); +- InputStream instream = new ByteArrayInputStream(written); +- DataInput in = new DataInputStream(instream); +- String readData = in.readUTF(); +- Assert.assertEquals(testData, readData); +- } +- +- @Test +- public void testDataRead() throws IOException { +- String testData = "Brandywine Bridge - 20 miles!"; +- ByteArrayOutputStream ostream = new ByteArrayOutputStream(); +- DataOutput out = new DataOutputStream(ostream); +- out.writeUTF(testData); +- InputStream instream = new ByteArrayInputStream(ostream.toByteArray()); +- DataInput in = new DataInputStream(instream); +- writable.readFields(in); +- Assert.assertEquals(testData, writable.getString()); +- } +- +- @Test +- public void testWriteReadUsingStream() throws IOException { +- String testData = "You shall not pass"; +- ByteArrayOutputStream ostream = new ByteArrayOutputStream(); +- DataOutput out = new DataOutputStream(ostream); +- writable.setString(testData); +- writable.write(out); +- byte[] written = ostream.toByteArray(); +- +- //Don't test what the data is, test that SqoopWritable can read it. +- InputStream instream = new ByteArrayInputStream(written); +- SqoopWritable newWritable = new SqoopWritable(); +- DataInput in = new DataInputStream(instream); +- newWritable.readFields(in); +- Assert.assertEquals(testData, newWritable.getString()); +- ostream.close(); +- instream.close(); +- } +- +-} +diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java +deleted file mode 100644 +index 4e23bcb..0000000 +--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java ++++ /dev/null +@@ -1,117 +0,0 @@ +-/** +- * Licensed to the Apache Software Foundation (ASF) under one +- * or more contributor license agreements. See the NOTICE file +- * distributed with this work for additional information +- * regarding copyright ownership. The ASF licenses this file +- * to you under the Apache License, Version 2.0 (the +- * "License"); you may not use this file except in compliance +- * with the License. You may obtain a copy of the License at +- * +- * http://www.apache.org/licenses/LICENSE-2.0 +- * +- * Unless required by applicable law or agreed to in writing, software +- * distributed under the License is distributed on an "AS IS" BASIS, +- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- * See the License for the specific language governing permissions and +- * limitations under the License. +- */ +-package org.apache.sqoop.job.io; +- +-import java.util.Arrays; +- +-import org.junit.Assert; +-import org.junit.Test; +- +-public class TestData { +- +- private static final double TEST_NUMBER = Math.PI + 100; +- @Test +- public void testArrayToCsv() throws Exception { +- Data data = new Data(); +- String expected; +- String actual; +- +- // with special characters: +- expected = +- Long.valueOf((long)TEST_NUMBER) + "," + +- Double.valueOf(TEST_NUMBER) + "," + +- "'" + String.valueOf(TEST_NUMBER) + "\\',s'" + "," + +- Arrays.toString(new byte[] {1, 2, 3, 4, 5}); +- data.setContent(new Object[] { +- Long.valueOf((long)TEST_NUMBER), +- Double.valueOf(TEST_NUMBER), +- String.valueOf(TEST_NUMBER) + "',s", +- new byte[] {1, 2, 3, 4, 5} }, +- Data.ARRAY_RECORD); +- actual = (String)data.getContent(Data.CSV_RECORD); +- assertEquals(expected, actual); +- +- // with null characters: +- expected = +- Long.valueOf((long)TEST_NUMBER) + "," + +- Double.valueOf(TEST_NUMBER) + "," + +- "null" + "," + +- Arrays.toString(new byte[] {1, 2, 3, 4, 5}); +- data.setContent(new Object[] { +- Long.valueOf((long)TEST_NUMBER), +- Double.valueOf(TEST_NUMBER), +- null, +- new byte[] {1, 2, 3, 4, 5} }, +- Data.ARRAY_RECORD); +- actual = (String)data.getContent(Data.CSV_RECORD); +- assertEquals(expected, actual); +- } +- +- @Test +- public void testCsvToArray() throws Exception { +- Data data = new Data(); +- Object[] expected; +- Object[] actual; +- +- // with special characters: +- expected = new Object[] { +- Long.valueOf((long)TEST_NUMBER), +- Double.valueOf(TEST_NUMBER), +- String.valueOf(TEST_NUMBER) + "',s", +- new byte[] {1, 2, 3, 4, 5} }; +- data.setContent( +- Long.valueOf((long)TEST_NUMBER) + "," + +- Double.valueOf(TEST_NUMBER) + "," + +- "'" + String.valueOf(TEST_NUMBER) + "\\',s'" + "," + +- Arrays.toString(new byte[] {1, 2, 3, 4, 5}), +- Data.CSV_RECORD); +- actual = (Object[])data.getContent(Data.ARRAY_RECORD); +- assertEquals(expected.length, actual.length); +- for (int c=0; c<expected.length; c++) { +- assertEquals(expected[c], actual[c]); +- } +- +- // with null characters: +- expected = new Object[] { +- Long.valueOf((long)TEST_NUMBER), +- Double.valueOf(TEST_NUMBER), +- null, +- new byte[] {1, 2, 3, 4, 5} }; +- data.setContent( +- Long.valueOf((long)TEST_NUMBER) + "," + +- Double.valueOf(TEST_NUMBER) + "," + +- "null" + "," + +- Arrays.toString(new byte[] {1, 2, 3, 4, 5}), +- Data.CSV_RECORD); +- actual = (Object[])data.getContent(Data.ARRAY_RECORD); +- assertEquals(expected.length, actual.length); +- for (int c=0; c<expected.length; c++) { +- assertEquals(expected[c], actual[c]); +- } +- } +- +- public static void assertEquals(Object expected, Object actual) { +- if (expected instanceof byte[]) { +- Assert.assertEquals(Arrays.toString((byte[])expected), +- Arrays.toString((byte[])actual)); +- } else { +- Assert.assertEquals(expected, actual); +- } +- } +- +-} +diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestSqoopWritable.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestSqoopWritable.java +new file mode 100644 +index 0000000..3207e53 +--- /dev/null ++++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestSqoopWritable.java +@@ -0,0 +1,89 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++package org.apache.sqoop.job.io; ++ ++import java.io.ByteArrayInputStream; ++import java.io.ByteArrayOutputStream; ++import java.io.DataInput; ++import java.io.DataInputStream; ++import java.io.DataOutput; ++import java.io.DataOutputStream; ++import java.io.IOException; ++import java.io.InputStream; ++ ++import org.junit.Assert; ++import org.junit.Test; ++ ++public class TestSqoopWritable { ++ ++ private final SqoopWritable writable = new SqoopWritable(); ++ ++ @Test ++ public void testStringInStringOut() { ++ String testData = "Live Long and prosper"; ++ writable.setString(testData); ++ Assert.assertEquals(testData,writable.getString()); ++ } ++ ++ @Test ++ public void testDataWritten() throws IOException { ++ String testData = "One ring to rule them all"; ++ writable.setString(testData); ++ ByteArrayOutputStream ostream = new ByteArrayOutputStream(); ++ DataOutput out = new DataOutputStream(ostream); ++ writable.write(out); ++ byte[] written = ostream.toByteArray(); ++ InputStream instream = new ByteArrayInputStream(written); ++ DataInput in = new DataInputStream(instream); ++ String readData = in.readUTF(); ++ Assert.assertEquals(testData, readData); ++ } ++ ++ @Test ++ public void testDataRead() throws IOException { ++ String testData = "Brandywine Bridge - 20 miles!"; ++ ByteArrayOutputStream ostream = new ByteArrayOutputStream(); ++ DataOutput out = new DataOutputStream(ostream); ++ out.writeUTF(testData); ++ InputStream instream = new ByteArrayInputStream(ostream.toByteArray()); ++ DataInput in = new DataInputStream(instream); ++ writable.readFields(in); ++ Assert.assertEquals(testData, writable.getString()); ++ } ++ ++ @Test ++ public void testWriteReadUsingStream() throws IOException { ++ String testData = "You shall not pass"; ++ ByteArrayOutputStream ostream = new ByteArrayOutputStream(); ++ DataOutput out = new DataOutputStream(ostream); ++ writable.setString(testData); ++ writable.write(out); ++ byte[] written = ostream.toByteArray(); ++ ++ //Don't test what the data is, test that SqoopWritable can read it. ++ InputStream instream = new ByteArrayInputStream(written); ++ SqoopWritable newWritable = new SqoopWritable(); ++ DataInput in = new DataInputStream(instream); ++ newWritable.readFields(in); ++ Assert.assertEquals(testData, newWritable.getString()); ++ ostream.close(); ++ instream.close(); ++ } ++ ++} +diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java +index 5bd11f0..67e965d 100644 +--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java ++++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java +@@ -18,6 +18,10 @@ + */ + package org.apache.sqoop.job.mr; + ++import java.util.ConcurrentModificationException; ++import java.util.concurrent.BrokenBarrierException; ++import java.util.concurrent.TimeUnit; ++ + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.io.NullWritable; + import org.apache.hadoop.mapreduce.RecordWriter; +@@ -28,14 +32,11 @@ + import org.apache.sqoop.job.etl.Loader; + import org.apache.sqoop.job.etl.LoaderContext; + import org.apache.sqoop.job.io.SqoopWritable; ++import org.apache.sqoop.job.util.MRJobTestUtil; + import org.junit.Assert; + import org.junit.Before; + import org.junit.Test; + +-import java.util.ConcurrentModificationException; +-import java.util.concurrent.BrokenBarrierException; +-import java.util.concurrent.TimeUnit; +- + public class TestSqoopOutputFormatLoadExecutor { + + private Configuration conf; +@@ -130,12 +131,12 @@ public void testWhenLoaderThrows() throws Throwable { + SqoopOutputFormatLoadExecutor executor = new + SqoopOutputFormatLoadExecutor(true, ThrowingLoader.class.getName()); + RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter(); +- IntermediateDataFormat data = new CSVIntermediateDataFormat(); ++ IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF(); + SqoopWritable writable = new SqoopWritable(); + try { + for (int count = 0; count < 100; count++) { +- data.setTextData(String.valueOf(count)); +- writable.setString(data.getTextData()); ++ dataFormat.setTextData(String.valueOf(count)); ++ writable.setString(dataFormat.getTextData()); + writer.write(writable, null); + } + } catch (SqoopException ex) { +@@ -149,7 +150,7 @@ public void testSuccessfulContinuousLoader() throws Throwable { + SqoopOutputFormatLoadExecutor executor = new + SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName()); + RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter(); +- IntermediateDataFormat data = new CSVIntermediateDataFormat(); ++ IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF(); + SqoopWritable writable = new SqoopWritable(); + for (int i = 0; i < 10; i++) { + StringBuilder builder = new StringBuilder(); +@@ -159,8 +160,8 @@ public void testSuccessfulContinuousLoader() throws Throwable { + builder.append(","); + } + } +- data.setTextData(builder.toString()); +- writable.setString(data.getTextData()); ++ dataFormat.setTextData(builder.toString()); ++ writable.setString(dataFormat.getTextData()); + writer.write(writable, null); + } + writer.close(null); +@@ -171,7 +172,7 @@ public void testSuccessfulLoader() throws Throwable { + SqoopOutputFormatLoadExecutor executor = new + SqoopOutputFormatLoadExecutor(true, GoodLoader.class.getName()); + RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter(); +- IntermediateDataFormat data = new CSVIntermediateDataFormat(); ++ IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF(); + SqoopWritable writable = new SqoopWritable(); + StringBuilder builder = new StringBuilder(); + for (int count = 0; count < 100; count++) { +@@ -180,8 +181,8 @@ public void testSuccessfulLoader() throws Throwable { + builder.append(","); + } + } +- data.setTextData(builder.toString()); +- writable.setString(data.getTextData()); ++ dataFormat.setTextData(builder.toString()); ++ writable.setString(dataFormat.getTextData()); + writer.write(writable, null); + + //Allow writer to complete. +@@ -196,7 +197,7 @@ public void testThrowingContinuousLoader() throws Throwable { + SqoopOutputFormatLoadExecutor executor = new + SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName()); + RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter(); +- IntermediateDataFormat data = new CSVIntermediateDataFormat(); ++ IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF(); + SqoopWritable writable = new SqoopWritable(); + try { + for (int i = 0; i < 10; i++) { +@@ -207,8 +208,8 @@ public void testThrowingContinuousLoader() throws Throwable { + builder.append(","); + } + } +- data.setTextData(builder.toString()); +- writable.setString(data.getTextData()); ++ dataFormat.setTextData(builder.toString()); ++ writable.setString(dataFormat.getTextData()); + writer.write(writable, null); + } + writer.close(null); +diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/util/MRJobTestUtil.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/util/MRJobTestUtil.java +new file mode 100644 +index 0000000..5d5359e +--- /dev/null ++++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/util/MRJobTestUtil.java +@@ -0,0 +1,114 @@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.sqoop.job.util; ++ ++import java.io.IOException; ++ ++import org.apache.hadoop.conf.Configuration; ++import org.apache.hadoop.io.NullWritable; ++import org.apache.hadoop.mapreduce.InputFormat; ++import org.apache.hadoop.mapreduce.Job; ++import org.apache.hadoop.mapreduce.JobContext; ++import org.apache.hadoop.mapreduce.Mapper; ++import org.apache.hadoop.mapreduce.OutputCommitter; ++import org.apache.hadoop.mapreduce.OutputFormat; ++import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; ++import org.apache.sqoop.connector.idf.IntermediateDataFormat; ++import org.apache.sqoop.job.io.SqoopWritable; ++import org.apache.sqoop.job.mr.SqoopSplit; ++import org.apache.sqoop.schema.Schema; ++import org.apache.sqoop.schema.type.FixedPoint; ++import org.apache.sqoop.schema.type.FloatingPoint; ++import org.apache.sqoop.schema.type.Text; ++import org.apache.sqoop.utils.ClassUtils; ++ ++import static org.mockito.Mockito.mock; ++import static org.mockito.Mockito.when; ++ ++public class MRJobTestUtil { ++ ++ @SuppressWarnings("deprecation") ++ public static boolean runJob(Configuration conf, ++ Class<? extends InputFormat<SqoopSplit, NullWritable>> input, ++ Class<? extends Mapper<SqoopSplit, NullWritable, SqoopWritable, NullWritable>> mapper, ++ Class<? extends OutputFormat<SqoopWritable, NullWritable>> output) throws IOException, ++ InterruptedException, ClassNotFoundException { ++ Job job = new Job(conf); ++ job.setInputFormatClass(input); ++ job.setMapperClass(mapper); ++ job.setMapOutputKeyClass(SqoopWritable.class); ++ job.setMapOutputValueClass(NullWritable.class); ++ job.setOutputFormatClass(output); ++ job.setOutputKeyClass(SqoopWritable.class); ++ job.setOutputValueClass(NullWritable.class); ++ ++ boolean ret = job.waitForCompletion(true); ++ ++ // Hadoop 1.0 (and 0.20) have nasty bug when job committer is not called in ++ // LocalJobRuner ++ if (isHadoop1()) { ++ callOutputCommitter(job, output); ++ } ++ ++ return ret; ++ } ++ ++ public static Schema getTestSchema() { ++ Schema schema = new Schema("Test"); ++ schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")) ++ .addColumn(new Text("3")); ++ return schema; ++ } ++ ++ public static IntermediateDataFormat<?> getTestIDF() { ++ return new CSVIntermediateDataFormat(getTestSchema()); ++ } ++ ++ /** ++ * Call output format on given job manually. ++ */ ++ private static void callOutputCommitter(Job job, ++ Class<? extends OutputFormat<SqoopWritable, NullWritable>> outputFormat) throws IOException, ++ InterruptedException { ++ OutputCommitter committer = ((OutputFormat<?,?>) ClassUtils.instantiate(outputFormat)) ++ .getOutputCommitter(null); ++ ++ JobContext jobContext = mock(JobContext.class); ++ when(jobContext.getConfiguration()).thenReturn(job.getConfiguration()); ++ ++ committer.commitJob(jobContext); ++ } ++ ++ /** ++ * Detect Hadoop 1.0 installation ++ * ++ * @return True if and only if this is Hadoop 1 and below ++ */ ++ public static boolean isHadoop1() { ++ String version = org.apache.hadoop.util.VersionInfo.getVersion(); ++ if (version.matches("\\b0\\.20\\..+\\b") || version.matches("\\b1\\.\\d\\.\\d")) { ++ return true; ++ } ++ return false; ++ } ++ ++ private MRJobTestUtil() { ++ // Disable explicit object creation ++ } ++ ++} http://git-wip-us.apache.org/repos/asf/sqoop/blob/ace22237/common/src/main/java/org/apache/sqoop/schema/Schema.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/schema/Schema.java b/common/src/main/java/org/apache/sqoop/schema/Schema.java index 40c362c..3aa3aea 100644 --- a/common/src/main/java/org/apache/sqoop/schema/Schema.java +++ b/common/src/main/java/org/apache/sqoop/schema/Schema.java @@ -122,12 +122,7 @@ public class Schema { } public boolean isEmpty() { - if (columns.size()==0) { - return true; - } else { - return false; - } - + return columns.size() == 0; } public String toString() { http://git-wip-us.apache.org/repos/asf/sqoop/blob/ace22237/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java index e0e4061..e65edd9 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java @@ -67,9 +67,15 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> { private final List<Integer> stringFieldIndices = new ArrayList<Integer>(); private final List<Integer> byteFieldIndices = new ArrayList<Integer>(); - private Schema schema; + public CSVIntermediateDataFormat() { + } + + public CSVIntermediateDataFormat(Schema schema) { + setSchema(schema); + } + /** * {@inheritDoc} */ @@ -166,7 +172,7 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> { */ @Override public Object[] getObjectData() { - if (schema.isEmpty()) { + if (schema == null || schema.isEmpty()) { throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0006); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/ace22237/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java index 72e95ed..fcf6c3c 100644 --- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java +++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java @@ -41,11 +41,11 @@ public class TestCSVIntermediateDataFormat { private final String BYTE_FIELD_ENCODING = "ISO-8859-1"; - private IntermediateDataFormat<?> data; + private IntermediateDataFormat<?> dataFormat; @Before public void setUp() { - data = new CSVIntermediateDataFormat(); + dataFormat = new CSVIntermediateDataFormat(); } private String getByteFieldString(byte[] byteFieldData) { @@ -61,8 +61,8 @@ public class TestCSVIntermediateDataFormat { public void testStringInStringOut() { String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54}) + ",'" + String.valueOf(0x0A) + "'"; - data.setTextData(testData); - assertEquals(testData, data.getTextData()); + dataFormat.setTextData(testData); + assertEquals(testData, dataFormat.getTextData()); } @Test @@ -74,10 +74,10 @@ public class TestCSVIntermediateDataFormat { .addColumn(new Text("4")) .addColumn(new Binary("5")) .addColumn(new Text("6")); - data.setSchema(schema); - data.setTextData(null); + dataFormat.setSchema(schema); + dataFormat.setTextData(null); - Object[] out = data.getObjectData(); + Object[] out = dataFormat.getObjectData(); assertNull(out); } @@ -91,10 +91,10 @@ public class TestCSVIntermediateDataFormat { .addColumn(new Text("4")) .addColumn(new Binary("5")) .addColumn(new Text("6")); - data.setSchema(schema); - data.setTextData(""); + dataFormat.setSchema(schema); + dataFormat.setTextData(""); - data.getObjectData(); + dataFormat.getObjectData(); } @Test @@ -111,10 +111,10 @@ public class TestCSVIntermediateDataFormat { .addColumn(new Binary("5")) .addColumn(new Text("6")); - data.setSchema(schema); - data.setTextData(testData); + dataFormat.setSchema(schema); + dataFormat.setTextData(testData); - Object[] out = data.getObjectData(); + Object[] out = dataFormat.getObjectData(); assertEquals(new Long(10),out[0]); assertEquals(new Long(34),out[1]); @@ -134,7 +134,7 @@ public class TestCSVIntermediateDataFormat { .addColumn(new Text("4")) .addColumn(new Binary("5")) .addColumn(new Text("6")); - data.setSchema(schema); + dataFormat.setSchema(schema); byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54}; Object[] in = new Object[6]; @@ -145,12 +145,12 @@ public class TestCSVIntermediateDataFormat { in[4] = byteFieldData; in[5] = new String(new char[] { 0x0A }); - data.setObjectData(in); + dataFormat.setObjectData(in); //byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements String testData = "10,34,'54','random data'," + getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n'"; - assertEquals(testData, data.getTextData()); + assertEquals(testData, dataFormat.getTextData()); } @Test @@ -164,7 +164,7 @@ public class TestCSVIntermediateDataFormat { .addColumn(new Text("4")) .addColumn(new Binary("5")) .addColumn(new Text("6")); - data.setSchema(schema); + dataFormat.setSchema(schema); Object[] in = new Object[6]; in[0] = new Long(10); @@ -177,9 +177,9 @@ public class TestCSVIntermediateDataFormat { System.arraycopy(in,0,inCopy,0,in.length); // Modifies the input array, so we use the copy to confirm - data.setObjectData(in); + dataFormat.setObjectData(in); - assertTrue(Arrays.deepEquals(inCopy, data.getObjectData())); + assertTrue(Arrays.deepEquals(inCopy, dataFormat.getObjectData())); } @Test @@ -191,7 +191,7 @@ public class TestCSVIntermediateDataFormat { .addColumn(new Text("4")) .addColumn(new Binary("5")) .addColumn(new Text("6")); - data.setSchema(schema); + dataFormat.setSchema(schema); byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54}; Object[] in = new Object[6]; @@ -202,12 +202,12 @@ public class TestCSVIntermediateDataFormat { in[4] = byteFieldData; in[5] = new String(new char[] { 0x0A }); - data.setObjectData(in); + dataFormat.setObjectData(in); //byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements String testData = "10,34,NULL,'random data'," + getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n'"; - assertEquals(testData, data.getTextData()); + assertEquals(testData, dataFormat.getTextData()); } @Test @@ -215,7 +215,7 @@ public class TestCSVIntermediateDataFormat { Schema schema = new Schema("test"); schema.addColumn(new Text("1")); - data.setSchema(schema); + dataFormat.setSchema(schema); char[] allCharArr = new char[256]; for(int i = 0; i < allCharArr.length; ++i) { @@ -228,17 +228,17 @@ public class TestCSVIntermediateDataFormat { System.arraycopy(in, 0, inCopy, 0, in.length); // Modifies the input array, so we use the copy to confirm - data.setObjectData(in); + dataFormat.setObjectData(in); - assertEquals(strData, data.getObjectData()[0]); - assertTrue(Arrays.deepEquals(inCopy, data.getObjectData())); + assertEquals(strData, dataFormat.getObjectData()[0]); + assertTrue(Arrays.deepEquals(inCopy, dataFormat.getObjectData())); } @Test public void testByteArrayFullRangeOfCharacters() { Schema schema = new Schema("test"); schema.addColumn(new Binary("1")); - data.setSchema(schema); + dataFormat.setSchema(schema); byte[] allCharByteArr = new byte[256]; for (int i = 0; i < allCharByteArr.length; ++i) { @@ -250,32 +250,32 @@ public class TestCSVIntermediateDataFormat { System.arraycopy(in, 0, inCopy, 0, in.length); // Modifies the input array, so we use the copy to confirm - data.setObjectData(in); - assertTrue(Arrays.deepEquals(inCopy, data.getObjectData())); + dataFormat.setObjectData(in); + assertTrue(Arrays.deepEquals(inCopy, dataFormat.getObjectData())); } @Test public void testDate() { Schema schema = new Schema("test"); schema.addColumn(new Date("1")); - data.setSchema(schema); + dataFormat.setSchema(schema); - data.setTextData("2014-10-01"); - assertEquals("2014-10-01", data.getObjectData()[0].toString()); + dataFormat.setTextData("2014-10-01"); + assertEquals("2014-10-01", dataFormat.getObjectData()[0].toString()); } @Test public void testDateTime() { Schema schema = new Schema("test"); schema.addColumn(new DateTime("1")); - data.setSchema(schema); + dataFormat.setSchema(schema); for (String dateTime : new String[]{ "2014-10-01T12:00:00", "2014-10-01T12:00:00.000" }) { - data.setTextData(dateTime); - assertEquals("2014-10-01T12:00:00.000", data.getObjectData()[0].toString()); + dataFormat.setTextData(dateTime); + assertEquals("2014-10-01T12:00:00.000", dataFormat.getObjectData()[0].toString()); } } @@ -289,14 +289,14 @@ public class TestCSVIntermediateDataFormat { public void testDateTimeISO8601Alternative() { Schema schema = new Schema("test"); schema.addColumn(new DateTime("1")); - data.setSchema(schema); + dataFormat.setSchema(schema); for (String dateTime : new String[]{ "2014-10-01 12:00:00", "2014-10-01 12:00:00.000" }) { - data.setTextData(dateTime); - assertEquals("2014-10-01T12:00:00.000", data.getObjectData()[0].toString()); + dataFormat.setTextData(dateTime); + assertEquals("2014-10-01T12:00:00.000", dataFormat.getObjectData()[0].toString()); } } @@ -304,20 +304,20 @@ public class TestCSVIntermediateDataFormat { public void testBit() { Schema schema = new Schema("test"); schema.addColumn(new Bit("1")); - data.setSchema(schema); + dataFormat.setSchema(schema); for (String trueBit : new String[]{ "true", "TRUE", "1" }) { - data.setTextData(trueBit); - assertTrue((Boolean) data.getObjectData()[0]); + dataFormat.setTextData(trueBit); + assertTrue((Boolean) dataFormat.getObjectData()[0]); } for (String falseBit : new String[]{ "false", "FALSE", "0" }) { - data.setTextData(falseBit); - assertFalse((Boolean) data.getObjectData()[0]); + dataFormat.setTextData(falseBit); + assertFalse((Boolean) dataFormat.getObjectData()[0]); } } @@ -326,9 +326,23 @@ public class TestCSVIntermediateDataFormat { String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54}) + ",'\\n'"; Schema schema = new Schema("Test"); - data.setSchema(schema); - data.setTextData(testData); + dataFormat.setSchema(schema); + dataFormat.setTextData(testData); - Object[] out = data.getObjectData(); + @SuppressWarnings("unused") + Object[] out = dataFormat.getObjectData(); + } + + @Test(expected = SqoopException.class) + public void testNullSchema() { + dataFormat.setSchema(null); + @SuppressWarnings("unused") + Object[] out = dataFormat.getObjectData(); + } + + @Test(expected = SqoopException.class) + public void testNotSettingSchema() { + @SuppressWarnings("unused") + Object[] out = dataFormat.getObjectData(); } }
