Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 4fd8dec14 -> 91fbced8c


SQOOP-1713: Sqoop2: Remove SQOOP-1348.patch file

(Abraham Elmahrek via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/91fbced8
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/91fbced8
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/91fbced8

Branch: refs/heads/sqoop2
Commit: 91fbced8c3fdd99e4115f2d03cbdd7ede139d43c
Parents: 4fd8dec
Author: Jarek Jarcec Cecho <[email protected]>
Authored: Tue Nov 11 10:51:30 2014 -0800
Committer: Jarek Jarcec Cecho <[email protected]>
Committed: Tue Nov 11 10:51:30 2014 -0800

----------------------------------------------------------------------
 SQOOP-1348.patch | 1844 -------------------------------------------------
 1 file changed, 1844 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/91fbced8/SQOOP-1348.patch
----------------------------------------------------------------------
diff --git a/SQOOP-1348.patch b/SQOOP-1348.patch
deleted file mode 100644
index 7834a3f..0000000
--- a/SQOOP-1348.patch
+++ /dev/null
@@ -1,1844 +0,0 @@
-diff --git a/common/src/main/java/org/apache/sqoop/schema/Schema.java 
b/common/src/main/java/org/apache/sqoop/schema/Schema.java
-index 40c362c..3aa3aea 100644
---- a/common/src/main/java/org/apache/sqoop/schema/Schema.java
-+++ b/common/src/main/java/org/apache/sqoop/schema/Schema.java
-@@ -122,12 +122,7 @@ public Schema setCreationDate(Date creationDate) {
-   }
- 
-   public boolean isEmpty() {
--    if (columns.size()==0) {
--      return true;
--    } else {
--      return false;
--    }
--
-+    return columns.size() == 0;
-   }
- 
-   public String toString() {
-diff --git 
a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
 
b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
-index e0e4061..e65edd9 100644
---- 
a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
-+++ 
b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
-@@ -67,9 +67,15 @@
- 
-   private final List<Integer> stringFieldIndices = new ArrayList<Integer>();
-   private final List<Integer> byteFieldIndices = new ArrayList<Integer>();
--
-   private Schema schema;
- 
-+  public CSVIntermediateDataFormat() {
-+  }
-+
-+  public CSVIntermediateDataFormat(Schema schema) {
-+    setSchema(schema);
-+  }
-+
-   /**
-    * {@inheritDoc}
-    */
-@@ -166,7 +172,7 @@ public void setSchema(Schema schema) {
-    */
-   @Override
-   public Object[] getObjectData() {
--    if (schema.isEmpty()) {
-+    if (schema == null || schema.isEmpty()) {
-       throw new 
SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0006);
-     }
- 
-diff --git 
a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
 
b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
-index 72e95ed..fcf6c3c 100644
---- 
a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
-+++ 
b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
-@@ -41,11 +41,11 @@
- 
-   private final String BYTE_FIELD_ENCODING = "ISO-8859-1";
- 
--  private IntermediateDataFormat<?> data;
-+  private IntermediateDataFormat<?> dataFormat;
- 
-   @Before
-   public void setUp() {
--    data = new CSVIntermediateDataFormat();
-+    dataFormat = new CSVIntermediateDataFormat();
-   }
- 
-   private String getByteFieldString(byte[] byteFieldData) {
-@@ -61,8 +61,8 @@ private String getByteFieldString(byte[] byteFieldData) {
-   public void testStringInStringOut() {
-     String testData = "10,34,'54','random data'," + getByteFieldString(new 
byte[] { (byte) -112, (byte) 54})
-       + ",'" + String.valueOf(0x0A) + "'";
--    data.setTextData(testData);
--    assertEquals(testData, data.getTextData());
-+    dataFormat.setTextData(testData);
-+    assertEquals(testData, dataFormat.getTextData());
-   }
- 
-   @Test
-@@ -74,10 +74,10 @@ public void testNullStringInObjectOut() {
-         .addColumn(new Text("4"))
-         .addColumn(new Binary("5"))
-         .addColumn(new Text("6"));
--    data.setSchema(schema);
--    data.setTextData(null);
-+    dataFormat.setSchema(schema);
-+    dataFormat.setTextData(null);
- 
--    Object[] out = data.getObjectData();
-+    Object[] out = dataFormat.getObjectData();
- 
-     assertNull(out);
-   }
-@@ -91,10 +91,10 @@ public void testEmptyStringInObjectOut() {
-         .addColumn(new Text("4"))
-         .addColumn(new Binary("5"))
-         .addColumn(new Text("6"));
--    data.setSchema(schema);
--    data.setTextData("");
-+    dataFormat.setSchema(schema);
-+    dataFormat.setTextData("");
- 
--    data.getObjectData();
-+    dataFormat.getObjectData();
-   }
- 
-   @Test
-@@ -111,10 +111,10 @@ public void testStringInObjectOut() {
-         .addColumn(new Binary("5"))
-         .addColumn(new Text("6"));
- 
--    data.setSchema(schema);
--    data.setTextData(testData);
-+    dataFormat.setSchema(schema);
-+    dataFormat.setTextData(testData);
- 
--    Object[] out = data.getObjectData();
-+    Object[] out = dataFormat.getObjectData();
- 
-     assertEquals(new Long(10),out[0]);
-     assertEquals(new Long(34),out[1]);
-@@ -134,7 +134,7 @@ public void testObjectInStringOut() {
-         .addColumn(new Text("4"))
-         .addColumn(new Binary("5"))
-         .addColumn(new Text("6"));
--    data.setSchema(schema);
-+    dataFormat.setSchema(schema);
- 
-     byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54};
-     Object[] in = new Object[6];
-@@ -145,12 +145,12 @@ public void testObjectInStringOut() {
-     in[4] = byteFieldData;
-     in[5] = new String(new char[] { 0x0A });
- 
--    data.setObjectData(in);
-+    dataFormat.setObjectData(in);
- 
-     //byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements
-     String testData = "10,34,'54','random data'," +
-         getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + 
",'\\n'";
--    assertEquals(testData, data.getTextData());
-+    assertEquals(testData, dataFormat.getTextData());
-   }
- 
-   @Test
-@@ -164,7 +164,7 @@ public void testObjectInObjectOut() {
-         .addColumn(new Text("4"))
-         .addColumn(new Binary("5"))
-         .addColumn(new Text("6"));
--    data.setSchema(schema);
-+    dataFormat.setSchema(schema);
- 
-     Object[] in = new Object[6];
-     in[0] = new Long(10);
-@@ -177,9 +177,9 @@ public void testObjectInObjectOut() {
-     System.arraycopy(in,0,inCopy,0,in.length);
- 
-     // Modifies the input array, so we use the copy to confirm
--    data.setObjectData(in);
-+    dataFormat.setObjectData(in);
- 
--    assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
-+    assertTrue(Arrays.deepEquals(inCopy, dataFormat.getObjectData()));
-   }
- 
-   @Test
-@@ -191,7 +191,7 @@ public void testObjectWithNullInStringOut() {
-         .addColumn(new Text("4"))
-         .addColumn(new Binary("5"))
-         .addColumn(new Text("6"));
--    data.setSchema(schema);
-+    dataFormat.setSchema(schema);
- 
-     byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54};
-     Object[] in = new Object[6];
-@@ -202,12 +202,12 @@ public void testObjectWithNullInStringOut() {
-     in[4] = byteFieldData;
-     in[5] = new String(new char[] { 0x0A });
- 
--    data.setObjectData(in);
-+    dataFormat.setObjectData(in);
- 
-     //byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements
-     String testData = "10,34,NULL,'random data'," +
-         getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + 
",'\\n'";
--    assertEquals(testData, data.getTextData());
-+    assertEquals(testData, dataFormat.getTextData());
-   }
- 
-   @Test
-@@ -215,7 +215,7 @@ public void testStringFullRangeOfCharacters() {
-     Schema schema = new Schema("test");
-     schema.addColumn(new Text("1"));
- 
--    data.setSchema(schema);
-+    dataFormat.setSchema(schema);
- 
-     char[] allCharArr = new char[256];
-     for(int i = 0; i < allCharArr.length; ++i) {
-@@ -228,17 +228,17 @@ public void testStringFullRangeOfCharacters() {
-     System.arraycopy(in, 0, inCopy, 0, in.length);
- 
-     // Modifies the input array, so we use the copy to confirm
--    data.setObjectData(in);
-+    dataFormat.setObjectData(in);
- 
--    assertEquals(strData, data.getObjectData()[0]);
--    assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
-+    assertEquals(strData, dataFormat.getObjectData()[0]);
-+    assertTrue(Arrays.deepEquals(inCopy, dataFormat.getObjectData()));
-   }
- 
-   @Test
-   public void testByteArrayFullRangeOfCharacters() {
-     Schema schema = new Schema("test");
-     schema.addColumn(new Binary("1"));
--    data.setSchema(schema);
-+    dataFormat.setSchema(schema);
- 
-     byte[] allCharByteArr = new byte[256];
-     for (int i = 0; i < allCharByteArr.length; ++i) {
-@@ -250,32 +250,32 @@ public void testByteArrayFullRangeOfCharacters() {
-     System.arraycopy(in, 0, inCopy, 0, in.length);
- 
-     // Modifies the input array, so we use the copy to confirm
--    data.setObjectData(in);
--    assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
-+    dataFormat.setObjectData(in);
-+    assertTrue(Arrays.deepEquals(inCopy, dataFormat.getObjectData()));
-   }
- 
-   @Test
-   public void testDate() {
-     Schema schema = new Schema("test");
-     schema.addColumn(new Date("1"));
--    data.setSchema(schema);
-+    dataFormat.setSchema(schema);
- 
--    data.setTextData("2014-10-01");
--    assertEquals("2014-10-01", data.getObjectData()[0].toString());
-+    dataFormat.setTextData("2014-10-01");
-+    assertEquals("2014-10-01", dataFormat.getObjectData()[0].toString());
-   }
- 
-   @Test
-   public void testDateTime() {
-     Schema schema = new Schema("test");
-     schema.addColumn(new DateTime("1"));
--    data.setSchema(schema);
-+    dataFormat.setSchema(schema);
- 
-     for (String dateTime : new String[]{
-         "2014-10-01T12:00:00",
-         "2014-10-01T12:00:00.000"
-     }) {
--      data.setTextData(dateTime);
--      assertEquals("2014-10-01T12:00:00.000", 
data.getObjectData()[0].toString());
-+      dataFormat.setTextData(dateTime);
-+      assertEquals("2014-10-01T12:00:00.000", 
dataFormat.getObjectData()[0].toString());
-     }
-   }
- 
-@@ -289,14 +289,14 @@ public void testDateTime() {
-   public void testDateTimeISO8601Alternative() {
-     Schema schema = new Schema("test");
-     schema.addColumn(new DateTime("1"));
--    data.setSchema(schema);
-+    dataFormat.setSchema(schema);
- 
-     for (String dateTime : new String[]{
-         "2014-10-01 12:00:00",
-         "2014-10-01 12:00:00.000"
-     }) {
--      data.setTextData(dateTime);
--      assertEquals("2014-10-01T12:00:00.000", 
data.getObjectData()[0].toString());
-+      dataFormat.setTextData(dateTime);
-+      assertEquals("2014-10-01T12:00:00.000", 
dataFormat.getObjectData()[0].toString());
-     }
-   }
- 
-@@ -304,20 +304,20 @@ public void testDateTimeISO8601Alternative() {
-   public void testBit() {
-     Schema schema = new Schema("test");
-     schema.addColumn(new Bit("1"));
--    data.setSchema(schema);
-+    dataFormat.setSchema(schema);
- 
-     for (String trueBit : new String[]{
-         "true", "TRUE", "1"
-     }) {
--      data.setTextData(trueBit);
--      assertTrue((Boolean) data.getObjectData()[0]);
-+      dataFormat.setTextData(trueBit);
-+      assertTrue((Boolean) dataFormat.getObjectData()[0]);
-     }
- 
-     for (String falseBit : new String[]{
-         "false", "FALSE", "0"
-     }) {
--      data.setTextData(falseBit);
--      assertFalse((Boolean) data.getObjectData()[0]);
-+      dataFormat.setTextData(falseBit);
-+      assertFalse((Boolean) dataFormat.getObjectData()[0]);
-     }
-   }
- 
-@@ -326,9 +326,23 @@ public void testEmptySchema() {
-     String testData = "10,34,'54','random data'," + getByteFieldString(new 
byte[] { (byte) -112, (byte) 54})
-         + ",'\\n'";
-     Schema schema = new Schema("Test");
--    data.setSchema(schema);
--    data.setTextData(testData);
-+    dataFormat.setSchema(schema);
-+    dataFormat.setTextData(testData);
- 
--    Object[] out = data.getObjectData();
-+    @SuppressWarnings("unused")
-+    Object[] out = dataFormat.getObjectData();
-+  }
-+
-+  @Test(expected = SqoopException.class)
-+  public void testNullSchema() {
-+    dataFormat.setSchema(null);
-+    @SuppressWarnings("unused")
-+    Object[] out = dataFormat.getObjectData();
-+  }
-+
-+  @Test(expected = SqoopException.class)
-+  public void testNotSettingSchema() {
-+    @SuppressWarnings("unused")
-+    Object[] out = dataFormat.getObjectData();
-   }
- }
-diff --git 
a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java 
b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
-deleted file mode 100644
-index 139883e..0000000
---- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
-+++ /dev/null
-@@ -1,529 +0,0 @@
--/**
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--package org.apache.sqoop.job.io;
--
--import java.io.DataInput;
--import java.io.DataOutput;
--import java.io.IOException;
--import java.nio.charset.Charset;
--import java.util.ArrayList;
--import java.util.Arrays;
--import java.util.regex.Matcher;
--
--import org.apache.hadoop.io.WritableComparable;
--import org.apache.hadoop.io.WritableComparator;
--import org.apache.hadoop.io.WritableUtils;
--import org.apache.sqoop.common.SqoopException;
--import org.apache.sqoop.job.MRExecutionError;
--
--public class Data implements WritableComparable<Data> {
--
--  // The content is an Object to accommodate different kinds of data.
--  // For example, it can be:
--  // - Object[] for an array of object record
--  // - String for a text of CSV record
--  private volatile Object content = null;
--
--  public static final int EMPTY_DATA = 0;
--  public static final int CSV_RECORD = 1;
--  public static final int ARRAY_RECORD = 2;
--  private int type = EMPTY_DATA;
--
--  public static final String CHARSET_NAME = "UTF-8";
--
--  public static final char DEFAULT_RECORD_DELIMITER = '\n';
--  public static final char DEFAULT_FIELD_DELIMITER = ',';
--  public static final char DEFAULT_STRING_DELIMITER = '\'';
--  public static final char DEFAULT_STRING_ESCAPE = '\\';
--  private char fieldDelimiter = DEFAULT_FIELD_DELIMITER;
--  private char stringDelimiter = DEFAULT_STRING_DELIMITER;
--  private char stringEscape = DEFAULT_STRING_ESCAPE;
--  private String escapedStringDelimiter = String.valueOf(new char[] {
--      stringEscape, stringDelimiter
--  });
--
--  private int[] fieldTypes = null;
--
--  public void setFieldDelimiter(char fieldDelimiter) {
--    this.fieldDelimiter = fieldDelimiter;
--  }
--
--  public void setFieldTypes(int[] fieldTypes) {
--    this.fieldTypes = fieldTypes;
--  }
--
--  public void setContent(Object content, int type) {
--    switch (type) {
--    case EMPTY_DATA:
--    case CSV_RECORD:
--    case ARRAY_RECORD:
--      this.type = type;
--      this.content = content;
--      break;
--    default:
--      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, 
String.valueOf(type));
--    }
--  }
--
--  public Object getContent(int targetType) {
--    switch (targetType) {
--    case CSV_RECORD:
--      return format();
--    case ARRAY_RECORD:
--      return parse();
--    default:
--      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, 
String.valueOf(targetType));
--    }
--  }
--
--  public int getType() {
--    return type;
--  }
--
--  public boolean isEmpty() {
--    return (type == EMPTY_DATA);
--  }
--
--  @Override
--  public String toString() {
--    return (String)getContent(CSV_RECORD);
--  }
--
--  @Override
--  public int compareTo(Data other) {
--    byte[] myBytes = toString().getBytes(Charset.forName(CHARSET_NAME));
--    byte[] otherBytes = other.toString().getBytes(
--        Charset.forName(CHARSET_NAME));
--    return WritableComparator.compareBytes(
--        myBytes, 0, myBytes.length, otherBytes, 0, otherBytes.length);
--  }
--
--  @Override
--  public boolean equals(Object other) {
--    if (!(other instanceof Data)) {
--      return false;
--    }
--
--    Data data = (Data)other;
--    if (type != data.getType()) {
--      return false;
--    }
--
--    return toString().equals(data.toString());
--  }
--
--  @Override
--  public int hashCode() {
--    int result = super.hashCode();
--    switch (type) {
--    case CSV_RECORD:
--      result += 31 * content.hashCode();
--      return result;
--    case ARRAY_RECORD:
--      Object[] array = (Object[])content;
--      for (int i = 0; i < array.length; i++) {
--        result += 31 * array[i].hashCode();
--      }
--      return result;
--    default:
--      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, 
String.valueOf(type));
--    }
--  }
--
--  @Override
--  public void readFields(DataInput in) throws IOException {
--    type = readType(in);
--    switch (type) {
--    case CSV_RECORD:
--      readCsv(in);
--      break;
--    case ARRAY_RECORD:
--      readArray(in);
--      break;
--    default:
--      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, 
String.valueOf(type));
--    }
--  }
--
--  @Override
--  public void write(DataOutput out) throws IOException {
--    writeType(out, type);
--    switch (type) {
--    case CSV_RECORD:
--      writeCsv(out);
--      break;
--    case ARRAY_RECORD:
--      writeArray(out);
--      break;
--    default:
--      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, 
String.valueOf(type));
--    }
--  }
--
--  private int readType(DataInput in) throws IOException {
--    return WritableUtils.readVInt(in);
--  }
--
--  private void writeType(DataOutput out, int type) throws IOException {
--    WritableUtils.writeVInt(out, type);
--  }
--
--  private void readCsv(DataInput in) throws IOException {
--    content = in.readUTF();
--  }
--
--  private void writeCsv(DataOutput out) throws IOException {
--    out.writeUTF((String)content);
--  }
--
--  private void readArray(DataInput in) throws IOException {
--    // read number of columns
--    int columns = in.readInt();
--    content = new Object[columns];
--    Object[] array = (Object[])content;
--    // read each column
--    for (int i = 0; i < array.length; i++) {
--      int type = readType(in);
--      switch (type) {
--      case FieldTypes.UTF:
--        array[i] = in.readUTF();
--        break;
--
--      case FieldTypes.BIN:
--        int length = in.readInt();
--        byte[] bytes = new byte[length];
--        in.readFully(bytes);
--        array[i] = bytes;
--        break;
--
--      case FieldTypes.DOUBLE:
--        array[i] = in.readDouble();
--        break;
--
--      case FieldTypes.FLOAT:
--        array[i] = in.readFloat();
--        break;
--
--      case FieldTypes.LONG:
--        array[i] = in.readLong();
--        break;
--
--      case FieldTypes.INT:
--        array[i] = in.readInt();
--        break;
--
--      case FieldTypes.SHORT:
--        array[i] = in.readShort();
--        break;
--
--      case FieldTypes.CHAR:
--        array[i] = in.readChar();
--        break;
--
--      case FieldTypes.BYTE:
--        array[i] = in.readByte();
--        break;
--
--      case FieldTypes.BOOLEAN:
--        array[i] = in.readBoolean();
--        break;
--
--      case FieldTypes.NULL:
--        array[i] = null;
--        break;
--
--      default:
--        throw new IOException(
--          new SqoopException(MRExecutionError.MAPRED_EXEC_0012, 
Integer.toString(type))
--        );
--      }
--    }
--  }
--
--  private void writeArray(DataOutput out) throws IOException {
--    Object[] array = (Object[])content;
--    // write number of columns
--    out.writeInt(array.length);
--    // write each column
--    for (int i = 0; i < array.length; i++) {
--      if (array[i] instanceof String) {
--        writeType(out, FieldTypes.UTF);
--        out.writeUTF((String)array[i]);
--
--      } else if (array[i] instanceof byte[]) {
--        writeType(out, FieldTypes.BIN);
--        out.writeInt(((byte[])array[i]).length);
--        out.write((byte[])array[i]);
--
--      } else if (array[i] instanceof Double) {
--        writeType(out, FieldTypes.DOUBLE);
--        out.writeDouble((Double)array[i]);
--
--      } else if (array[i] instanceof Float) {
--        writeType(out, FieldTypes.FLOAT);
--        out.writeFloat((Float)array[i]);
--
--      } else if (array[i] instanceof Long) {
--        writeType(out, FieldTypes.LONG);
--        out.writeLong((Long)array[i]);
--
--      } else if (array[i] instanceof Integer) {
--        writeType(out, FieldTypes.INT);
--        out.writeInt((Integer)array[i]);
--
--      } else if (array[i] instanceof Short) {
--        writeType(out, FieldTypes.SHORT);
--        out.writeShort((Short)array[i]);
--
--      } else if (array[i] instanceof Character) {
--        writeType(out, FieldTypes.CHAR);
--        out.writeChar((Character)array[i]);
--
--      } else if (array[i] instanceof Byte) {
--        writeType(out, FieldTypes.BYTE);
--        out.writeByte((Byte)array[i]);
--
--      } else if (array[i] instanceof Boolean) {
--        writeType(out, FieldTypes.BOOLEAN);
--        out.writeBoolean((Boolean)array[i]);
--
--      } else if (array[i] == null) {
--        writeType(out, FieldTypes.NULL);
--
--      } else {
--        throw new IOException(
--          new SqoopException(MRExecutionError.MAPRED_EXEC_0012,
--            array[i].getClass().getName()
--          )
--        );
--      }
--    }
--  }
--
--  private String format() {
--    switch (type) {
--    case EMPTY_DATA:
--      return null;
--
--    case CSV_RECORD:
--      if (fieldDelimiter == DEFAULT_FIELD_DELIMITER) {
--        return (String)content;
--      } else {
--        // TODO: need to exclude the case where comma is part of a string.
--        return ((String)content).replaceAll(
--            String.valueOf(DEFAULT_FIELD_DELIMITER),
--            String.valueOf(fieldDelimiter));
--      }
--
--    case ARRAY_RECORD:
--      StringBuilder sb = new StringBuilder();
--      Object[] array = (Object[])content;
--      for (int i = 0; i < array.length; i++) {
--        if (i != 0) {
--          sb.append(fieldDelimiter);
--        }
--
--        if (array[i] instanceof String) {
--          sb.append(stringDelimiter);
--          sb.append(escape((String)array[i]));
--          sb.append(stringDelimiter);
--        } else if (array[i] instanceof byte[]) {
--          sb.append(Arrays.toString((byte[])array[i]));
--        } else {
--          sb.append(String.valueOf(array[i]));
--        }
--      }
--      return sb.toString();
--
--    default:
--      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, 
String.valueOf(type));
--    }
--  }
--
--  private Object[] parse() {
--    switch (type) {
--    case EMPTY_DATA:
--      return null;
--
--    case CSV_RECORD:
--      ArrayList<Object> list = new ArrayList<Object>();
--      char[] record = ((String)content).toCharArray();
--      int start = 0;
--      int position = start;
--      boolean stringDelimited = false;
--      boolean arrayDelimited = false;
--      int index = 0;
--      while (position < record.length) {
--        if (record[position] == fieldDelimiter) {
--          if (!stringDelimited && !arrayDelimited) {
--            index = parseField(list, record, start, position, index);
--            start = position + 1;
--          }
--        } else if (record[position] == stringDelimiter) {
--          if (!stringDelimited) {
--            stringDelimited = true;
--          }
--          else if (position > 0 && record[position-1] != stringEscape) {
--            stringDelimited = false;
--          }
--        } else if (record[position] == '[') {
--          if (!stringDelimited) {
--            arrayDelimited = true;
--          }
--        } else if (record[position] == ']') {
--          if (!stringDelimited) {
--            arrayDelimited = false;
--          }
--        }
--        position++;
--      }
--      parseField(list, record, start, position, index);
--      return list.toArray();
--
--    case ARRAY_RECORD:
--      return (Object[])content;
--
--    default:
--      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, 
String.valueOf(type));
--    }
--  }
--
--  private int parseField(ArrayList<Object> list, char[] record,
--      int start, int end, int index) {
--    String field = String.valueOf(record, start, end-start).trim();
--
--    int fieldType;
--    if (fieldTypes == null) {
--      fieldType = guessType(field);
--    } else {
--      fieldType = fieldTypes[index];
--    }
--
--    switch (fieldType) {
--    case FieldTypes.UTF:
--      if (field.charAt(0) != stringDelimiter ||
--          field.charAt(field.length()-1) != stringDelimiter) {
--        throw new SqoopException(MRExecutionError.MAPRED_EXEC_0022);
--      }
--      list.add(index, unescape(field.substring(1, field.length()-1)));
--      break;
--
--    case FieldTypes.BIN:
--      if (field.charAt(0) != '[' ||
--          field.charAt(field.length()-1) != ']') {
--        throw new SqoopException(MRExecutionError.MAPRED_EXEC_0022);
--      }
--      String[] splits =
--          field.substring(1, field.length()-1).split(String.valueOf(','));
--      byte[] bytes = new byte[splits.length];
--      for (int i=0; i<bytes.length; i++) {
--        bytes[i] = Byte.parseByte(splits[i].trim());
--      }
--      list.add(index, bytes);
--      break;
--
--    case FieldTypes.DOUBLE:
--      list.add(index, Double.parseDouble(field));
--      break;
--
--    case FieldTypes.FLOAT:
--      list.add(index, Float.parseFloat(field));
--      break;
--
--    case FieldTypes.LONG:
--      list.add(index, Long.parseLong(field));
--      break;
--
--    case FieldTypes.INT:
--      list.add(index, Integer.parseInt(field));
--      break;
--
--    case FieldTypes.SHORT:
--      list.add(index, Short.parseShort(field));
--      break;
--
--    case FieldTypes.CHAR:
--      list.add(index, Character.valueOf(field.charAt(0)));
--      break;
--
--    case FieldTypes.BYTE:
--      list.add(index, Byte.parseByte(field));
--      break;
--
--    case FieldTypes.BOOLEAN:
--      list.add(index, Boolean.parseBoolean(field));
--      break;
--
--    case FieldTypes.NULL:
--      list.add(index, null);
--      break;
--
--    default:
--      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, 
String.valueOf(fieldType));
--    }
--
--    return ++index;
--  }
--
--  private int guessType(String field) {
--    char[] value = field.toCharArray();
--
--    if (value[0] == stringDelimiter) {
--      return FieldTypes.UTF;
--    }
--
--    switch (value[0]) {
--    case 'n':
--    case 'N':
--      return FieldTypes.NULL;
--    case '[':
--      return FieldTypes.BIN;
--    case 't':
--    case 'f':
--    case 'T':
--    case 'F':
--      return FieldTypes.BOOLEAN;
--    }
--
--    int position = 1;
--    while (position < value.length) {
--      switch (value[position++]) {
--      case '.':
--        return FieldTypes.DOUBLE;
--      }
--    }
--
--    return FieldTypes.LONG;
--  }
--
--  private String escape(String string) {
--    // TODO: Also need to escape those special characters as documented in:
--    // 
https://cwiki.apache.org/confluence/display/SQOOP/Sqoop2+Intermediate+representation#Sqoop2Intermediaterepresentation-Intermediateformatrepresentationproposal
--    String regex = String.valueOf(stringDelimiter);
--    String replacement = Matcher.quoteReplacement(escapedStringDelimiter);
--    return string.replaceAll(regex, replacement);
--  }
--
--  private String unescape(String string) {
--    // TODO: Also need to unescape those special characters as documented in:
--    // 
https://cwiki.apache.org/confluence/display/SQOOP/Sqoop2+Intermediate+representation#Sqoop2Intermediaterepresentation-Intermediateformatrepresentationproposal
--    String regex = Matcher.quoteReplacement(escapedStringDelimiter);
--    String replacement = String.valueOf(stringDelimiter);
--    return string.replaceAll(regex, replacement);
--  }
--}
-\ No newline at end of file
-diff --git 
a/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java 
b/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
-deleted file mode 100644
-index dafdeb4..0000000
---- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
-+++ /dev/null
-@@ -1,93 +0,0 @@
--/**
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--package org.apache.sqoop.job;
--
--import java.io.IOException;
--
--import org.apache.hadoop.conf.Configuration;
--import org.apache.hadoop.io.NullWritable;
--import org.apache.hadoop.mapreduce.InputFormat;
--import org.apache.hadoop.mapreduce.Job;
--import org.apache.hadoop.mapreduce.JobContext;
--import org.apache.hadoop.mapreduce.Mapper;
--import org.apache.hadoop.mapreduce.OutputCommitter;
--import org.apache.hadoop.mapreduce.OutputFormat;
--import org.apache.sqoop.job.io.SqoopWritable;
--import org.apache.sqoop.job.mr.SqoopSplit;
--import org.apache.sqoop.utils.ClassUtils;
--
--import static org.mockito.Mockito.mock;
--import static org.mockito.Mockito.when;
--
--public class JobUtils {
--
--  public static boolean runJob(Configuration conf,
--    Class<? extends InputFormat<SqoopSplit, NullWritable>> input,
--    Class<? extends Mapper<SqoopSplit, NullWritable, SqoopWritable, 
NullWritable>> mapper,
--    Class<? extends OutputFormat<SqoopWritable, NullWritable>> output)
--    throws IOException, InterruptedException, ClassNotFoundException {
--    Job job = new Job(conf);
--    job.setInputFormatClass(input);
--    job.setMapperClass(mapper);
--    job.setMapOutputKeyClass(SqoopWritable.class);
--    job.setMapOutputValueClass(NullWritable.class);
--    job.setOutputFormatClass(output);
--    job.setOutputKeyClass(SqoopWritable.class);
--    job.setOutputValueClass(NullWritable.class);
--
--    boolean ret = job.waitForCompletion(true);
--
--    // Hadoop 1.0 (and 0.20) have nasty bug when job committer is not called 
in LocalJobRuner
--    if (isHadoop1()) {
--      callOutputCommitter(job, output);
--    }
--
--    return ret;
--  }
--
--  /**
--   * Call output format on given job manually.
--   */
--  private static void callOutputCommitter(Job job, Class<? extends 
OutputFormat<SqoopWritable, NullWritable>> outputFormat) throws IOException, 
InterruptedException {
--    OutputCommitter committer = 
((OutputFormat)ClassUtils.instantiate(outputFormat)).getOutputCommitter(null);
--
--    JobContext jobContext = mock(JobContext.class);
--    when(jobContext.getConfiguration()).thenReturn(job.getConfiguration());
--
--    committer.commitJob(jobContext);
--  }
--
--  /**
--   * Detect Hadoop 1.0 installation
--   *
--   * @return True if and only if this is Hadoop 1 and below
--   */
--  public static boolean isHadoop1() {
--    String version = org.apache.hadoop.util.VersionInfo.getVersion();
--    if (version.matches("\\b0\\.20\\..+\\b")
--      || version.matches("\\b1\\.\\d\\.\\d")) {
--      return true;
--    }
--    return false;
--  }
--
--  private JobUtils() {
--    // Disable explicit object creation
--  }
--
--}
-diff --git 
a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java 
b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
-index 78ae4ec..bbac7d2 100644
---- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
-+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
-@@ -37,6 +37,7 @@
- import org.apache.sqoop.common.Direction;
- import org.apache.sqoop.connector.common.EmptyConfiguration;
- import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
-+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
- import org.apache.sqoop.job.etl.Destroyer;
- import org.apache.sqoop.job.etl.DestroyerContext;
- import org.apache.sqoop.job.etl.Extractor;
-@@ -46,17 +47,13 @@
- import org.apache.sqoop.job.etl.Partition;
- import org.apache.sqoop.job.etl.Partitioner;
- import org.apache.sqoop.job.etl.PartitionerContext;
--import org.apache.sqoop.job.io.Data;
- import org.apache.sqoop.job.io.SqoopWritable;
- import org.apache.sqoop.job.mr.MRConfigurationUtils;
- import org.apache.sqoop.job.mr.SqoopInputFormat;
- import org.apache.sqoop.job.mr.SqoopMapper;
- import org.apache.sqoop.job.mr.SqoopNullOutputFormat;
- import org.apache.sqoop.job.mr.SqoopSplit;
--import org.apache.sqoop.schema.Schema;
--import org.apache.sqoop.schema.type.FixedPoint;
--import org.apache.sqoop.schema.type.FloatingPoint;
--import org.apache.sqoop.schema.type.Text;
-+import org.apache.sqoop.job.util.MRJobTestUtil;
- import org.junit.Assert;
- import org.junit.Test;
- 
-@@ -67,11 +64,10 @@
-   private static final int NUMBER_OF_ROWS_PER_PARTITION = 10;
- 
-   @Test
--  public void testInputFormat() throws Exception {
-+  public void testSqoopInputFormat() throws Exception {
-     Configuration conf = new Configuration();
-     conf.set(MRJobConstants.JOB_ETL_PARTITIONER, 
DummyPartitioner.class.getName());
--    conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
--      CSVIntermediateDataFormat.class.getName());
-+    conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, 
CSVIntermediateDataFormat.class.getName());
-     Job job = new Job(conf);
- 
-     SqoopInputFormat inputformat = new SqoopInputFormat();
-@@ -79,51 +75,47 @@ public void testInputFormat() throws Exception {
-     assertEquals(9, splits.size());
- 
-     for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) {
--      SqoopSplit split = (SqoopSplit)splits.get(id-1);
--      DummyPartition partition = (DummyPartition)split.getPartition();
-+      SqoopSplit split = (SqoopSplit) splits.get(id - 1);
-+      DummyPartition partition = (DummyPartition) split.getPartition();
-       assertEquals(id, partition.getId());
-     }
-   }
- 
-   @Test
--  public void testMapper() throws Exception {
-+  public void testSqoopMapper() throws Exception {
-     Configuration conf = new Configuration();
-     conf.set(MRJobConstants.JOB_ETL_PARTITIONER, 
DummyPartitioner.class.getName());
-     conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, 
DummyExtractor.class.getName());
--    conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
--      CSVIntermediateDataFormat.class.getName());
--    Schema schema = new Schema("Test");
--    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
--      .addColumn(new org.apache.sqoop.schema.type.Text("3"));
--
-+    conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, 
CSVIntermediateDataFormat.class.getName());
-     Job job = new Job(conf);
--    MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
--    MRConfigurationUtils.setConnectorSchema(Direction.TO, job, schema);
--    boolean success = JobUtils.runJob(job.getConfiguration(),
--        SqoopInputFormat.class, SqoopMapper.class, DummyOutputFormat.class);
-+    // from and to have the same schema in this test case
-+    MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, 
MRJobTestUtil.getTestSchema());
-+    MRConfigurationUtils.setConnectorSchema(Direction.TO, job, 
MRJobTestUtil.getTestSchema());
-+    boolean success = MRJobTestUtil.runJob(job.getConfiguration(),
-+                                      SqoopInputFormat.class,
-+                                      SqoopMapper.class,
-+                                      DummyOutputFormat.class);
-     Assert.assertEquals("Job failed!", true, success);
-   }
- 
-   @Test
--  public void testOutputFormat() throws Exception {
-+  public void testNullOutputFormat() throws Exception {
-     Configuration conf = new Configuration();
-     conf.set(MRJobConstants.JOB_ETL_PARTITIONER, 
DummyPartitioner.class.getName());
-     conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, 
DummyExtractor.class.getName());
-     conf.set(MRJobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
-     conf.set(MRJobConstants.JOB_ETL_FROM_DESTROYER, 
DummyFromDestroyer.class.getName());
-     conf.set(MRJobConstants.JOB_ETL_TO_DESTROYER, 
DummyToDestroyer.class.getName());
--    conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
--      CSVIntermediateDataFormat.class.getName());
--    Schema schema = new Schema("Test");
--    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
--      .addColumn(new Text("3"));
-+    conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, 
CSVIntermediateDataFormat.class.getName());
- 
-     Job job = new Job(conf);
--    MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
--    MRConfigurationUtils.setConnectorSchema(Direction.TO, job, schema);
--    boolean success = JobUtils.runJob(job.getConfiguration(),
--        SqoopInputFormat.class, SqoopMapper.class,
--        SqoopNullOutputFormat.class);
-+    // from and to have the same schema in this test case
-+    MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, 
MRJobTestUtil.getTestSchema());
-+    MRConfigurationUtils.setConnectorSchema(Direction.TO, job, 
MRJobTestUtil.getTestSchema());
-+    boolean success = MRJobTestUtil.runJob(job.getConfiguration(),
-+                                     SqoopInputFormat.class,
-+                                     SqoopMapper.class,
-+                                     SqoopNullOutputFormat.class);
-     Assert.assertEquals("Job failed!", true, success);
- 
-     // Make sure both destroyers get called.
-@@ -171,15 +163,17 @@ public String toString() {
-     }
-   }
- 
--  public static class DummyExtractor extends Extractor<EmptyConfiguration, 
EmptyConfiguration, DummyPartition> {
-+  public static class DummyExtractor extends
-+      Extractor<EmptyConfiguration, EmptyConfiguration, DummyPartition> {
-     @Override
--    public void extract(ExtractorContext context, EmptyConfiguration oc, 
EmptyConfiguration oj, DummyPartition partition) {
--      int id = ((DummyPartition)partition).getId();
-+    public void extract(ExtractorContext context, EmptyConfiguration oc, 
EmptyConfiguration oj,
-+        DummyPartition partition) {
-+      int id = ((DummyPartition) partition).getId();
-       for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) {
--        context.getDataWriter().writeArrayRecord(new Object[] {
--            id * NUMBER_OF_ROWS_PER_PARTITION + row,
--            (double) (id * NUMBER_OF_ROWS_PER_PARTITION + row),
--            String.valueOf(id*NUMBER_OF_ROWS_PER_PARTITION+row)});
-+        context.getDataWriter().writeArrayRecord(
-+            new Object[] { id * NUMBER_OF_ROWS_PER_PARTITION + row,
-+                (double) (id * NUMBER_OF_ROWS_PER_PARTITION + row),
-+                String.valueOf(id * NUMBER_OF_ROWS_PER_PARTITION + row) });
-       }
-     }
- 
-@@ -189,16 +183,14 @@ public long getRowsRead() {
-     }
-   }
- 
--  public static class DummyOutputFormat
--      extends OutputFormat<SqoopWritable, NullWritable> {
-+  public static class DummyOutputFormat extends OutputFormat<SqoopWritable, 
NullWritable> {
-     @Override
-     public void checkOutputSpecs(JobContext context) {
-       // do nothing
-     }
- 
-     @Override
--    public RecordWriter<SqoopWritable, NullWritable> getRecordWriter(
--        TaskAttemptContext context) {
-+    public RecordWriter<SqoopWritable, NullWritable> 
getRecordWriter(TaskAttemptContext context) {
-       return new DummyRecordWriter();
-     }
- 
-@@ -207,22 +199,17 @@ public OutputCommitter 
getOutputCommitter(TaskAttemptContext context) {
-       return new DummyOutputCommitter();
-     }
- 
--    public static class DummyRecordWriter
--        extends RecordWriter<SqoopWritable, NullWritable> {
--      private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
--      private Data data = new Data();
-+    public static class DummyRecordWriter extends RecordWriter<SqoopWritable, 
NullWritable> {
-+      private int index = START_PARTITION * NUMBER_OF_ROWS_PER_PARTITION;
-+      // should I use a dummy IDF for testing?
-+      private IntermediateDataFormat<?> dataFormat = 
MRJobTestUtil.getTestIDF();
- 
-       @Override
-       public void write(SqoopWritable key, NullWritable value) {
--
--        data.setContent(new Object[] {
--          index,
--          (double) index,
--          String.valueOf(index)},
--          Data.ARRAY_RECORD);
-+        String testData = "" + index + "," +  (double) index + ",'" + 
String.valueOf(index) + "'";
-+        dataFormat.setTextData(testData);
-         index++;
--
--        assertEquals(data.toString(), key.toString());
-+        assertEquals(dataFormat.getTextData().toString(), key.toString());
-       }
- 
-       @Override
-@@ -233,16 +220,20 @@ public void close(TaskAttemptContext context) {
- 
-     public static class DummyOutputCommitter extends OutputCommitter {
-       @Override
--      public void setupJob(JobContext jobContext) { }
-+      public void setupJob(JobContext jobContext) {
-+      }
- 
-       @Override
--      public void setupTask(TaskAttemptContext taskContext) { }
-+      public void setupTask(TaskAttemptContext taskContext) {
-+      }
- 
-       @Override
--      public void commitTask(TaskAttemptContext taskContext) { }
-+      public void commitTask(TaskAttemptContext taskContext) {
-+      }
- 
-       @Override
--      public void abortTask(TaskAttemptContext taskContext) { }
-+      public void abortTask(TaskAttemptContext taskContext) {
-+      }
- 
-       @Override
-       public boolean needsTaskCommit(TaskAttemptContext taskContext) {
-@@ -251,39 +242,34 @@ public boolean needsTaskCommit(TaskAttemptContext 
taskContext) {
-     }
-   }
- 
-+  // it is writing to the target.
-   public static class DummyLoader extends Loader<EmptyConfiguration, 
EmptyConfiguration> {
--    private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
--    private Data expected = new Data();
-+    private int index = START_PARTITION * NUMBER_OF_ROWS_PER_PARTITION;
-+    private IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
- 
-     @Override
--    public void load(LoaderContext context, EmptyConfiguration oc, 
EmptyConfiguration oj) throws Exception{
-+    public void load(LoaderContext context, EmptyConfiguration oc, 
EmptyConfiguration oj)
-+        throws Exception {
-       String data;
-       while ((data = context.getDataReader().readTextRecord()) != null) {
--        expected.setContent(new Object[] {
--          index,
--          (double) index,
--          String.valueOf(index)},
--          Data.ARRAY_RECORD);
-+        String testData = "" + index + "," +  (double) index + ",'" + 
String.valueOf(index) + "'";
-+        dataFormat.setTextData(testData);
-         index++;
--        assertEquals(expected.toString(), data);
-+        assertEquals(dataFormat.getTextData().toString(), data);
-       }
-     }
-   }
- 
-   public static class DummyFromDestroyer extends 
Destroyer<EmptyConfiguration, EmptyConfiguration> {
--
-     public static int count = 0;
--
-     @Override
-     public void destroy(DestroyerContext context, EmptyConfiguration o, 
EmptyConfiguration o2) {
-       count++;
-     }
-   }
- 
--  public static class DummyToDestroyer extends 
Destroyer<EmptyConfiguration,EmptyConfiguration> {
--
-+  public static class DummyToDestroyer extends Destroyer<EmptyConfiguration, 
EmptyConfiguration> {
-     public static int count = 0;
--
-     @Override
-     public void destroy(DestroyerContext context, EmptyConfiguration o, 
EmptyConfiguration o2) {
-       count++;
-diff --git 
a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java 
b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
-index 04fb692..a64a4a6 100644
---- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
-+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
-@@ -38,16 +38,17 @@
- import org.apache.sqoop.common.Direction;
- import org.apache.sqoop.connector.common.EmptyConfiguration;
- import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
-+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
- import org.apache.sqoop.job.etl.Extractor;
- import org.apache.sqoop.job.etl.ExtractorContext;
- import org.apache.sqoop.job.etl.Partition;
- import org.apache.sqoop.job.etl.Partitioner;
- import org.apache.sqoop.job.etl.PartitionerContext;
--import org.apache.sqoop.job.io.Data;
- import org.apache.sqoop.job.io.SqoopWritable;
- import org.apache.sqoop.job.mr.MRConfigurationUtils;
- import org.apache.sqoop.job.mr.SqoopInputFormat;
- import org.apache.sqoop.job.mr.SqoopMapper;
-+import org.apache.sqoop.job.util.MRJobTestUtil;
- import org.apache.sqoop.schema.Schema;
- import org.apache.sqoop.schema.type.FixedPoint;
- import org.apache.sqoop.schema.type.FloatingPoint;
-@@ -121,6 +122,7 @@ public TestMatching(Schema from,
-     return parameters;
-   }
- 
-+  @SuppressWarnings("deprecation")
-   @Test
-   public void testSchemaMatching() throws Exception {
-     Configuration conf = new Configuration();
-@@ -132,9 +134,9 @@ public void testSchemaMatching() throws Exception {
-     Job job = new Job(conf);
-     MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, from);
-     MRConfigurationUtils.setConnectorSchema(Direction.TO, job, to);
--    JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, 
SqoopMapper.class,
-+    MRJobTestUtil.runJob(job.getConfiguration(), SqoopInputFormat.class, 
SqoopMapper.class,
-         DummyOutputFormat.class);
--    boolean success = JobUtils.runJob(job.getConfiguration(),
-+    boolean success = MRJobTestUtil.runJob(job.getConfiguration(),
-         SqoopInputFormat.class, SqoopMapper.class,
-         DummyOutputFormat.class);
-     if (from.getName().split("-")[1].equals("EMPTY")) {
-@@ -233,19 +235,14 @@ public OutputCommitter 
getOutputCommitter(TaskAttemptContext context) {
-     public static class DummyRecordWriter
-         extends RecordWriter<SqoopWritable, NullWritable> {
-       private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
--      private Data data = new Data();
-+      private IntermediateDataFormat<?> dataFormat = 
MRJobTestUtil.getTestIDF();
- 
-       @Override
-       public void write(SqoopWritable key, NullWritable value) {
--
--        data.setContent(new Object[] {
--                index,
--                (double) index,
--                String.valueOf(index)},
--            Data.ARRAY_RECORD);
-+        String testData = "" + index + "," +  (double) index + ",'" + 
String.valueOf(index) + "'";
-+        dataFormat.setTextData(testData);
-         index++;
--
--        assertEquals(data.toString(), key.toString());
-+        assertEquals(dataFormat.getTextData().toString(), key.toString());
-       }
- 
-       @Override
-diff --git 
a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java
 
b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java
-deleted file mode 100644
-index 68ce5ed..0000000
---- 
a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java
-+++ /dev/null
-@@ -1,95 +0,0 @@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing,
-- * software distributed under the License is distributed on an
-- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- * KIND, either express or implied.  See the License for the
-- * specific language governing permissions and limitations
-- * under the License.
-- */
--package org.apache.sqoop.job.io;
--
--import com.google.common.base.Charsets;
--
--import java.io.ByteArrayInputStream;
--import java.io.ByteArrayOutputStream;
--import java.io.DataInput;
--import java.io.DataInputStream;
--import java.io.DataOutput;
--import java.io.DataOutputStream;
--import java.io.IOException;
--import java.io.InputStream;
--
--import org.apache.hadoop.conf.Configuration;
--import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
--import org.apache.sqoop.job.MRJobConstants;
--import org.junit.Assert;
--import org.junit.Test;
--
--public class SqoopWritableTest {
--
--  private final SqoopWritable writable = new SqoopWritable();
--
--  @Test
--  public void testStringInStringOut() {
--    String testData = "Live Long and prosper";
--    writable.setString(testData);
--    Assert.assertEquals(testData,writable.getString());
--  }
--
--  @Test
--  public void testDataWritten() throws IOException {
--    String testData = "One ring to rule them all";
--    byte[] testDataBytes = testData.getBytes(Charsets.UTF_8);
--    writable.setString(testData);
--    ByteArrayOutputStream ostream = new ByteArrayOutputStream();
--    DataOutput out = new DataOutputStream(ostream);
--    writable.write(out);
--    byte[] written = ostream.toByteArray();
--    InputStream instream = new ByteArrayInputStream(written);
--    DataInput in = new DataInputStream(instream);
--    String readData = in.readUTF();
--    Assert.assertEquals(testData, readData);
--  }
--
--  @Test
--  public void testDataRead() throws IOException {
--    String testData = "Brandywine Bridge - 20 miles!";
--    ByteArrayOutputStream ostream = new ByteArrayOutputStream();
--    DataOutput out = new DataOutputStream(ostream);
--    out.writeUTF(testData);
--    InputStream instream = new ByteArrayInputStream(ostream.toByteArray());
--    DataInput in = new DataInputStream(instream);
--    writable.readFields(in);
--    Assert.assertEquals(testData, writable.getString());
--  }
--
--  @Test
--  public void testWriteReadUsingStream() throws IOException {
--    String testData = "You shall not pass";
--    ByteArrayOutputStream ostream = new ByteArrayOutputStream();
--    DataOutput out = new DataOutputStream(ostream);
--    writable.setString(testData);
--    writable.write(out);
--    byte[] written = ostream.toByteArray();
--
--    //Don't test what the data is, test that SqoopWritable can read it.
--    InputStream instream = new ByteArrayInputStream(written);
--    SqoopWritable newWritable = new SqoopWritable();
--    DataInput in = new DataInputStream(instream);
--    newWritable.readFields(in);
--    Assert.assertEquals(testData, newWritable.getString());
--    ostream.close();
--    instream.close();
--  }
--
--}
-diff --git 
a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java 
b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java
-deleted file mode 100644
-index 4e23bcb..0000000
---- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java
-+++ /dev/null
-@@ -1,117 +0,0 @@
--/**
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--package org.apache.sqoop.job.io;
--
--import java.util.Arrays;
--
--import org.junit.Assert;
--import org.junit.Test;
--
--public class TestData {
--
--  private static final double TEST_NUMBER = Math.PI + 100;
--  @Test
--  public void testArrayToCsv() throws Exception {
--    Data data = new Data();
--    String expected;
--    String actual;
--
--    // with special characters:
--    expected =
--        Long.valueOf((long)TEST_NUMBER) + "," +
--        Double.valueOf(TEST_NUMBER) + "," +
--        "'" + String.valueOf(TEST_NUMBER) + "\\',s'" + "," +
--        Arrays.toString(new byte[] {1, 2, 3, 4, 5});
--    data.setContent(new Object[] {
--        Long.valueOf((long)TEST_NUMBER),
--        Double.valueOf(TEST_NUMBER),
--        String.valueOf(TEST_NUMBER) + "',s",
--        new byte[] {1, 2, 3, 4, 5} },
--        Data.ARRAY_RECORD);
--    actual = (String)data.getContent(Data.CSV_RECORD);
--    assertEquals(expected, actual);
--
--    // with null characters:
--    expected =
--        Long.valueOf((long)TEST_NUMBER) + "," +
--        Double.valueOf(TEST_NUMBER) + "," +
--        "null" + "," +
--        Arrays.toString(new byte[] {1, 2, 3, 4, 5});
--    data.setContent(new Object[] {
--        Long.valueOf((long)TEST_NUMBER),
--        Double.valueOf(TEST_NUMBER),
--        null,
--        new byte[] {1, 2, 3, 4, 5} },
--        Data.ARRAY_RECORD);
--    actual = (String)data.getContent(Data.CSV_RECORD);
--    assertEquals(expected, actual);
--  }
--
--  @Test
--  public void testCsvToArray() throws Exception {
--    Data data = new Data();
--    Object[] expected;
--    Object[] actual;
--
--    // with special characters:
--    expected = new Object[] {
--        Long.valueOf((long)TEST_NUMBER),
--        Double.valueOf(TEST_NUMBER),
--        String.valueOf(TEST_NUMBER) + "',s",
--        new byte[] {1, 2, 3, 4, 5} };
--    data.setContent(
--        Long.valueOf((long)TEST_NUMBER) + "," +
--        Double.valueOf(TEST_NUMBER) + "," +
--        "'" + String.valueOf(TEST_NUMBER) + "\\',s'" + "," +
--        Arrays.toString(new byte[] {1, 2, 3, 4, 5}),
--        Data.CSV_RECORD);
--    actual = (Object[])data.getContent(Data.ARRAY_RECORD);
--    assertEquals(expected.length, actual.length);
--    for (int c=0; c<expected.length; c++) {
--      assertEquals(expected[c], actual[c]);
--    }
--
--    // with null characters:
--    expected = new Object[] {
--        Long.valueOf((long)TEST_NUMBER),
--        Double.valueOf(TEST_NUMBER),
--        null,
--        new byte[] {1, 2, 3, 4, 5} };
--    data.setContent(
--        Long.valueOf((long)TEST_NUMBER) + "," +
--        Double.valueOf(TEST_NUMBER) + "," +
--        "null" + "," +
--        Arrays.toString(new byte[] {1, 2, 3, 4, 5}),
--        Data.CSV_RECORD);
--    actual = (Object[])data.getContent(Data.ARRAY_RECORD);
--    assertEquals(expected.length, actual.length);
--    for (int c=0; c<expected.length; c++) {
--      assertEquals(expected[c], actual[c]);
--    }
--  }
--
--  public static void assertEquals(Object expected, Object actual) {
--    if (expected instanceof byte[]) {
--      Assert.assertEquals(Arrays.toString((byte[])expected),
--          Arrays.toString((byte[])actual));
--    } else {
--      Assert.assertEquals(expected, actual);
--    }
--  }
--
--}
-diff --git 
a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestSqoopWritable.java
 
b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestSqoopWritable.java
-new file mode 100644
-index 0000000..3207e53
---- /dev/null
-+++ 
b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestSqoopWritable.java
-@@ -0,0 +1,89 @@
-+/*
-+ * Licensed to the Apache Software Foundation (ASF) under one
-+ * or more contributor license agreements.  See the NOTICE file
-+ * distributed with this work for additional information
-+ * regarding copyright ownership.  The ASF licenses this file
-+ * to you under the Apache License, Version 2.0 (the
-+ * "License"); you may not use this file except in compliance
-+ * with the License.  You may obtain a copy of the License at
-+ *
-+ * http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing,
-+ * software distributed under the License is distributed on an
-+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-+ * KIND, either express or implied.  See the License for the
-+ * specific language governing permissions and limitations
-+ * under the License.
-+ */
-+package org.apache.sqoop.job.io;
-+
-+import java.io.ByteArrayInputStream;
-+import java.io.ByteArrayOutputStream;
-+import java.io.DataInput;
-+import java.io.DataInputStream;
-+import java.io.DataOutput;
-+import java.io.DataOutputStream;
-+import java.io.IOException;
-+import java.io.InputStream;
-+
-+import org.junit.Assert;
-+import org.junit.Test;
-+
-+public class TestSqoopWritable {
-+
-+  private final SqoopWritable writable = new SqoopWritable();
-+
-+  @Test
-+  public void testStringInStringOut() {
-+    String testData = "Live Long and prosper";
-+    writable.setString(testData);
-+    Assert.assertEquals(testData,writable.getString());
-+  }
-+
-+  @Test
-+  public void testDataWritten() throws IOException {
-+    String testData = "One ring to rule them all";
-+    writable.setString(testData);
-+    ByteArrayOutputStream ostream = new ByteArrayOutputStream();
-+    DataOutput out = new DataOutputStream(ostream);
-+    writable.write(out);
-+    byte[] written = ostream.toByteArray();
-+    InputStream instream = new ByteArrayInputStream(written);
-+    DataInput in = new DataInputStream(instream);
-+    String readData = in.readUTF();
-+    Assert.assertEquals(testData, readData);
-+  }
-+
-+  @Test
-+  public void testDataRead() throws IOException {
-+    String testData = "Brandywine Bridge - 20 miles!";
-+    ByteArrayOutputStream ostream = new ByteArrayOutputStream();
-+    DataOutput out = new DataOutputStream(ostream);
-+    out.writeUTF(testData);
-+    InputStream instream = new ByteArrayInputStream(ostream.toByteArray());
-+    DataInput in = new DataInputStream(instream);
-+    writable.readFields(in);
-+    Assert.assertEquals(testData, writable.getString());
-+  }
-+
-+  @Test
-+  public void testWriteReadUsingStream() throws IOException {
-+    String testData = "You shall not pass";
-+    ByteArrayOutputStream ostream = new ByteArrayOutputStream();
-+    DataOutput out = new DataOutputStream(ostream);
-+    writable.setString(testData);
-+    writable.write(out);
-+    byte[] written = ostream.toByteArray();
-+
-+    //Don't test what the data is, test that SqoopWritable can read it.
-+    InputStream instream = new ByteArrayInputStream(written);
-+    SqoopWritable newWritable = new SqoopWritable();
-+    DataInput in = new DataInputStream(instream);
-+    newWritable.readFields(in);
-+    Assert.assertEquals(testData, newWritable.getString());
-+    ostream.close();
-+    instream.close();
-+  }
-+
-+}
-diff --git 
a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
 
b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
-index 5bd11f0..67e965d 100644
---- 
a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
-+++ 
b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
-@@ -18,6 +18,10 @@
-  */
- package org.apache.sqoop.job.mr;
- 
-+import java.util.ConcurrentModificationException;
-+import java.util.concurrent.BrokenBarrierException;
-+import java.util.concurrent.TimeUnit;
-+
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.io.NullWritable;
- import org.apache.hadoop.mapreduce.RecordWriter;
-@@ -28,14 +32,11 @@
- import org.apache.sqoop.job.etl.Loader;
- import org.apache.sqoop.job.etl.LoaderContext;
- import org.apache.sqoop.job.io.SqoopWritable;
-+import org.apache.sqoop.job.util.MRJobTestUtil;
- import org.junit.Assert;
- import org.junit.Before;
- import org.junit.Test;
- 
--import java.util.ConcurrentModificationException;
--import java.util.concurrent.BrokenBarrierException;
--import java.util.concurrent.TimeUnit;
--
- public class TestSqoopOutputFormatLoadExecutor {
- 
-   private Configuration conf;
-@@ -130,12 +131,12 @@ public void testWhenLoaderThrows() throws Throwable {
-     SqoopOutputFormatLoadExecutor executor = new
-         SqoopOutputFormatLoadExecutor(true, ThrowingLoader.class.getName());
-     RecordWriter<SqoopWritable, NullWritable> writer = 
executor.getRecordWriter();
--    IntermediateDataFormat data = new CSVIntermediateDataFormat();
-+    IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
-     SqoopWritable writable = new SqoopWritable();
-     try {
-       for (int count = 0; count < 100; count++) {
--        data.setTextData(String.valueOf(count));
--        writable.setString(data.getTextData());
-+        dataFormat.setTextData(String.valueOf(count));
-+        writable.setString(dataFormat.getTextData());
-         writer.write(writable, null);
-       }
-     } catch (SqoopException ex) {
-@@ -149,7 +150,7 @@ public void testSuccessfulContinuousLoader() throws 
Throwable {
-     SqoopOutputFormatLoadExecutor executor = new
-         SqoopOutputFormatLoadExecutor(true, 
GoodContinuousLoader.class.getName());
-     RecordWriter<SqoopWritable, NullWritable> writer = 
executor.getRecordWriter();
--    IntermediateDataFormat data = new CSVIntermediateDataFormat();
-+    IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
-     SqoopWritable writable = new SqoopWritable();
-     for (int i = 0; i < 10; i++) {
-       StringBuilder builder = new StringBuilder();
-@@ -159,8 +160,8 @@ public void testSuccessfulContinuousLoader() throws 
Throwable {
-           builder.append(",");
-         }
-       }
--      data.setTextData(builder.toString());
--      writable.setString(data.getTextData());
-+      dataFormat.setTextData(builder.toString());
-+      writable.setString(dataFormat.getTextData());
-       writer.write(writable, null);
-     }
-     writer.close(null);
-@@ -171,7 +172,7 @@ public void testSuccessfulLoader() throws Throwable {
-     SqoopOutputFormatLoadExecutor executor = new
-         SqoopOutputFormatLoadExecutor(true, GoodLoader.class.getName());
-     RecordWriter<SqoopWritable, NullWritable> writer = 
executor.getRecordWriter();
--    IntermediateDataFormat data = new CSVIntermediateDataFormat();
-+    IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
-     SqoopWritable writable = new SqoopWritable();
-     StringBuilder builder = new StringBuilder();
-     for (int count = 0; count < 100; count++) {
-@@ -180,8 +181,8 @@ public void testSuccessfulLoader() throws Throwable {
-         builder.append(",");
-       }
-     }
--    data.setTextData(builder.toString());
--    writable.setString(data.getTextData());
-+    dataFormat.setTextData(builder.toString());
-+    writable.setString(dataFormat.getTextData());
-     writer.write(writable, null);
- 
-     //Allow writer to complete.
-@@ -196,7 +197,7 @@ public void testThrowingContinuousLoader() throws 
Throwable {
-     SqoopOutputFormatLoadExecutor executor = new
-         SqoopOutputFormatLoadExecutor(true, 
ThrowingContinuousLoader.class.getName());
-     RecordWriter<SqoopWritable, NullWritable> writer = 
executor.getRecordWriter();
--    IntermediateDataFormat data = new CSVIntermediateDataFormat();
-+    IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
-     SqoopWritable writable = new SqoopWritable();
-     try {
-       for (int i = 0; i < 10; i++) {
-@@ -207,8 +208,8 @@ public void testThrowingContinuousLoader() throws 
Throwable {
-             builder.append(",");
-           }
-         }
--        data.setTextData(builder.toString());
--        writable.setString(data.getTextData());
-+        dataFormat.setTextData(builder.toString());
-+        writable.setString(dataFormat.getTextData());
-         writer.write(writable, null);
-       }
-       writer.close(null);
-diff --git 
a/execution/mapreduce/src/test/java/org/apache/sqoop/job/util/MRJobTestUtil.java
 
b/execution/mapreduce/src/test/java/org/apache/sqoop/job/util/MRJobTestUtil.java
-new file mode 100644
-index 0000000..5d5359e
---- /dev/null
-+++ 
b/execution/mapreduce/src/test/java/org/apache/sqoop/job/util/MRJobTestUtil.java
-@@ -0,0 +1,114 @@
-+/**
-+ * Licensed to the Apache Software Foundation (ASF) under one
-+ * or more contributor license agreements.  See the NOTICE file
-+ * distributed with this work for additional information
-+ * regarding copyright ownership.  The ASF licenses this file
-+ * to you under the Apache License, Version 2.0 (the
-+ * "License"); you may not use this file except in compliance
-+ * with the License.  You may obtain a copy of the License at
-+ *
-+ *     http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing, software
-+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+ * See the License for the specific language governing permissions and
-+ * limitations under the License.
-+ */
-+package org.apache.sqoop.job.util;
-+
-+import java.io.IOException;
-+
-+import org.apache.hadoop.conf.Configuration;
-+import org.apache.hadoop.io.NullWritable;
-+import org.apache.hadoop.mapreduce.InputFormat;
-+import org.apache.hadoop.mapreduce.Job;
-+import org.apache.hadoop.mapreduce.JobContext;
-+import org.apache.hadoop.mapreduce.Mapper;
-+import org.apache.hadoop.mapreduce.OutputCommitter;
-+import org.apache.hadoop.mapreduce.OutputFormat;
-+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
-+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
-+import org.apache.sqoop.job.io.SqoopWritable;
-+import org.apache.sqoop.job.mr.SqoopSplit;
-+import org.apache.sqoop.schema.Schema;
-+import org.apache.sqoop.schema.type.FixedPoint;
-+import org.apache.sqoop.schema.type.FloatingPoint;
-+import org.apache.sqoop.schema.type.Text;
-+import org.apache.sqoop.utils.ClassUtils;
-+
-+import static org.mockito.Mockito.mock;
-+import static org.mockito.Mockito.when;
-+
-+public class MRJobTestUtil {
-+
-+  @SuppressWarnings("deprecation")
-+  public static boolean runJob(Configuration conf,
-+      Class<? extends InputFormat<SqoopSplit, NullWritable>> input,
-+      Class<? extends Mapper<SqoopSplit, NullWritable, SqoopWritable, 
NullWritable>> mapper,
-+      Class<? extends OutputFormat<SqoopWritable, NullWritable>> output) 
throws IOException,
-+      InterruptedException, ClassNotFoundException {
-+    Job job = new Job(conf);
-+    job.setInputFormatClass(input);
-+    job.setMapperClass(mapper);
-+    job.setMapOutputKeyClass(SqoopWritable.class);
-+    job.setMapOutputValueClass(NullWritable.class);
-+    job.setOutputFormatClass(output);
-+    job.setOutputKeyClass(SqoopWritable.class);
-+    job.setOutputValueClass(NullWritable.class);
-+
-+    boolean ret = job.waitForCompletion(true);
-+
-+    // Hadoop 1.0 (and 0.20) have nasty bug when job committer is not called 
in
-+    // LocalJobRuner
-+    if (isHadoop1()) {
-+      callOutputCommitter(job, output);
-+    }
-+
-+    return ret;
-+  }
-+
-+  public static Schema getTestSchema() {
-+    Schema schema = new Schema("Test");
-+    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
-+        .addColumn(new Text("3"));
-+    return schema;
-+  }
-+
-+  public static IntermediateDataFormat<?> getTestIDF() {
-+    return new CSVIntermediateDataFormat(getTestSchema());
-+  }
-+
-+  /**
-+   * Call output format on given job manually.
-+   */
-+  private static void callOutputCommitter(Job job,
-+      Class<? extends OutputFormat<SqoopWritable, NullWritable>> 
outputFormat) throws IOException,
-+      InterruptedException {
-+    OutputCommitter committer = ((OutputFormat<?,?>) 
ClassUtils.instantiate(outputFormat))
-+        .getOutputCommitter(null);
-+
-+    JobContext jobContext = mock(JobContext.class);
-+    when(jobContext.getConfiguration()).thenReturn(job.getConfiguration());
-+
-+    committer.commitJob(jobContext);
-+  }
-+
-+  /**
-+   * Detect Hadoop 1.0 installation
-+   *
-+   * @return True if and only if this is Hadoop 1 and below
-+   */
-+  public static boolean isHadoop1() {
-+    String version = org.apache.hadoop.util.VersionInfo.getVersion();
-+    if (version.matches("\\b0\\.20\\..+\\b") || 
version.matches("\\b1\\.\\d\\.\\d")) {
-+      return true;
-+    }
-+    return false;
-+  }
-+
-+  private MRJobTestUtil() {
-+    // Disable explicit object creation
-+  }
-+
-+}

Reply via email to