Updated Branches:
  refs/heads/sqoop2 47cb311a5 -> 3a88f0b48

SQOOP-589: Framework-defined text/sequence loaders for HDFS.

(Bilung Lee via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/3a88f0b4
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/3a88f0b4
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/3a88f0b4

Branch: refs/heads/sqoop2
Commit: 3a88f0b48c7350af5b373c45209c6452b07d7a98
Parents: 47cb311
Author: Jarek Jarcec Cecho <[email protected]>
Authored: Tue Sep 18 09:21:08 2012 +0200
Committer: Jarek Jarcec Cecho <[email protected]>
Committed: Tue Sep 18 09:21:08 2012 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/sqoop/core/CoreError.java |    6 +-
 .../java/org/apache/sqoop/job/JobConstants.java    |   11 +-
 .../java/org/apache/sqoop/job/etl/EtlContext.java  |   11 +-
 .../apache/sqoop/job/etl/EtlMutableContext.java    |    7 -
 .../sqoop/job/etl/HdfsSequenceImportLoader.java    |  107 +++++++
 .../apache/sqoop/job/etl/HdfsTextImportLoader.java |  100 ++++++
 .../main/java/org/apache/sqoop/job/io/Data.java    |   79 +++--
 .../apache/sqoop/job/mr/SqoopFileOutputFormat.java |   25 ++-
 .../java/org/apache/sqoop/job/mr/SqoopMapper.java  |    9 +-
 .../job/mr/SqoopOutputFormatLoadExecutor.java      |    7 +-
 .../test/java/org/apache/sqoop/job/FileUtils.java  |   40 +++
 .../test/java/org/apache/sqoop/job/JobUtils.java   |   69 +++++
 .../java/org/apache/sqoop/job/TestHdfsLoad.java    |  233 +++++++++++++++
 .../java/org/apache/sqoop/job/TestMapReduce.java   |   55 +---
 .../java/org/apache/sqoop/job/etl/Context.java     |    2 -
 .../org/apache/sqoop/job/etl/MutableContext.java   |    2 -
 .../java/org/apache/sqoop/job/io/DataReader.java   |    2 +
 .../java/org/apache/sqoop/job/io/DataWriter.java   |    2 +
 18 files changed, 662 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/core/src/main/java/org/apache/sqoop/core/CoreError.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/core/CoreError.java b/core/src/main/java/org/apache/sqoop/core/CoreError.java
index 303cd80..2697eef 100644
--- a/core/src/main/java/org/apache/sqoop/core/CoreError.java
+++ b/core/src/main/java/org/apache/sqoop/core/CoreError.java
@@ -54,7 +54,7 @@ public enum CoreError implements ErrorCode {
   /** Error occurs during job execution. */
   CORE_0008("Error occurs during job execution"),
 
-  /** The system was load to instantiate the specified class. */
+  /** The system was unable to load the specified class. */
   CORE_0009("Unable to load the specified class"),
 
   /** The system was unable to instantiate the specified class. */
@@ -63,8 +63,8 @@ public enum CoreError implements ErrorCode {
   /** The parameter already exists in the context */
   CORE_0011("The parameter already exists in the context"),
 
-  /** The data type is not supported */
-  CORE_0012("The data type is not supported"),
+  /** The type is not supported */
+  CORE_0012("The type is not supported"),
 
   /** Cannot write to the data writer */
   CORE_0013("Cannot write to the data writer"),

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/core/src/main/java/org/apache/sqoop/job/JobConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/JobConstants.java b/core/src/main/java/org/apache/sqoop/job/JobConstants.java
index 0c1e0d0..54fc543 100644
--- a/core/src/main/java/org/apache/sqoop/job/JobConstants.java
+++ b/core/src/main/java/org/apache/sqoop/job/JobConstants.java
@@ -28,6 +28,7 @@ public final class JobConstants {
   public static final String PREFIX_JOB_CONFIG =
       ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "job.";
 
+
   public static final String JOB_ETL_PARTITIONER = PREFIX_JOB_CONFIG
       + "etl.partitioner";
 
@@ -37,11 +38,13 @@ public final class JobConstants {
   public static final String JOB_ETL_LOADER = PREFIX_JOB_CONFIG
       + "etl.loader";
 
-  public static final String JOB_ETL_FIELD_NAMES = PREFIX_JOB_CONFIG
-      + "etl.field.names";
 
-  public static final String JOB_ETL_OUTPUT_DIRECTORY = PREFIX_JOB_CONFIG
-      + "etl.output.directory";
+  public static final String JOB_MR_OUTPUT_FILE = PREFIX_JOB_CONFIG
+      + "mr.output.file";
+
+  public static final String JOB_MR_OUTPUT_CODEC = PREFIX_JOB_CONFIG
+      + "mr.output.codec";
+
 
   private JobConstants() {
     // Disable explicit object creation

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/core/src/main/java/org/apache/sqoop/job/etl/EtlContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/etl/EtlContext.java b/core/src/main/java/org/apache/sqoop/job/etl/EtlContext.java
index da38d59..09eca58 100644
--- a/core/src/main/java/org/apache/sqoop/job/etl/EtlContext.java
+++ b/core/src/main/java/org/apache/sqoop/job/etl/EtlContext.java
@@ -18,8 +18,6 @@
 package org.apache.sqoop.job.etl;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.sqoop.job.JobConstants;
 
 /**
  * An immutable context used in the ETL framework
@@ -33,14 +31,13 @@ public class EtlContext implements Context {
     this.conf = conf;
   }
 
-  @Override
-  public String getString(String key) {
-    return conf.get(key);
+  protected Configuration getConfiguration() {
+    return conf;
   }
 
   @Override
-  public String[] getFieldNames() {
-    return StringUtils.getStrings(getString(JobConstants.JOB_ETL_FIELD_NAMES));
+  public String getString(String key) {
+    return conf.get(key);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/core/src/main/java/org/apache/sqoop/job/etl/EtlMutableContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/etl/EtlMutableContext.java b/core/src/main/java/org/apache/sqoop/job/etl/EtlMutableContext.java
index 613b14f..e111956 100644
--- a/core/src/main/java/org/apache/sqoop/job/etl/EtlMutableContext.java
+++ b/core/src/main/java/org/apache/sqoop/job/etl/EtlMutableContext.java
@@ -18,10 +18,8 @@
 package org.apache.sqoop.job.etl;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.core.CoreError;
-import org.apache.sqoop.job.JobConstants;
 
 /**
  * A mutable context used in the ETL framework.
@@ -42,9 +40,4 @@ public class EtlMutableContext extends EtlContext implements MutableContext  {
     conf.set(key, value);
   }
 
-  @Override
-  public void setFieldNames(String[] names) {
-    setString(JobConstants.JOB_ETL_FIELD_NAMES, StringUtils.arrayToString(names));
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/core/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java b/core/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
new file mode 100644
index 0000000..ad513e1
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.etl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.core.CoreError;
+import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.etl.Loader;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.DataReader;
+import org.apache.sqoop.utils.ClassLoadingUtils;
+
+public class HdfsSequenceImportLoader extends Loader {
+
+  public static final String extension = ".seq";
+
+  private final char fieldDelimiter;
+  private final char recordDelimiter;
+
+  public HdfsSequenceImportLoader() {
+    fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
+    recordDelimiter = Data.DEFAULT_RECORD_DELIMITER;
+  }
+
+  @Override
+  public void run(Context context, DataReader reader) {
+    Configuration conf = ((EtlContext)context).getConfiguration();
+    String filename =
+        context.getString(JobConstants.JOB_MR_OUTPUT_FILE);
+    String codecname = context.getString(JobConstants.JOB_MR_OUTPUT_CODEC);
+
+    CompressionCodec codec = null;
+    if (codecname != null) {
+      Class<?> clz = ClassLoadingUtils.loadClass(codecname);
+      if (clz == null) {
+        throw new SqoopException(CoreError.CORE_0009, codecname);
+      }
+
+      try {
+        codec = (CompressionCodec) clz.newInstance();
+        if (codec instanceof Configurable) {
+          ((Configurable) codec).setConf(conf);
+        }
+      } catch (Exception e) {
+        throw new SqoopException(CoreError.CORE_0010, codecname, e);
+      }
+    }
+
+    filename += extension;
+
+    try {
+      Path filepath = new Path(filename);
+      SequenceFile.Writer filewriter;
+      if (codecname != null) {
+        filewriter = SequenceFile.createWriter(conf,
+            SequenceFile.Writer.file(filepath),
+            SequenceFile.Writer.keyClass(Text.class),
+            SequenceFile.Writer.valueClass(NullWritable.class),
+            SequenceFile.Writer.compression(CompressionType.BLOCK, codec));
+      } else {
+        filewriter = SequenceFile.createWriter(conf,
+          SequenceFile.Writer.file(filepath),
+          SequenceFile.Writer.keyClass(Text.class),
+          SequenceFile.Writer.valueClass(NullWritable.class),
+          SequenceFile.Writer.compression(CompressionType.NONE));
+      }
+
+      Object record;
+      Text text = new Text();
+      while ((record = reader.readRecord()) != null) {
+        text.set(Data.format(record, fieldDelimiter, recordDelimiter));
+        filewriter.append(text, NullWritable.get());
+      };
+      filewriter.close();
+
+    } catch (IOException e) {
+      throw new SqoopException(CoreError.CORE_0018, e);
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/core/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java b/core/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
new file mode 100644
index 0000000..1368a5e
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.etl;
+
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.core.CoreError;
+import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.etl.Loader;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.DataReader;
+import org.apache.sqoop.utils.ClassLoadingUtils;
+
+public class HdfsTextImportLoader extends Loader {
+
+  private final char fieldDelimiter;
+  private final char recordDelimiter;
+
+  public HdfsTextImportLoader() {
+    fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
+    recordDelimiter = Data.DEFAULT_RECORD_DELIMITER;
+  }
+
+  @Override
+  public void run(Context context, DataReader reader) {
+    Configuration conf = ((EtlContext)context).getConfiguration();
+    String filename = context.getString(JobConstants.JOB_MR_OUTPUT_FILE);
+    String codecname = context.getString(JobConstants.JOB_MR_OUTPUT_CODEC);
+
+    CompressionCodec codec = null;
+    if (codecname != null) {
+      Class<?> clz = ClassLoadingUtils.loadClass(codecname);
+      if (clz == null) {
+        throw new SqoopException(CoreError.CORE_0009, codecname);
+      }
+
+      try {
+        codec = (CompressionCodec) clz.newInstance();
+        if (codec instanceof Configurable) {
+          ((Configurable) codec).setConf(conf);
+        }
+      } catch (Exception e) {
+        throw new SqoopException(CoreError.CORE_0010, codecname, e);
+      }
+
+      filename += codec.getDefaultExtension();
+    }
+
+    try {
+      Path filepath = new Path(filename);
+      FileSystem fs = filepath.getFileSystem(conf);
+
+      BufferedWriter filewriter;
+      DataOutputStream filestream = fs.create(filepath, false);
+      if (codecname != null) {
+        filewriter = new BufferedWriter(new OutputStreamWriter(
+            codec.createOutputStream(filestream, codec.createCompressor()),
+            Data.CHARSET_NAME));
+      } else {
+        filewriter = new BufferedWriter(new OutputStreamWriter(
+            filestream, Data.CHARSET_NAME));
+      }
+
+      Object record;
+      while ((record = reader.readRecord()) != null) {
+        filewriter.write(Data.format(record, fieldDelimiter, recordDelimiter));
+      };
+      filewriter.close();
+
+    } catch (IOException e) {
+      throw new SqoopException(CoreError.CORE_0018, e);
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/core/src/main/java/org/apache/sqoop/job/io/Data.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/io/Data.java b/core/src/main/java/org/apache/sqoop/job/io/Data.java
index ce5e2aa..2732e83 100644
--- a/core/src/main/java/org/apache/sqoop/job/io/Data.java
+++ b/core/src/main/java/org/apache/sqoop/job/io/Data.java
@@ -38,13 +38,14 @@ public class Data implements WritableComparable<Data> {
   // - String for a text of CSV record
   private Object content = null;
 
-  private static final int EMPTY_DATA = 0;
-  private static final int CSV_RECORD = 1;
-  private static final int ARRAY_RECORD = 2;
+  public static final int EMPTY_DATA = 0;
+  public static final int CSV_RECORD = 1;
+  public static final int ARRAY_RECORD = 2;
   private int type = EMPTY_DATA;
 
-  private static char FIELD_DELIMITER = ',';
-  private static char RECORD_DELIMITER = '\n';
+  public static final char DEFAULT_FIELD_DELIMITER = ',';
+  public static final char DEFAULT_RECORD_DELIMITER = '\n';
+  public static final String CHARSET_NAME = "UTF-8";
 
   public void setContent(Object content) {
     if (content == null) {
@@ -72,10 +73,46 @@ public class Data implements WritableComparable<Data> {
     return (type == EMPTY_DATA);
   }
 
+  public static String format(Object content,
+      char fieldDelimiter, char recordDelimiter) {
+    if (content instanceof String) {
+      return (String)content + recordDelimiter;
+
+    } else if (content instanceof Object[]) {
+      StringBuilder sb = new StringBuilder();
+      Object[] array = (Object[])content;
+      for (int i = 0; i < array.length; i++) {
+        if (i != 0) {
+          sb.append(fieldDelimiter);
+        }
+
+        if (array[i] instanceof String) {
+          // TODO: Also need to escape those special characters as documented in:
+          // https://cwiki.apache.org/confluence/display/SQOOP/Sqoop2+Intermediate+representation#Sqoop2Intermediaterepresentation-Intermediateformatrepresentationproposal
+          sb.append("\'");
+          sb.append(((String)array[i]).replaceAll(
+              "\'", Matcher.quoteReplacement("\\\'")));
+          sb.append("\'");
+        } else if (array[i] instanceof byte[]) {
+          sb.append(Arrays.toString((byte[])array[i]));
+        } else {
+          sb.append(array[i].toString());
+        }
+      }
+      sb.append(recordDelimiter);
+      return sb.toString();
+
+    } else {
+      throw new SqoopException(CoreError.CORE_0012,
+          content.getClass().getName());
+    }
+  }
+
   @Override
   public int compareTo(Data other) {
-    byte[] myBytes = toString().getBytes(Charset.forName("UTF-8"));
-    byte[] otherBytes = other.toString().getBytes(Charset.forName("UTF-8"));
+    byte[] myBytes = toString().getBytes(Charset.forName(CHARSET_NAME));
+    byte[] otherBytes = other.toString().getBytes(
+        Charset.forName(CHARSET_NAME));
     return WritableComparator.compareBytes(
         myBytes, 0, myBytes.length, otherBytes, 0, otherBytes.length);
   }
@@ -114,33 +151,7 @@ public class Data implements WritableComparable<Data> {
 
   @Override
   public String toString() {
-    switch (type) {
-    case CSV_RECORD:
-      return (String)content + RECORD_DELIMITER;
-    case ARRAY_RECORD:
-      StringBuilder sb = new StringBuilder();
-      Object[] array = (Object[])content;
-      for (int i = 0; i < array.length; i++) {
-        if (i != 0) {
-          sb.append(FIELD_DELIMITER);
-        }
-
-        if (array[i] instanceof String) {
-          sb.append("\'");
-          sb.append(((String)array[i]).replaceAll(
-              "\'", Matcher.quoteReplacement("\\\'")));
-          sb.append("\'");
-        } else if (array[i] instanceof byte[]) {
-          sb.append(Arrays.toString((byte[])array[i]));
-        } else {
-          sb.append(array[i].toString());
-        }
-      }
-      sb.append(RECORD_DELIMITER);
-      return sb.toString();
-    default:
-      throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
-    }
+    return format(content, DEFAULT_FIELD_DELIMITER, DEFAULT_RECORD_DELIMITER);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/core/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java b/core/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
index f4d30f6..c465f10 100644
--- a/core/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
+++ b/core/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
@@ -18,12 +18,19 @@
 
 package org.apache.sqoop.job.mr;
 
+import java.io.IOException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.io.Data;
 
 /**
@@ -35,9 +42,25 @@ public class SqoopFileOutputFormat
   public static final Log LOG =
       LogFactory.getLog(SqoopFileOutputFormat.class.getName());
 
+  public static final Class<? extends CompressionCodec> DEFAULT_CODEC =
+      DefaultCodec.class;
+
   @Override
   public RecordWriter<Data, NullWritable> getRecordWriter(
-      TaskAttemptContext context) {
+      TaskAttemptContext context) throws IOException {
+    Configuration conf = context.getConfiguration();
+
+    Path filepath = getDefaultWorkFile(context, "");
+    String filename = filepath.toString();
+    conf.set(JobConstants.JOB_MR_OUTPUT_FILE, filename);
+
+    boolean isCompressed = getCompressOutput(context);
+    if (isCompressed) {
+      String codecname =
+          conf.get(FileOutputFormat.COMPRESS_CODEC, DEFAULT_CODEC.getName());
+      conf.set(JobConstants.JOB_MR_OUTPUT_CODEC, codecname);
+    }
+
     SqoopOutputFormatLoadExecutor executor =
         new SqoopOutputFormatLoadExecutor(context);
     return executor.getRecordWriter();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index 8f1e9a9..eb02271 100644
--- a/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -80,20 +80,21 @@ public class SqoopMapper
 
     @Override
     public void writeArrayRecord(Object[] record) {
-      writeContent(record);
+      writeRecord(record);
     }
 
     @Override
     public void writeCsvRecord(String csv) {
-      writeContent(csv);
+      writeRecord(csv);
     }
 
-    private void writeContent(Object content) {
+    @Override
+    public void writeRecord(Object record) {
       if (data == null) {
         data = new Data();
       }
 
-      data.setContent(content);
+      data.setContent(record);
       try {
         context.write(data, NullWritable.get());
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index 65a8cdf..71e76ca 100644
--- a/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -127,15 +127,16 @@ public class SqoopOutputFormatLoadExecutor {
   public class OutputFormatDataReader extends DataReader {
     @Override
     public Object[] readArrayRecord() {
-      return (Object[])readContent();
+      return (Object[])readRecord();
     }
 
     @Override
     public String readCsvRecord() {
-      return (String)readContent();
+      return (String)readRecord();
     }
 
-    private Object readContent() {
+    @Override
+    public Object readRecord() {
       synchronized (data) {
         if (writerFinished) {
           return null;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/core/src/test/java/org/apache/sqoop/job/FileUtils.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/job/FileUtils.java b/core/src/test/java/org/apache/sqoop/job/FileUtils.java
new file mode 100644
index 0000000..4b075d2
--- /dev/null
+++ b/core/src/test/java/org/apache/sqoop/job/FileUtils.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class FileUtils {
+
+  public static void delete(String file) throws IOException {
+    Path path = new Path(file);
+    FileSystem fs = path.getFileSystem(new Configuration());
+    if (fs.exists(path)) {
+      fs.delete(path, true);
+    }
+  }
+
+  private FileUtils() {
+    // Disable explicit object creation
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/core/src/test/java/org/apache/sqoop/job/JobUtils.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/job/JobUtils.java b/core/src/test/java/org/apache/sqoop/job/JobUtils.java
new file mode 100644
index 0000000..e6ead3f
--- /dev/null
+++ b/core/src/test/java/org/apache/sqoop/job/JobUtils.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
+import org.apache.sqoop.job.mr.SqoopInputFormat;
+import org.apache.sqoop.job.mr.SqoopMapper;
+import org.apache.sqoop.job.mr.SqoopNullOutputFormat;
+import org.apache.sqoop.job.mr.SqoopSplit;
+
+public class JobUtils {
+
+  public static void runJob(Configuration conf)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    runJob(conf, SqoopInputFormat.class, SqoopMapper.class,
+        (conf.get(FileOutputFormat.OUTDIR) != null) ?
+        SqoopFileOutputFormat.class : SqoopNullOutputFormat.class);
+  }
+
+  public static void runJob(Configuration conf,
+      Class<? extends InputFormat<SqoopSplit, NullWritable>> input,
+      Class<? extends Mapper<SqoopSplit, NullWritable, Data, NullWritable>> mapper,
+      Class<? extends OutputFormat<Data, NullWritable>> output)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    Job job = Job.getInstance(conf);
+    job.setInputFormatClass(input);
+    job.setMapperClass(mapper);
+    job.setMapOutputKeyClass(Data.class);
+    job.setMapOutputValueClass(NullWritable.class);
+    job.setOutputFormatClass(output);
+    job.setOutputKeyClass(Data.class);
+    job.setOutputValueClass(NullWritable.class);
+
+    boolean success = job.waitForCompletion(true);
+    Assert.assertEquals("Job failed!", true, success);
+  }
+
+  private JobUtils() {
+    // Disable explicit object creation
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/core/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java b/core/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
new file mode 100644
index 0000000..ab05c8e
--- /dev/null
+++ b/core/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job;
+
+import java.io.BufferedReader;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.LinkedList;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.sqoop.job.etl.Context;
+import org.apache.sqoop.job.etl.Extractor;
+import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
+import org.apache.sqoop.job.etl.HdfsTextImportLoader;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.DataWriter;
+import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
+import org.junit.Test;
+
+/**
+ * Runs a job that combines the dummy partitioner and extractor defined in
+ * this class with one of the HDFS import loaders, then reads the produced
+ * output file back and compares it record by record with the expected data.
+ */
+public class TestHdfsLoad extends TestCase {
+
+  private static final String OUTPUT_ROOT = "/tmp/sqoop/warehouse/";
+  private static final String OUTPUT_FILE = "part-r-00000";
+  private static final int START_ID = 1;
+  private static final int NUMBER_OF_IDS = 9;
+  private static final int NUMBER_OF_ROWS_PER_ID = 10;
+
+  private final String outdir;
+
+  public TestHdfsLoad() {
+    // OUTPUT_ROOT already ends with "/", so no extra separator is added.
+    outdir = OUTPUT_ROOT + getClass().getSimpleName();
+  }
+
+  @Test
+  public void testUncompressedText() throws Exception {
+    Configuration conf = runLoadJob(HdfsTextImportLoader.class, false);
+
+    Path filepath = new Path(outdir, OUTPUT_FILE);
+    FileSystem fs = filepath.getFileSystem(conf);
+    DataInputStream filestream = new DataInputStream(fs.open(filepath));
+    verifyOutputText(newTextReader(filestream));
+  }
+
+  @Test
+  public void testCompressedText() throws Exception {
+    Configuration conf = runLoadJob(HdfsTextImportLoader.class, true);
+
+    // Resolve the codec the same way the test expects the output format to:
+    // the configured class, or the output format's default when none is set.
+    Class<? extends CompressionCodec> codecClass = conf.getClass(
+        FileOutputFormat.COMPRESS_CODEC, SqoopFileOutputFormat.DEFAULT_CODEC)
+        .asSubclass(CompressionCodec.class);
+    CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
+    Path filepath = new Path(outdir,
+        OUTPUT_FILE + codec.getDefaultExtension());
+    FileSystem fs = filepath.getFileSystem(conf);
+    InputStream filestream = codec.createInputStream(fs.open(filepath));
+    verifyOutputText(newTextReader(filestream));
+  }
+
+  @Test
+  public void testUncompressedSequence() throws Exception {
+    Configuration conf = runLoadJob(HdfsSequenceImportLoader.class, false);
+    verifyOutputSequence(openSequenceOutput(conf));
+  }
+
+  @Test
+  public void testCompressedSequence() throws Exception {
+    Configuration conf = runLoadJob(HdfsSequenceImportLoader.class, true);
+    verifyOutputSequence(openSequenceOutput(conf));
+  }
+
+  /**
+   * Deletes the output directory, runs a job that uses the dummy partitioner
+   * and extractor together with the given loader, and returns the
+   * configuration the job was run with.
+   *
+   * @param loader loader class registered under JOB_ETL_LOADER
+   * @param compress whether FileOutputFormat.COMPRESS is switched on; when
+   *        false the key is left untouched
+   */
+  private Configuration runLoadJob(Class<?> loader, boolean compress)
+      throws Exception {
+    FileUtils.delete(outdir);
+
+    Configuration conf = new Configuration();
+    conf.set(JobConstants.JOB_ETL_PARTITIONER,
+        DummyPartitioner.class.getName());
+    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+    conf.set(JobConstants.JOB_ETL_LOADER, loader.getName());
+    conf.set(FileOutputFormat.OUTDIR, outdir);
+    if (compress) {
+      conf.setBoolean(FileOutputFormat.COMPRESS, true);
+    }
+    JobUtils.runJob(conf);
+    return conf;
+  }
+
+  /** Wraps the stream in a reader that decodes with Data.CHARSET_NAME. */
+  private BufferedReader newTextReader(InputStream stream)
+      throws IOException {
+    return new BufferedReader(new InputStreamReader(
+        stream, Data.CHARSET_NAME));
+  }
+
+  /** Opens the sequence file the sequence loader is expected to produce. */
+  private SequenceFile.Reader openSequenceOutput(Configuration conf)
+      throws IOException {
+    Path filepath = new Path(outdir,
+        OUTPUT_FILE + HdfsSequenceImportLoader.extension);
+    return new SequenceFile.Reader(conf,
+        SequenceFile.Reader.file(filepath));
+  }
+
+  /**
+   * Builds the fields of the record that carries the given value: the value
+   * as a String, as an Integer and as a Double. Shared by the extractor and
+   * the verifiers so that both sides agree on the record layout.
+   */
+  private static Object[] recordOf(int value) {
+    return new Object[] {
+      String.valueOf(value),
+      Integer.valueOf(value),
+      Double.valueOf((double) value)
+    };
+  }
+
+  /**
+   * Checks that every line of the text output matches the expected record,
+   * in order, and that the expected number of records was read. The reader
+   * is closed even when a comparison fails.
+   */
+  private void verifyOutputText(BufferedReader reader) throws IOException {
+    int index = START_ID*NUMBER_OF_ROWS_PER_ID;
+    try {
+      String line;
+      while ((line = reader.readLine()) != null) {
+        String expected = Data.format(recordOf(index),
+            Data.DEFAULT_FIELD_DELIMITER, Data.DEFAULT_RECORD_DELIMITER);
+        index++;
+
+        // readLine() strips the line terminator, so the record delimiter is
+        // appended again before comparing.
+        assertEquals(expected, line + Data.DEFAULT_RECORD_DELIMITER);
+      }
+    } finally {
+      reader.close();
+    }
+
+    assertEquals(NUMBER_OF_IDS*NUMBER_OF_ROWS_PER_ID,
+        index-START_ID*NUMBER_OF_ROWS_PER_ID);
+  }
+
+  /**
+   * Checks that every key of the sequence file matches the expected record,
+   * in order, and that the expected number of records was read. The reader
+   * is closed even when a comparison fails.
+   */
+  private void verifyOutputSequence(SequenceFile.Reader reader)
+      throws IOException {
+    int index = START_ID*NUMBER_OF_ROWS_PER_ID;
+    Text expected = new Text();
+    Text actual = new Text();
+    try {
+      while (reader.next(actual)) {
+        expected.set(Data.format(recordOf(index),
+            Data.DEFAULT_FIELD_DELIMITER, Data.DEFAULT_RECORD_DELIMITER));
+        index++;
+
+        assertEquals(expected.toString(), actual.toString());
+      }
+    } finally {
+      reader.close();
+    }
+
+    assertEquals(NUMBER_OF_IDS*NUMBER_OF_ROWS_PER_ID,
+        index-START_ID*NUMBER_OF_ROWS_PER_ID);
+  }
+
+  /** Partition identified by a single int that is written and read as-is. */
+  public static class DummyPartition extends Partition {
+    private int id;
+
+    public void setId(int id) {
+      this.id = id;
+    }
+
+    public int getId() {
+      return id;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      id = in.readInt();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(id);
+    }
+  }
+
+  /** Creates NUMBER_OF_IDS partitions with consecutive ids from START_ID. */
+  public static class DummyPartitioner extends Partitioner {
+    @Override
+    public List<Partition> run(Context context) {
+      List<Partition> partitions = new LinkedList<Partition>();
+      // The bound is relative to START_ID so that exactly NUMBER_OF_IDS
+      // partitions are produced whatever the first id is.
+      for (int id = START_ID; id < START_ID + NUMBER_OF_IDS; id++) {
+        DummyPartition partition = new DummyPartition();
+        partition.setId(id);
+        partitions.add(partition);
+      }
+      return partitions;
+    }
+  }
+
+  /**
+   * Writes NUMBER_OF_ROWS_PER_ID records per partition; the record values
+   * start at id*NUMBER_OF_ROWS_PER_ID and increase by one per row.
+   */
+  public static class DummyExtractor extends Extractor {
+    @Override
+    public void run(Context context, Partition partition, DataWriter writer) {
+      int id = ((DummyPartition)partition).getId();
+      int first = id*NUMBER_OF_ROWS_PER_ID;
+      for (int row = 0; row < NUMBER_OF_ROWS_PER_ID; row++) {
+        writer.writeArrayRecord(recordOf(first + row));
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index 8158e46..f4701db 100644
--- a/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -23,7 +23,7 @@ import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 
-import junit.framework.Assert;
+import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
@@ -48,7 +48,7 @@ import org.apache.sqoop.job.mr.SqoopNullOutputFormat;
 import org.apache.sqoop.job.mr.SqoopSplit;
 import org.junit.Test;
 
-public class TestMapReduce {
+public class TestMapReduce extends TestCase {
 
   private static final int START_ID = 1;
   private static final int NUMBER_OF_IDS = 9;
@@ -62,12 +62,12 @@ public class TestMapReduce {
 
     SqoopInputFormat inputformat = new SqoopInputFormat();
     List<InputSplit> splits = inputformat.getSplits(job);
-    Assert.assertEquals(9, splits.size());
+    assertEquals(9, splits.size());
 
     for (int id = START_ID; id <= NUMBER_OF_IDS; id++) {
       SqoopSplit split = (SqoopSplit)splits.get(id-1);
       DummyPartition partition = (DummyPartition)split.getPartition();
-      Assert.assertEquals(id, partition.getId());
+      assertEquals(id, partition.getId());
     }
   }
 
@@ -77,17 +77,8 @@ public class TestMapReduce {
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
 
-    Job job = Job.getInstance(conf);
-    job.setInputFormatClass(SqoopInputFormat.class);
-    job.setMapperClass(SqoopMapper.class);
-    job.setMapOutputKeyClass(Data.class);
-    job.setMapOutputValueClass(NullWritable.class);
-    job.setOutputFormatClass(DummyOutputFormat.class);
-    job.setOutputKeyClass(Data.class);
-    job.setOutputValueClass(NullWritable.class);
-
-    boolean success = job.waitForCompletion(true);
-    Assert.assertEquals("Job failed!", true, success);
+    JobUtils.runJob(conf, SqoopInputFormat.class, SqoopMapper.class,
+        DummyOutputFormat.class);
   }
 
   @Test
@@ -97,17 +88,8 @@ public class TestMapReduce {
     conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
 
-    Job job = Job.getInstance(conf);
-    job.setInputFormatClass(SqoopInputFormat.class);
-    job.setMapperClass(SqoopMapper.class);
-    job.setMapOutputKeyClass(Data.class);
-    job.setMapOutputValueClass(NullWritable.class);
-    job.setOutputFormatClass(SqoopNullOutputFormat.class);
-    job.setOutputKeyClass(Data.class);
-    job.setOutputValueClass(NullWritable.class);
-
-    boolean success = job.waitForCompletion(true);
-    Assert.assertEquals("Job failed!", true, success);
+    JobUtils.runJob(conf, SqoopInputFormat.class, SqoopMapper.class,
+        SqoopNullOutputFormat.class);
   }
 
   public static class DummyPartition extends Partition {
@@ -150,12 +132,10 @@ public class TestMapReduce {
     public void run(Context context, Partition partition, DataWriter writer) {
       int id = ((DummyPartition)partition).getId();
       for (int row = 0; row < NUMBER_OF_ROWS_PER_ID; row++) {
-        Object[] array = new Object[] {
-          String.valueOf(id*NUMBER_OF_ROWS_PER_ID+row),
-          new Integer(id*NUMBER_OF_ROWS_PER_ID+row),
-          new Double(id*NUMBER_OF_ROWS_PER_ID+row)
-        };
-        writer.writeArrayRecord(array);
+        writer.writeArrayRecord(new Object[] {
+            String.valueOf(id*NUMBER_OF_ROWS_PER_ID+row),
+            new Integer(id*NUMBER_OF_ROWS_PER_ID+row),
+            new Double(id*NUMBER_OF_ROWS_PER_ID+row)});
       }
     }
   }
@@ -185,14 +165,13 @@ public class TestMapReduce {
 
       @Override
       public void write(Data key, NullWritable value) {
-        Object[] record = new Object[] {
+        data.setContent(new Object[] {
           String.valueOf(index),
           new Integer(index),
-          new Double(index)
-        };
-        data.setContent(record);
-        Assert.assertEquals(data.toString(), key.toString());
+          new Double(index)});
         index++;
+
+        assertEquals(data.toString(), key.toString());
       }
 
       @Override
@@ -238,7 +217,7 @@ public class TestMapReduce {
           new Double(index)});
         index++;
 
-        Assert.assertEquals(expected.toString(), actual.toString());
+        assertEquals(expected.toString(), actual.toString());
       };
     }
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/spi/src/main/java/org/apache/sqoop/job/etl/Context.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Context.java b/spi/src/main/java/org/apache/sqoop/job/etl/Context.java
index d2c7ebc..7256281 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Context.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Context.java
@@ -24,6 +24,4 @@ public interface Context {
 
   public String getString(String key);
 
-  public String[] getFieldNames();
-
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/spi/src/main/java/org/apache/sqoop/job/etl/MutableContext.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/MutableContext.java b/spi/src/main/java/org/apache/sqoop/job/etl/MutableContext.java
index c321d74..2bfe89f 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/MutableContext.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/MutableContext.java
@@ -24,6 +24,4 @@ public interface MutableContext extends Context {
 
   public void setString(String key, String value);
 
-  public void setFieldNames(String[] names);
-
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java b/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java
index 6b90cda..b9b2f49 100644
--- a/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java
+++ b/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java
@@ -27,4 +27,6 @@ public abstract class DataReader {
 
   public abstract String readCsvRecord();
 
+  public abstract Object readRecord();
+
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java b/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java
index ca55b2e..29c4283 100644
--- a/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java
+++ b/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java
@@ -27,4 +27,6 @@ public abstract class DataWriter {
 
   public abstract void writeCsvRecord(String csv);
 
+  public abstract void writeRecord(Object record);
+
 }

Reply via email to