Repository: sqoop
Updated Branches:
  refs/heads/SQOOP-1367 3bb7ff834 -> 2b214cdd2


SQOOP-1455: Sqoop2: From/To: Re-enable MapreduceExecutionEngine tests

(Abraham Elmahrek via Jarek Jarcec Cecho)
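
The re-enabled tests are adapted to the direction-aware (FROM/TO) ConfigurationUtils
API. As a rough sketch of the pattern the updated tests follow (class, method and
constant names are taken from the diff below; the surrounding setup is illustrative
only, not part of this commit):

    // Hypothetical test setup mirroring the re-enabled TestMapReduce cases.
    Configuration conf = new Configuration();
    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
        CSVIntermediateDataFormat.class.getName());

    Schema schema = new Schema("Test");
    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
        .addColumn(new Text("3"));

    Job job = new Job(conf);
    // Schemas and configs are now registered per Direction rather than per MJob.Type.
    ConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
    JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
        DummyOutputFormat.class);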


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/2b214cdd
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/2b214cdd
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/2b214cdd

Branch: refs/heads/SQOOP-1367
Commit: 2b214cdd27b9634045a3d1608a5b2b0b0f974293
Parents: 3bb7ff8
Author: Jarek Jarcec Cecho <[email protected]>
Authored: Tue Aug 19 19:11:13 2014 -0700
Committer: Jarek Jarcec Cecho <[email protected]>
Committed: Tue Aug 19 19:11:13 2014 -0700

----------------------------------------------------------------------
 .../org/apache/sqoop/job/TestMapReduce.java     | 398 +++++++++----------
 .../java/org/apache/sqoop/job/io/TestData.java  | 178 ++++-----
 .../sqoop/job/mr/TestConfigurationUtils.java    | 272 ++++++-------
 .../mr/TestSqoopOutputFormatLoadExecutor.java   | 364 +++++++++--------
 4 files changed, 601 insertions(+), 611 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/2b214cdd/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index 4219e9e..2dfc487 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -34,7 +34,8 @@ import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-//import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
 import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.etl.ExtractorContext;
 import org.apache.sqoop.job.etl.Loader;
@@ -49,7 +50,6 @@ import org.apache.sqoop.job.mr.SqoopInputFormat;
 import org.apache.sqoop.job.mr.SqoopMapper;
 import org.apache.sqoop.job.mr.SqoopNullOutputFormat;
 import org.apache.sqoop.job.mr.SqoopSplit;
-import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.schema.type.FixedPoint;
 import org.apache.sqoop.schema.type.FloatingPoint;
@@ -57,204 +57,198 @@ import org.apache.sqoop.schema.type.Text;
 
 public class TestMapReduce extends TestCase {
 
-//  private static final int START_PARTITION = 1;
-//  private static final int NUMBER_OF_PARTITIONS = 9;
-//  private static final int NUMBER_OF_ROWS_PER_PARTITION = 10;
-//
-//  public void testInputFormat() throws Exception {
-//    Configuration conf = new Configuration();
-//    ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
-//    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
-//    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
-//      CSVIntermediateDataFormat.class.getName());
-//    Job job = new Job(conf);
-//
-//    SqoopInputFormat inputformat = new SqoopInputFormat();
-//    List<InputSplit> splits = inputformat.getSplits(job);
-//    assertEquals(9, splits.size());
-//
-//    for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) {
-//      SqoopSplit split = (SqoopSplit)splits.get(id-1);
-//      DummyPartition partition = (DummyPartition)split.getPartition();
-//      assertEquals(id, partition.getId());
-//    }
-//  }
-//
-//  public void testMapper() throws Exception {
-//    Configuration conf = new Configuration();
-//    ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
-//    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
-//    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
-//    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
-//      CSVIntermediateDataFormat.class.getName());
-//    Schema schema = new Schema("Test");
-//    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
-//      .addColumn(new org.apache.sqoop.schema.type.Text("3"));
-//
-//    Job job = new Job(conf);
-//    ConfigurationUtils.setConnectorSchema(job, schema);
-//    JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
-//        DummyOutputFormat.class);
-//  }
-//
-//  public void testOutputFormat() throws Exception {
-//    Configuration conf = new Configuration();
-//    ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
-//    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
-//    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
-//    conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
-//    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
-//      CSVIntermediateDataFormat.class.getName());
-//    Schema schema = new Schema("Test");
-//    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
-//      .addColumn(new Text("3"));
-//
-//    Job job = new Job(conf);
-//    ConfigurationUtils.setConnectorSchema(job, schema);
-//    JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
-//        SqoopNullOutputFormat.class);
-//  }
-//
-//  public static class DummyPartition extends Partition {
-//    private int id;
-//
-//    public void setId(int id) {
-//      this.id = id;
-//    }
-//
-//    public int getId() {
-//      return id;
-//    }
-//
-//    @Override
-//    public void readFields(DataInput in) throws IOException {
-//      id = in.readInt();
-//    }
-//
-//    @Override
-//    public void write(DataOutput out) throws IOException {
-//      out.writeInt(id);
-//    }
-//
-//    @Override
-//    public String toString() {
-//      return Integer.toString(id);
-//    }
-//  }
-//
-//  public static class DummyPartitioner extends Partitioner {
-//    @Override
-//    public List<Partition> getPartitions(PartitionerContext context, Object oc, Object oj) {
-//      List<Partition> partitions = new LinkedList<Partition>();
-//      for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) {
-//        DummyPartition partition = new DummyPartition();
-//        partition.setId(id);
-//        partitions.add(partition);
-//      }
-//      return partitions;
-//    }
-//  }
-//
-//  public static class DummyExtractor extends Extractor {
-//    @Override
-//    public void extract(ExtractorContext context, Object oc, Object oj, Object partition) {
-//      int id = ((DummyPartition)partition).getId();
-//      for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) {
-//        context.getDataWriter().writeArrayRecord(new Object[] {
-//            id * NUMBER_OF_ROWS_PER_PARTITION + row,
-//            (double) (id * NUMBER_OF_ROWS_PER_PARTITION + row),
-//            String.valueOf(id*NUMBER_OF_ROWS_PER_PARTITION+row)});
-//      }
-//    }
-//
-//    @Override
-//    public long getRowsRead() {
-//      return NUMBER_OF_ROWS_PER_PARTITION;
-//    }
-//  }
-//
-//  public static class DummyOutputFormat
-//      extends OutputFormat<SqoopWritable, NullWritable> {
-//    @Override
-//    public void checkOutputSpecs(JobContext context) {
-//      // do nothing
-//    }
-//
-//    @Override
-//    public RecordWriter<SqoopWritable, NullWritable> getRecordWriter(
-//        TaskAttemptContext context) {
-//      return new DummyRecordWriter();
-//    }
-//
-//    @Override
-//    public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
-//      return new DummyOutputCommitter();
-//    }
-//
-//    public static class DummyRecordWriter
-//        extends RecordWriter<SqoopWritable, NullWritable> {
-//      private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
-//      private Data data = new Data();
-//
-//      @Override
-//      public void write(SqoopWritable key, NullWritable value) {
-//
-//        data.setContent(new Object[] {
-//          index,
-//          (double) index,
-//          String.valueOf(index)},
-//          Data.ARRAY_RECORD);
-//        index++;
-//
-//        assertEquals(data.toString(), key.toString());
-//      }
-//
-//      @Override
-//      public void close(TaskAttemptContext context) {
-//        // do nothing
-//      }
-//    }
-//
-//    public static class DummyOutputCommitter extends OutputCommitter {
-//      @Override
-//      public void setupJob(JobContext jobContext) { }
-//
-//      @Override
-//      public void setupTask(TaskAttemptContext taskContext) { }
-//
-//      @Override
-//      public void commitTask(TaskAttemptContext taskContext) { }
-//
-//      @Override
-//      public void abortTask(TaskAttemptContext taskContext) { }
-//
-//      @Override
-//      public boolean needsTaskCommit(TaskAttemptContext taskContext) {
-//        return false;
-//      }
-//    }
-//  }
-//
-//  public static class DummyLoader extends Loader {
-//    private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
-//    private Data expected = new Data();
-//    private CSVIntermediateDataFormat actual = new CSVIntermediateDataFormat();
-//
-//    @Override
-//    public void load(LoaderContext context, Object oc, Object oj) throws Exception{
-//      String data;
-//      while ((data = context.getDataReader().readTextRecord()) != null) {
-//
-////        actual.setSchema(context.getSchema());
-////        actual.setObjectData(array, false);
-//        expected.setContent(new Object[] {
-//          index,
-//          (double) index,
-//          String.valueOf(index)},
-//          Data.ARRAY_RECORD);
-//        index++;
-//        assertEquals(expected.toString(), data);
-//      }
-//    }
-//  }
+  private static final int START_PARTITION = 1;
+  private static final int NUMBER_OF_PARTITIONS = 9;
+  private static final int NUMBER_OF_ROWS_PER_PARTITION = 10;
+
+  public void testInputFormat() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+      CSVIntermediateDataFormat.class.getName());
+    Job job = new Job(conf);
+
+    SqoopInputFormat inputformat = new SqoopInputFormat();
+    List<InputSplit> splits = inputformat.getSplits(job);
+    assertEquals(9, splits.size());
+
+    for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) {
+      SqoopSplit split = (SqoopSplit)splits.get(id-1);
+      DummyPartition partition = (DummyPartition)split.getPartition();
+      assertEquals(id, partition.getId());
+    }
+  }
+
+  public void testMapper() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+      CSVIntermediateDataFormat.class.getName());
+    Schema schema = new Schema("Test");
+    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+      .addColumn(new org.apache.sqoop.schema.type.Text("3"));
+
+    Job job = new Job(conf);
+    ConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
+    JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
+        DummyOutputFormat.class);
+  }
+
+  public void testOutputFormat() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+    conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
+    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+      CSVIntermediateDataFormat.class.getName());
+    Schema schema = new Schema("Test");
+    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+      .addColumn(new Text("3"));
+
+    Job job = new Job(conf);
+    ConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
+    JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
+        SqoopNullOutputFormat.class);
+  }
+
+  public static class DummyPartition extends Partition {
+    private int id;
+
+    public void setId(int id) {
+      this.id = id;
+    }
+
+    public int getId() {
+      return id;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      id = in.readInt();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(id);
+    }
+
+    @Override
+    public String toString() {
+      return Integer.toString(id);
+    }
+  }
+
+  public static class DummyPartitioner extends Partitioner {
+    @Override
+    public List<Partition> getPartitions(PartitionerContext context, Object oc, Object oj) {
+      List<Partition> partitions = new LinkedList<Partition>();
+      for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) {
+        DummyPartition partition = new DummyPartition();
+        partition.setId(id);
+        partitions.add(partition);
+      }
+      return partitions;
+    }
+  }
+
+  public static class DummyExtractor extends Extractor {
+    @Override
+    public void extract(ExtractorContext context, Object oc, Object oj, Object partition) {
+      int id = ((DummyPartition)partition).getId();
+      for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) {
+        context.getDataWriter().writeArrayRecord(new Object[] {
+            id * NUMBER_OF_ROWS_PER_PARTITION + row,
+            (double) (id * NUMBER_OF_ROWS_PER_PARTITION + row),
+            String.valueOf(id*NUMBER_OF_ROWS_PER_PARTITION+row)});
+      }
+    }
+
+    @Override
+    public long getRowsRead() {
+      return NUMBER_OF_ROWS_PER_PARTITION;
+    }
+  }
+
+  public static class DummyOutputFormat
+      extends OutputFormat<SqoopWritable, NullWritable> {
+    @Override
+    public void checkOutputSpecs(JobContext context) {
+      // do nothing
+    }
+
+    @Override
+    public RecordWriter<SqoopWritable, NullWritable> getRecordWriter(
+        TaskAttemptContext context) {
+      return new DummyRecordWriter();
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
+      return new DummyOutputCommitter();
+    }
+
+    public static class DummyRecordWriter
+        extends RecordWriter<SqoopWritable, NullWritable> {
+      private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
+      private Data data = new Data();
+
+      @Override
+      public void write(SqoopWritable key, NullWritable value) {
+
+        data.setContent(new Object[] {
+          index,
+          (double) index,
+          String.valueOf(index)},
+          Data.ARRAY_RECORD);
+        index++;
+
+        assertEquals(data.toString(), key.toString());
+      }
+
+      @Override
+      public void close(TaskAttemptContext context) {
+        // do nothing
+      }
+    }
+
+    public static class DummyOutputCommitter extends OutputCommitter {
+      @Override
+      public void setupJob(JobContext jobContext) { }
+
+      @Override
+      public void setupTask(TaskAttemptContext taskContext) { }
+
+      @Override
+      public void commitTask(TaskAttemptContext taskContext) { }
+
+      @Override
+      public void abortTask(TaskAttemptContext taskContext) { }
+
+      @Override
+      public boolean needsTaskCommit(TaskAttemptContext taskContext) {
+        return false;
+      }
+    }
+  }
+
+  public static class DummyLoader extends Loader {
+    private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
+    private Data expected = new Data();
+    private CSVIntermediateDataFormat actual = new CSVIntermediateDataFormat();
+
+    @Override
+    public void load(LoaderContext context, Object oc, Object oj) throws Exception{
+      String data;
+      while ((data = context.getDataReader().readTextRecord()) != null) {
+        expected.setContent(new Object[] {
+          index,
+          (double) index,
+          String.valueOf(index)},
+          Data.ARRAY_RECORD);
+        index++;
+        assertEquals(expected.toString(), data);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2b214cdd/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java
index 48fb61f..91df426 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java
@@ -25,94 +25,94 @@ import org.junit.Test;
 
 public class TestData extends TestCase {
 
-//  private static final double TEST_NUMBER = Math.PI + 100;
-//  @Test
-//  public void testArrayToCsv() throws Exception {
-//    Data data = new Data();
-//    String expected;
-//    String actual;
-//
-//    // with special characters:
-//    expected =
-//        Long.valueOf((long)TEST_NUMBER) + "," +
-//        Double.valueOf(TEST_NUMBER) + "," +
-//        "'" + String.valueOf(TEST_NUMBER) + "\\',s'" + "," +
-//        Arrays.toString(new byte[] {1, 2, 3, 4, 5});
-//    data.setContent(new Object[] {
-//        Long.valueOf((long)TEST_NUMBER),
-//        Double.valueOf(TEST_NUMBER),
-//        String.valueOf(TEST_NUMBER) + "',s",
-//        new byte[] {1, 2, 3, 4, 5} },
-//        Data.ARRAY_RECORD);
-//    actual = (String)data.getContent(Data.CSV_RECORD);
-//    assertEquals(expected, actual);
-//
-//    // with null characters:
-//    expected =
-//        Long.valueOf((long)TEST_NUMBER) + "," +
-//        Double.valueOf(TEST_NUMBER) + "," +
-//        "null" + "," +
-//        Arrays.toString(new byte[] {1, 2, 3, 4, 5});
-//    data.setContent(new Object[] {
-//        Long.valueOf((long)TEST_NUMBER),
-//        Double.valueOf(TEST_NUMBER),
-//        null,
-//        new byte[] {1, 2, 3, 4, 5} },
-//        Data.ARRAY_RECORD);
-//    actual = (String)data.getContent(Data.CSV_RECORD);
-//    assertEquals(expected, actual);
-//  }
-//
-//  @Test
-//  public void testCsvToArray() throws Exception {
-//    Data data = new Data();
-//    Object[] expected;
-//    Object[] actual;
-//
-//    // with special characters:
-//    expected = new Object[] {
-//        Long.valueOf((long)TEST_NUMBER),
-//        Double.valueOf(TEST_NUMBER),
-//        String.valueOf(TEST_NUMBER) + "',s",
-//        new byte[] {1, 2, 3, 4, 5} };
-//    data.setContent(
-//        Long.valueOf((long)TEST_NUMBER) + "," +
-//        Double.valueOf(TEST_NUMBER) + "," +
-//        "'" + String.valueOf(TEST_NUMBER) + "\\',s'" + "," +
-//        Arrays.toString(new byte[] {1, 2, 3, 4, 5}),
-//        Data.CSV_RECORD);
-//    actual = (Object[])data.getContent(Data.ARRAY_RECORD);
-//    assertEquals(expected.length, actual.length);
-//    for (int c=0; c<expected.length; c++) {
-//      assertEquals(expected[c], actual[c]);
-//    }
-//
-//    // with null characters:
-//    expected = new Object[] {
-//        Long.valueOf((long)TEST_NUMBER),
-//        Double.valueOf(TEST_NUMBER),
-//        null,
-//        new byte[] {1, 2, 3, 4, 5} };
-//    data.setContent(
-//        Long.valueOf((long)TEST_NUMBER) + "," +
-//        Double.valueOf(TEST_NUMBER) + "," +
-//        "null" + "," +
-//        Arrays.toString(new byte[] {1, 2, 3, 4, 5}),
-//        Data.CSV_RECORD);
-//    actual = (Object[])data.getContent(Data.ARRAY_RECORD);
-//    assertEquals(expected.length, actual.length);
-//    for (int c=0; c<expected.length; c++) {
-//      assertEquals(expected[c], actual[c]);
-//    }
-//  }
-//
-//  public static void assertEquals(Object expected, Object actual) {
-//    if (expected instanceof byte[]) {
-//      assertEquals(Arrays.toString((byte[])expected),
-//          Arrays.toString((byte[])actual));
-//    } else {
-//      TestCase.assertEquals(expected, actual);
-//    }
-//  }
+  private static final double TEST_NUMBER = Math.PI + 100;
+  @Test
+  public void testArrayToCsv() throws Exception {
+    Data data = new Data();
+    String expected;
+    String actual;
+
+    // with special characters:
+    expected =
+        Long.valueOf((long)TEST_NUMBER) + "," +
+        Double.valueOf(TEST_NUMBER) + "," +
+        "'" + String.valueOf(TEST_NUMBER) + "\\',s'" + "," +
+        Arrays.toString(new byte[] {1, 2, 3, 4, 5});
+    data.setContent(new Object[] {
+        Long.valueOf((long)TEST_NUMBER),
+        Double.valueOf(TEST_NUMBER),
+        String.valueOf(TEST_NUMBER) + "',s",
+        new byte[] {1, 2, 3, 4, 5} },
+        Data.ARRAY_RECORD);
+    actual = (String)data.getContent(Data.CSV_RECORD);
+    assertEquals(expected, actual);
+
+    // with null characters:
+    expected =
+        Long.valueOf((long)TEST_NUMBER) + "," +
+        Double.valueOf(TEST_NUMBER) + "," +
+        "null" + "," +
+        Arrays.toString(new byte[] {1, 2, 3, 4, 5});
+    data.setContent(new Object[] {
+        Long.valueOf((long)TEST_NUMBER),
+        Double.valueOf(TEST_NUMBER),
+        null,
+        new byte[] {1, 2, 3, 4, 5} },
+        Data.ARRAY_RECORD);
+    actual = (String)data.getContent(Data.CSV_RECORD);
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testCsvToArray() throws Exception {
+    Data data = new Data();
+    Object[] expected;
+    Object[] actual;
+
+    // with special characters:
+    expected = new Object[] {
+        Long.valueOf((long)TEST_NUMBER),
+        Double.valueOf(TEST_NUMBER),
+        String.valueOf(TEST_NUMBER) + "',s",
+        new byte[] {1, 2, 3, 4, 5} };
+    data.setContent(
+        Long.valueOf((long)TEST_NUMBER) + "," +
+        Double.valueOf(TEST_NUMBER) + "," +
+        "'" + String.valueOf(TEST_NUMBER) + "\\',s'" + "," +
+        Arrays.toString(new byte[] {1, 2, 3, 4, 5}),
+        Data.CSV_RECORD);
+    actual = (Object[])data.getContent(Data.ARRAY_RECORD);
+    assertEquals(expected.length, actual.length);
+    for (int c=0; c<expected.length; c++) {
+      assertEquals(expected[c], actual[c]);
+    }
+
+    // with null characters:
+    expected = new Object[] {
+        Long.valueOf((long)TEST_NUMBER),
+        Double.valueOf(TEST_NUMBER),
+        null,
+        new byte[] {1, 2, 3, 4, 5} };
+    data.setContent(
+        Long.valueOf((long)TEST_NUMBER) + "," +
+        Double.valueOf(TEST_NUMBER) + "," +
+        "null" + "," +
+        Arrays.toString(new byte[] {1, 2, 3, 4, 5}),
+        Data.CSV_RECORD);
+    actual = (Object[])data.getContent(Data.ARRAY_RECORD);
+    assertEquals(expected.length, actual.length);
+    for (int c=0; c<expected.length; c++) {
+      assertEquals(expected[c], actual[c]);
+    }
+  }
+
+  public static void assertEquals(Object expected, Object actual) {
+    if (expected instanceof byte[]) {
+      assertEquals(Arrays.toString((byte[])expected),
+          Arrays.toString((byte[])actual));
+    } else {
+      TestCase.assertEquals(expected, actual);
+    }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2b214cdd/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java
index 25e83a2..1447e00 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java
@@ -19,6 +19,7 @@ package org.apache.sqoop.job.mr;
 
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.model.ConfigurationClass;
 import org.apache.sqoop.model.Form;
 import org.apache.sqoop.model.FormClass;
@@ -41,140 +42,139 @@ import static org.mockito.Mockito.when;
  */
 public class TestConfigurationUtils {
 
-//  Job job;
-//  JobConf jobConf;
-//
-//  @Before
-//  public void setUp() throws Exception {
-//    setUpJob();
-//    setUpJobConf();
-//  }
-//
-//  public void setUpJob() throws Exception {
-//    job = new Job();
-//  }
-//
-//  public void setUpJobConf() throws Exception {
-//    jobConf = spy(new JobConf(job.getConfiguration()));
-//    when(jobConf.getCredentials()).thenReturn(job.getCredentials());
-//  }
-//
-//  @Test
-//  public void testJobType() throws Exception {
-//    ConfigurationUtils.setJobType(job.getConfiguration(), MJob.Type.IMPORT);
-//    setUpJobConf();
-//    assertEquals(MJob.Type.IMPORT, ConfigurationUtils.getJobType(jobConf));
-//  }
-//
-//  @Test
-//  public void testConfigConnectorConnection() throws Exception {
-//    ConfigurationUtils.setConfigFromConnectorConnection(job, getConfig());
-//    setUpJobConf();
-//    assertEquals(getConfig(), ConfigurationUtils.getConfigFromConnectorConnection(jobConf));
-//  }
-//
-//  @Test
-//  public void testConfigConnectorJob() throws Exception {
-//    ConfigurationUtils.setConfigFromConnectorJob(job, getConfig());
-//    setUpJobConf();
-//    assertEquals(getConfig(), ConfigurationUtils.getConfigFromConnectorJob(jobConf));
-//  }
-//
-//  @Test
-//  public void testConfigFrameworkConnection() throws Exception {
-//    ConfigurationUtils.setConfigFrameworkConnection(job, getConfig());
-//    setUpJobConf();
-//    assertEquals(getConfig(), ConfigurationUtils.getConfigFrameworkConnection(jobConf));
-//  }
-//
-//  @Test
-//  public void testConfigFrameworkJob() throws Exception {
-//    ConfigurationUtils.setFrameworkJobConfig(job, getConfig());
-//    setUpJobConf();
-//    assertEquals(getConfig(), ConfigurationUtils.getFrameworkJobConfig(jobConf));
-//  }
-//
-//  @Test
-//  public void testConnectorSchema() throws Exception {
-//    ConfigurationUtils.setConnectorSchema(job, getSchema("a"));
-//    assertEquals(getSchema("a"), 
ConfigurationUtils.getFromConnectorSchema(jobConf));
-//  }
-//
-//  @Test
-//  public void testConnectorSchemaNull() throws Exception {
-//    ConfigurationUtils.setConnectorSchema(job, null);
-//    assertNull(ConfigurationUtils.getFromConnectorSchema(jobConf));
-//  }
-//
-//  @Test
-//  public void testHioSchema() throws Exception {
-//    ConfigurationUtils.setHioSchema(job, getSchema("a"));
-//    assertEquals(getSchema("a"), ConfigurationUtils.getHioSchema(jobConf));
-//  }
-//
-//  @Test
-//  public void testHioSchemaNull() throws Exception {
-//    ConfigurationUtils.setHioSchema(job, null);
-//    assertNull(ConfigurationUtils.getHioSchema(jobConf));
-//  }
-//
-//  private Schema getSchema(String name) {
-//    return new Schema(name).addColumn(new Text("c1"));
-//  }
-//
-//  private Config getConfig() {
-//    Config c = new Config();
-//    c.f.A = "This is secret text!";
-//    return c;
-//  }
-//
-//  @FormClass
-//  public static class F {
-//
-//    @Input String A;
-//
-//    @Override
-//    public boolean equals(Object o) {
-//      if (this == o) return true;
-//      if (!(o instanceof F)) return false;
-//
-//      F f = (F) o;
-//
-//      if (A != null ? !A.equals(f.A) : f.A != null) return false;
-//
-//      return true;
-//    }
-//
-//    @Override
-//    public int hashCode() {
-//      return A != null ? A.hashCode() : 0;
-//    }
-//  }
-//
-//  @ConfigurationClass
-//  public static class Config {
-//    @Form F f;
-//
-//    public Config() {
-//      f = new F();
-//    }
-//
-//    @Override
-//    public boolean equals(Object o) {
-//      if (this == o) return true;
-//      if (!(o instanceof Config)) return false;
-//
-//      Config config = (Config) o;
-//
-//      if (f != null ? !f.equals(config.f) : config.f != null)
-//        return false;
-//
-//      return true;
-//    }
-//
-//    @Override
-//    public int hashCode() {
-//      return f != null ? f.hashCode() : 0;
-//    }
-//  }
+  Job job;
+  JobConf jobConf;
+
+  @Before
+  public void setUp() throws Exception {
+    setUpJob();
+    setUpJobConf();
+  }
+
+  public void setUpJob() throws Exception {
+    job = new Job();
+  }
+
+  public void setUpJobConf() throws Exception {
+    jobConf = spy(new JobConf(job.getConfiguration()));
+    when(jobConf.getCredentials()).thenReturn(job.getCredentials());
+  }
+
+  @Test
+  public void testConfigConnectorConnection() throws Exception {
+    ConfigurationUtils.setConnectorConnectionConfig(Direction.FROM, job, getConfig());
+    setUpJobConf();
+    assertEquals(getConfig(), ConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, jobConf));
+
+    ConfigurationUtils.setConnectorConnectionConfig(Direction.TO, job, getConfig());
+    setUpJobConf();
+    assertEquals(getConfig(), ConfigurationUtils.getConnectorConnectionConfig(Direction.TO, jobConf));
+  }
+
+  @Test
+  public void testConfigConnectorJob() throws Exception {
+    ConfigurationUtils.setConnectorJobConfig(Direction.FROM, job, getConfig());
+    setUpJobConf();
+    assertEquals(getConfig(), ConfigurationUtils.getConnectorJobConfig(Direction.FROM, jobConf));
+
+    ConfigurationUtils.setConnectorJobConfig(Direction.TO, job, getConfig());
+    setUpJobConf();
+    assertEquals(getConfig(), ConfigurationUtils.getConnectorJobConfig(Direction.TO, jobConf));
+  }
+
+  @Test
+  public void testConfigFrameworkConnection() throws Exception {
+    ConfigurationUtils.setFrameworkConnectionConfig(Direction.FROM, job, getConfig());
+    setUpJobConf();
+    assertEquals(getConfig(), ConfigurationUtils.getFrameworkConnectionConfig(Direction.FROM, jobConf));
+
+    ConfigurationUtils.setFrameworkConnectionConfig(Direction.TO, job, getConfig());
+    setUpJobConf();
+    assertEquals(getConfig(), ConfigurationUtils.getFrameworkConnectionConfig(Direction.TO, jobConf));
+  }
+
+  @Test
+  public void testConfigFrameworkJob() throws Exception {
+    ConfigurationUtils.setFrameworkJobConfig(job, getConfig());
+    setUpJobConf();
+    assertEquals(getConfig(), ConfigurationUtils.getFrameworkJobConfig(jobConf));
+  }
+
+  @Test
+  public void testConnectorSchema() throws Exception {
+    ConfigurationUtils.setConnectorSchema(Direction.FROM, job, getSchema("a"));
+    assertEquals(getSchema("a"), 
ConfigurationUtils.getConnectorSchema(Direction.FROM, jobConf));
+
+    ConfigurationUtils.setConnectorSchema(Direction.TO, job, getSchema("b"));
+    assertEquals(getSchema("b"), 
ConfigurationUtils.getConnectorSchema(Direction.TO, jobConf));
+  }
+
+  @Test
+  public void testConnectorSchemaNull() throws Exception {
+    ConfigurationUtils.setConnectorSchema(Direction.FROM, job, null);
+    assertNull(ConfigurationUtils.getConnectorSchema(Direction.FROM, jobConf));
+
+    ConfigurationUtils.setConnectorSchema(Direction.TO, job, null);
+    assertNull(ConfigurationUtils.getConnectorSchema(Direction.TO, jobConf));
+  }
+
+  private Schema getSchema(String name) {
+    return new Schema(name).addColumn(new Text("c1"));
+  }
+
+  private Config getConfig() {
+    Config c = new Config();
+    c.f.A = "This is secret text!";
+    return c;
+  }
+
+  @FormClass
+  public static class F {
+
+    @Input String A;
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (!(o instanceof F)) return false;
+
+      F f = (F) o;
+
+      if (A != null ? !A.equals(f.A) : f.A != null) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      return A != null ? A.hashCode() : 0;
+    }
+  }
+
+  @ConfigurationClass
+  public static class Config {
+    @Form F f;
+
+    public Config() {
+      f = new F();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (!(o instanceof Config)) return false;
+
+      Config config = (Config) o;
+
+      if (f != null ? !f.equals(config.f) : config.f != null)
+        return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      return f != null ? f.hashCode() : 0;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2b214cdd/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
index c28a39e..c2ebd7e 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
@@ -23,12 +23,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.sqoop.common.SqoopException;
-//import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
-//import org.apache.sqoop.connector.idf.IntermediateDataFormat;
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.etl.Loader;
 import org.apache.sqoop.job.etl.LoaderContext;
-import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.job.io.SqoopWritable;
 import org.junit.Before;
 import org.junit.Test;
@@ -39,185 +38,182 @@ import java.util.concurrent.TimeUnit;
 
 public class TestSqoopOutputFormatLoadExecutor {
 
-//  private Configuration conf;
-//
-//  public static class ThrowingLoader extends Loader {
-//
-//    public ThrowingLoader() {
-//
-//    }
-//
-//    @Override
-//    public void load(LoaderContext context, Object cc, Object jc) throws Exception {
-//      context.getDataReader().readTextRecord();
-//      throw new BrokenBarrierException();
-//    }
-//  }
-//
-//  public static class ThrowingContinuousLoader extends Loader {
-//
-//    public ThrowingContinuousLoader() {
-//    }
-//
-//    @Override
-//    public void load(LoaderContext context, Object cc, Object jc) throws Exception {
-//      int runCount = 0;
-//      Object o;
-//      String[] arr;
-//      while ((o = context.getDataReader().readTextRecord()) != null) {
-//        arr = o.toString().split(",");
-//        Assert.assertEquals(100, arr.length);
-//        for (int i = 0; i < arr.length; i++) {
-//          Assert.assertEquals(i, Integer.parseInt(arr[i]));
-//        }
-//        runCount++;
-//        if (runCount == 5) {
-//          throw new ConcurrentModificationException();
-//        }
-//      }
-//    }
-//  }
-//
-//  public static class GoodLoader extends Loader {
-//
-//    public GoodLoader() {
-//
-//    }
-//
-//    @Override
-//    public void load(LoaderContext context, Object cc, Object jc) throws Exception {
-//      String[] arr = context.getDataReader().readTextRecord().toString().split(",");
-//      Assert.assertEquals(100, arr.length);
-//      for (int i = 0; i < arr.length; i++) {
-//        Assert.assertEquals(i, Integer.parseInt(arr[i]));
-//      }
-//    }
-//  }
-//
-//  public static class GoodContinuousLoader extends Loader {
-//
-//    public GoodContinuousLoader() {
-//
-//    }
-//
-//    @Override
-//    public void load(LoaderContext context, Object cc, Object jc) throws Exception {
-//      int runCount = 0;
-//      Object o;
-//      String[] arr;
-//      while ((o = context.getDataReader().readTextRecord()) != null) {
-//        arr = o.toString().split(",");
-//        Assert.assertEquals(100, arr.length);
-//        for (int i = 0; i < arr.length; i++) {
-//          Assert.assertEquals(i, Integer.parseInt(arr[i]));
-//        }
-//        runCount++;
-//      }
-//      Assert.assertEquals(10, runCount);
-//    }
-//  }
-//
-//
-//  @Before
-//  public void setUp() {
-//    conf = new Configuration();
-//    conf.setIfUnset(JobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
-//
-//  }
-//
-//  @Test(expected = BrokenBarrierException.class)
-//  public void testWhenLoaderThrows() throws Throwable {
-//    ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
-//    conf.set(JobConstants.JOB_ETL_LOADER, ThrowingLoader.class.getName());
-//    SqoopOutputFormatLoadExecutor executor = new
-//        SqoopOutputFormatLoadExecutor(true, ThrowingLoader.class.getName());
-//    RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
-//    IntermediateDataFormat data = new CSVIntermediateDataFormat();
-//    SqoopWritable writable = new SqoopWritable();
-//    try {
-//      for (int count = 0; count < 100; count++) {
-//        data.setTextData(String.valueOf(count));
-//        writable.setString(data.getTextData());
-//        writer.write(writable, null);
-//      }
-//    } catch (SqoopException ex) {
-//      throw ex.getCause();
-//    }
-//  }
-//
-//  @Test
-//  public void testSuccessfulContinuousLoader() throws Throwable {
-//    ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
-//    conf.set(JobConstants.JOB_ETL_LOADER, GoodContinuousLoader.class.getName());
-//    SqoopOutputFormatLoadExecutor executor = new
-//        SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName());
-//    RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
-//    IntermediateDataFormat data = new CSVIntermediateDataFormat();
-//    SqoopWritable writable = new SqoopWritable();
-//    for (int i = 0; i < 10; i++) {
-//      StringBuilder builder = new StringBuilder();
-//      for (int count = 0; count < 100; count++) {
-//        builder.append(String.valueOf(count));
-//        if (count != 99) {
-//          builder.append(",");
-//        }
-//      }
-//      data.setTextData(builder.toString());
-//      writable.setString(data.getTextData());
-//      writer.write(writable, null);
-//    }
-//    writer.close(null);
-//  }
-//
-//  @Test (expected = SqoopException.class)
-//  public void testSuccessfulLoader() throws Throwable {
-//    SqoopOutputFormatLoadExecutor executor = new
-//        SqoopOutputFormatLoadExecutor(true, GoodLoader.class.getName());
-//    RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
-//    IntermediateDataFormat data = new CSVIntermediateDataFormat();
-//    SqoopWritable writable = new SqoopWritable();
-//    StringBuilder builder = new StringBuilder();
-//    for (int count = 0; count < 100; count++) {
-//      builder.append(String.valueOf(count));
-//      if (count != 99) {
-//        builder.append(",");
-//      }
-//    }
-//    data.setTextData(builder.toString());
-//    writable.setString(data.getTextData());
-//    writer.write(writable, null);
-//
-//    //Allow writer to complete.
-//    TimeUnit.SECONDS.sleep(5);
-//    writer.close(null);
-//  }
-//
-//
-//  @Test(expected = ConcurrentModificationException.class)
-//  public void testThrowingContinuousLoader() throws Throwable {
-//    ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
-//    conf.set(JobConstants.JOB_ETL_LOADER, ThrowingContinuousLoader.class.getName());
-//    SqoopOutputFormatLoadExecutor executor = new
-//        SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName());
-//    RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
-//    IntermediateDataFormat data = new CSVIntermediateDataFormat();
-//    SqoopWritable writable = new SqoopWritable();
-//    try {
-//      for (int i = 0; i < 10; i++) {
-//        StringBuilder builder = new StringBuilder();
-//        for (int count = 0; count < 100; count++) {
-//          builder.append(String.valueOf(count));
-//          if (count != 99) {
-//            builder.append(",");
-//          }
-//        }
-//        data.setTextData(builder.toString());
-//        writable.setString(data.getTextData());
-//        writer.write(writable, null);
-//      }
-//      writer.close(null);
-//    } catch (SqoopException ex) {
-//      throw ex.getCause();
-//    }
-//  }
+  private Configuration conf;
+
+  public static class ThrowingLoader extends Loader {
+
+    public ThrowingLoader() {
+
+    }
+
+    @Override
+    public void load(LoaderContext context, Object cc, Object jc) throws Exception {
+      context.getDataReader().readTextRecord();
+      throw new BrokenBarrierException();
+    }
+  }
+
+  public static class ThrowingContinuousLoader extends Loader {
+
+    public ThrowingContinuousLoader() {
+    }
+
+    @Override
+    public void load(LoaderContext context, Object cc, Object jc) throws Exception {
+      int runCount = 0;
+      Object o;
+      String[] arr;
+      while ((o = context.getDataReader().readTextRecord()) != null) {
+        arr = o.toString().split(",");
+        Assert.assertEquals(100, arr.length);
+        for (int i = 0; i < arr.length; i++) {
+          Assert.assertEquals(i, Integer.parseInt(arr[i]));
+        }
+        runCount++;
+        if (runCount == 5) {
+          throw new ConcurrentModificationException();
+        }
+      }
+    }
+  }
+
+  public static class GoodLoader extends Loader {
+
+    public GoodLoader() {
+
+    }
+
+    @Override
+    public void load(LoaderContext context, Object cc, Object jc) throws Exception {
+      String[] arr = context.getDataReader().readTextRecord().toString().split(",");
+      Assert.assertEquals(100, arr.length);
+      for (int i = 0; i < arr.length; i++) {
+        Assert.assertEquals(i, Integer.parseInt(arr[i]));
+      }
+    }
+  }
+
+  public static class GoodContinuousLoader extends Loader {
+
+    public GoodContinuousLoader() {
+
+    }
+
+    @Override
+    public void load(LoaderContext context, Object cc, Object jc) throws Exception {
+      int runCount = 0;
+      Object o;
+      String[] arr;
+      while ((o = context.getDataReader().readTextRecord()) != null) {
+        arr = o.toString().split(",");
+        Assert.assertEquals(100, arr.length);
+        for (int i = 0; i < arr.length; i++) {
+          Assert.assertEquals(i, Integer.parseInt(arr[i]));
+        }
+        runCount++;
+      }
+      Assert.assertEquals(10, runCount);
+    }
+  }
+
+
+  @Before
+  public void setUp() {
+    conf = new Configuration();
+    conf.setIfUnset(JobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
+
+  }
+
+  @Test(expected = BrokenBarrierException.class)
+  public void testWhenLoaderThrows() throws Throwable {
+    conf.set(JobConstants.JOB_ETL_LOADER, ThrowingLoader.class.getName());
+    SqoopOutputFormatLoadExecutor executor = new
+        SqoopOutputFormatLoadExecutor(true, ThrowingLoader.class.getName());
+    RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
+    IntermediateDataFormat data = new CSVIntermediateDataFormat();
+    SqoopWritable writable = new SqoopWritable();
+    try {
+      for (int count = 0; count < 100; count++) {
+        data.setTextData(String.valueOf(count));
+        writable.setString(data.getTextData());
+        writer.write(writable, null);
+      }
+    } catch (SqoopException ex) {
+      throw ex.getCause();
+    }
+  }
+
+  @Test
+  public void testSuccessfulContinuousLoader() throws Throwable {
+    conf.set(JobConstants.JOB_ETL_LOADER, GoodContinuousLoader.class.getName());
+    SqoopOutputFormatLoadExecutor executor = new
+        SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName());
+    RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
+    IntermediateDataFormat data = new CSVIntermediateDataFormat();
+    SqoopWritable writable = new SqoopWritable();
+    for (int i = 0; i < 10; i++) {
+      StringBuilder builder = new StringBuilder();
+      for (int count = 0; count < 100; count++) {
+        builder.append(String.valueOf(count));
+        if (count != 99) {
+          builder.append(",");
+        }
+      }
+      data.setTextData(builder.toString());
+      writable.setString(data.getTextData());
+      writer.write(writable, null);
+    }
+    writer.close(null);
+  }
+
+  @Test (expected = SqoopException.class)
+  public void testSuccessfulLoader() throws Throwable {
+    SqoopOutputFormatLoadExecutor executor = new
+        SqoopOutputFormatLoadExecutor(true, GoodLoader.class.getName());
+    RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
+    IntermediateDataFormat data = new CSVIntermediateDataFormat();
+    SqoopWritable writable = new SqoopWritable();
+    StringBuilder builder = new StringBuilder();
+    for (int count = 0; count < 100; count++) {
+      builder.append(String.valueOf(count));
+      if (count != 99) {
+        builder.append(",");
+      }
+    }
+    data.setTextData(builder.toString());
+    writable.setString(data.getTextData());
+    writer.write(writable, null);
+
+    //Allow writer to complete.
+    TimeUnit.SECONDS.sleep(5);
+    writer.close(null);
+  }
+
+
+  @Test(expected = ConcurrentModificationException.class)
+  public void testThrowingContinuousLoader() throws Throwable {
+    conf.set(JobConstants.JOB_ETL_LOADER, ThrowingContinuousLoader.class.getName());
+    SqoopOutputFormatLoadExecutor executor = new
+        SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName());
+    RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
+    IntermediateDataFormat data = new CSVIntermediateDataFormat();
+    SqoopWritable writable = new SqoopWritable();
+    try {
+      for (int i = 0; i < 10; i++) {
+        StringBuilder builder = new StringBuilder();
+        for (int count = 0; count < 100; count++) {
+          builder.append(String.valueOf(count));
+          if (count != 99) {
+            builder.append(",");
+          }
+        }
+        data.setTextData(builder.toString());
+        writable.setString(data.getTextData());
+        writer.write(writable, null);
+      }
+      writer.close(null);
+    } catch (SqoopException ex) {
+      throw ex.getCause();
+    }
+  }
 }
