Repository: sqoop
Updated Branches:
  refs/heads/trunk e23d1571b -> 6b1925477


SQOOP-1492: Move Avro and GenericRecord related common code to AvroUtil

(Qian Xu via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/6b192547
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/6b192547
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/6b192547

Branch: refs/heads/trunk
Commit: 6b1925477d434f6104ede98c13eac2b89472dd8d
Parents: e23d157
Author: Jarek Jarcec Cecho <[email protected]>
Authored: Fri Sep 5 09:13:39 2014 +0200
Committer: Jarek Jarcec Cecho <[email protected]>
Committed: Fri Sep 5 09:13:39 2014 +0200

----------------------------------------------------------------------
 src/java/org/apache/sqoop/avro/AvroUtil.java    |  81 +++++++++
 .../sqoop/mapreduce/AvroExportMapper.java       | 172 +------------------
 .../mapreduce/GenericRecordExportMapper.java    | 110 ++++++++++++
 3 files changed, 194 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/6b192547/src/java/org/apache/sqoop/avro/AvroUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/avro/AvroUtil.java b/src/java/org/apache/sqoop/avro/AvroUtil.java
index 811c240..2fdf263 100644
--- a/src/java/org/apache/sqoop/avro/AvroUtil.java
+++ b/src/java/org/apache/sqoop/avro/AvroUtil.java
@@ -19,6 +19,7 @@ package org.apache.sqoop.avro;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.sqoop.lib.BlobRef;
@@ -29,6 +30,7 @@ import java.nio.ByteBuffer;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -82,4 +84,83 @@ public final class AvroUtil {
     return record;
   }
 
+  private static final String TIMESTAMP_TYPE = "java.sql.Timestamp";
+  private static final String TIME_TYPE = "java.sql.Time";
+  private static final String DATE_TYPE = "java.sql.Date";
+  private static final String BIG_DECIMAL_TYPE = "java.math.BigDecimal";
+  private static final String BLOB_REF_TYPE = "com.cloudera.sqoop.lib.BlobRef";
+
+  /**
+   * Convert from Avro type to Sqoop's java representation of the SQL type
+   * see SqlManager#toJavaType
+   */
+  public static Object fromAvro(Object avroObject, Schema schema, String type) {
+    if (avroObject == null) {
+      return null;
+    }
+
+    switch (schema.getType()) {
+      case NULL:
+        return null;
+      case BOOLEAN:
+      case INT:
+      case FLOAT:
+      case DOUBLE:
+        return avroObject;
+      case LONG:
+        if (type.equals(DATE_TYPE)) {
+          return new Date((Long) avroObject);
+        } else if (type.equals(TIME_TYPE)) {
+          return new Time((Long) avroObject);
+        } else if (type.equals(TIMESTAMP_TYPE)) {
+          return new Timestamp((Long) avroObject);
+        }
+        return avroObject;
+      case BYTES:
+        ByteBuffer bb = (ByteBuffer) avroObject;
+        BytesWritable bw = new BytesWritable();
+        bw.set(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining());
+        if (type.equals(BLOB_REF_TYPE)) {
+          // TODO: Should convert BytesWritable to BlobRef properly. (SQOOP-991)
+          throw new UnsupportedOperationException("BlobRef not supported");
+        }
+        return bw;
+      case STRING:
+        if (type.equals(BIG_DECIMAL_TYPE)) {
+          return new BigDecimal(avroObject.toString());
+        } else if (type.equals(DATE_TYPE)) {
+          return Date.valueOf(avroObject.toString());
+        } else if (type.equals(TIME_TYPE)) {
+          return Time.valueOf(avroObject.toString());
+        } else if (type.equals(TIMESTAMP_TYPE)) {
+          return Timestamp.valueOf(avroObject.toString());
+        }
+        return avroObject.toString();
+      case ENUM:
+        return avroObject.toString();
+      case UNION:
+        List<Schema> types = schema.getTypes();
+        if (types.size() != 2) {
+          throw new IllegalArgumentException("Only support union with null");
+        }
+        Schema s1 = types.get(0);
+        Schema s2 = types.get(1);
+        if (s1.getType() == Schema.Type.NULL) {
+          return fromAvro(avroObject, s2, type);
+        } else if (s2.getType() == Schema.Type.NULL) {
+          return fromAvro(avroObject, s1, type);
+        } else {
+          throw new IllegalArgumentException("Only support union with null");
+        }
+      case FIXED:
+        return new BytesWritable(((GenericFixed) avroObject).bytes());
+      case RECORD:
+      case ARRAY:
+      case MAP:
+      default:
+        throw new IllegalArgumentException("Cannot convert Avro type "
+            + schema.getType());
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6b192547/src/java/org/apache/sqoop/mapreduce/AvroExportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/AvroExportMapper.java b/src/java/org/apache/sqoop/mapreduce/AvroExportMapper.java
index 1f0cb6a..20f056a 100644
--- a/src/java/org/apache/sqoop/mapreduce/AvroExportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/AvroExportMapper.java
@@ -18,85 +18,17 @@
 
 package org.apache.sqoop.mapreduce;
 
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.util.List;
-import java.util.Map;
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.generic.GenericEnumSymbol;
-import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.AvroWrapper;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DefaultStringifier;
-import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
-import com.cloudera.sqoop.lib.SqoopRecord;
-import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
-import com.cloudera.sqoop.orm.ClassWriter;
+
+import java.io.IOException;
 
 /**
  * Exports records from an Avro data file.
  */
 public class AvroExportMapper
-    extends AutoProgressMapper<AvroWrapper<GenericRecord>, NullWritable,
-              SqoopRecord, NullWritable> {
-
-  private static final String TIMESTAMP_TYPE = "java.sql.Timestamp";
-
-  private static final String TIME_TYPE = "java.sql.Time";
-
-  private static final String DATE_TYPE = "java.sql.Date";
-
-  private static final String BIG_DECIMAL_TYPE = "java.math.BigDecimal";
-
-  public static final String AVRO_COLUMN_TYPES_MAP =
-      "sqoop.avro.column.types.map";
-
-  private MapWritable columnTypes;
-  private SqoopRecord recordImpl;
-
-  @Override
-  protected void setup(Context context)
-      throws IOException, InterruptedException {
-
-    super.setup(context);
-
-    Configuration conf = context.getConfiguration();
-
-    // Instantiate a copy of the user's class to hold and parse the record.
-    String recordClassName = conf.get(
-        ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY);
-    if (null == recordClassName) {
-      throw new IOException("Export table class name ("
-          + ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY
-          + ") is not set!");
-    }
-
-    try {
-      Class cls = Class.forName(recordClassName, true,
-          Thread.currentThread().getContextClassLoader());
-      recordImpl = (SqoopRecord) ReflectionUtils.newInstance(cls, conf);
-    } catch (ClassNotFoundException cnfe) {
-      throw new IOException(cnfe);
-    }
-
-    if (null == recordImpl) {
-      throw new IOException("Could not instantiate object of type "
-          + recordClassName);
-    }
-
-    columnTypes = DefaultStringifier.load(conf, AVRO_COLUMN_TYPES_MAP,
-        MapWritable.class);
-  }
+    extends GenericRecordExportMapper<AvroWrapper<GenericRecord>, NullWritable> {
 
   @Override
   protected void map(AvroWrapper<GenericRecord> key, NullWritable value,
@@ -104,102 +36,4 @@ public class AvroExportMapper
     context.write(toSqoopRecord(key.datum()), NullWritable.get());
   }
 
-  private SqoopRecord toSqoopRecord(GenericRecord record) throws IOException {
-    Schema avroSchema = record.getSchema();
-    for (Map.Entry<Writable, Writable> e : columnTypes.entrySet()) {
-      String columnName = e.getKey().toString();
-      String columnType = e.getValue().toString();
-      String cleanedCol = ClassWriter.toIdentifier(columnName);
-      Field field = getField(avroSchema, cleanedCol, record);
-      if (field == null) {
-        throw new IOException("Cannot find field " + cleanedCol
-          + " in Avro schema " + avroSchema);
-      } else {
-        Object avroObject = record.get(field.name());
-        Object fieldVal = fromAvro(avroObject, field.schema(), columnType);
-        recordImpl.setField(cleanedCol, fieldVal);
-      }
-    }
-    return recordImpl;
-  }
-
-  private Field getField(Schema avroSchema, String fieldName,
-      GenericRecord record) {
-    for (Field field : avroSchema.getFields()) {
-      if (field.name().equalsIgnoreCase(fieldName)) {
-        return field;
-      }
-    }
-    return null;
-  }
-
-  private Object fromAvro(Object avroObject, Schema fieldSchema,
-      String columnType) {
-    // map from Avro type to Sqoop's Java representation of the SQL type
-    // see SqlManager#toJavaType
-
-    if (avroObject == null) {
-      return null;
-    }
-
-    switch (fieldSchema.getType()) {
-      case NULL:
-        return null;
-      case BOOLEAN:
-      case INT:
-      case FLOAT:
-      case DOUBLE:
-        return avroObject;
-      case LONG:
-        if (columnType.equals(DATE_TYPE)) {
-          return new Date((Long) avroObject);
-        } else if (columnType.equals(TIME_TYPE)) {
-          return new Time((Long) avroObject);
-        } else if (columnType.equals(TIMESTAMP_TYPE)) {
-          return new Timestamp((Long) avroObject);
-        }
-        return avroObject;
-      case BYTES:
-        ByteBuffer bb = (ByteBuffer) avroObject;
-        BytesWritable bw = new BytesWritable();
-        bw.set(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining());
-        return bw;
-      case STRING:
-        if (columnType.equals(BIG_DECIMAL_TYPE)) {
-          return new BigDecimal(avroObject.toString());
-        } else if (columnType.equals(DATE_TYPE)) {
-          return Date.valueOf(avroObject.toString());
-        } else if (columnType.equals(TIME_TYPE)) {
-          return Time.valueOf(avroObject.toString());
-        } else if (columnType.equals(TIMESTAMP_TYPE)) {
-          return Timestamp.valueOf(avroObject.toString());
-        }
-        return avroObject.toString();
-      case ENUM:
-        return ((GenericEnumSymbol) avroObject).toString();
-      case UNION:
-        List<Schema> types = fieldSchema.getTypes();
-        if (types.size() != 2) {
-          throw new IllegalArgumentException("Only support union with null");
-        }
-        Schema s1 = types.get(0);
-        Schema s2 = types.get(1);
-        if (s1.getType() == Schema.Type.NULL) {
-          return fromAvro(avroObject, s2, columnType);
-        } else if (s2.getType() == Schema.Type.NULL) {
-          return fromAvro(avroObject, s1, columnType);
-        } else {
-          throw new IllegalArgumentException("Only support union with null");
-        }
-      case FIXED:
-        return new BytesWritable(((GenericFixed) avroObject).bytes());
-      case RECORD:
-      case ARRAY:
-      case MAP:
-      default:
-        throw new IllegalArgumentException("Cannot convert Avro type "
-            + fieldSchema.getType());
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6b192547/src/java/org/apache/sqoop/mapreduce/GenericRecordExportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/GenericRecordExportMapper.java b/src/java/org/apache/sqoop/mapreduce/GenericRecordExportMapper.java
new file mode 100644
index 0000000..ab263c1
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/GenericRecordExportMapper.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
+import com.cloudera.sqoop.orm.ClassWriter;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.sqoop.avro.AvroUtil;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Exports records (type GenericRecord) from a data source.
+ */
+public class GenericRecordExportMapper<K, V>
+    extends AutoProgressMapper<K, V, SqoopRecord, NullWritable> {
+
+  public static final String AVRO_COLUMN_TYPES_MAP = "sqoop.avro.column.types.map";
+
+  protected MapWritable columnTypes;
+
+  private SqoopRecord recordImpl;
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+
+    Configuration conf = context.getConfiguration();
+
+    // Instantiate a copy of the user's class to hold and parse the record.
+    String recordClassName = conf.get(
+        ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY);
+    if (null == recordClassName) {
+      throw new IOException("Export table class name ("
+          + ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY
+          + ") is not set!");
+    }
+
+    try {
+      Class cls = Class.forName(recordClassName, true,
+          Thread.currentThread().getContextClassLoader());
+      recordImpl = (SqoopRecord) ReflectionUtils.newInstance(cls, conf);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException(cnfe);
+    }
+
+    if (null == recordImpl) {
+      throw new IOException("Could not instantiate object of type "
+          + recordClassName);
+    }
+
+    columnTypes = DefaultStringifier.load(conf, AVRO_COLUMN_TYPES_MAP,
+        MapWritable.class);
+  }
+
+  protected SqoopRecord toSqoopRecord(GenericRecord record) throws IOException {
+    Schema avroSchema = record.getSchema();
+    for (Map.Entry<Writable, Writable> e : columnTypes.entrySet()) {
+      String columnName = e.getKey().toString();
+      String columnType = e.getValue().toString();
+      String cleanedCol = ClassWriter.toIdentifier(columnName);
+      Schema.Field field = getFieldIgnoreCase(avroSchema, cleanedCol);
+      if (null == field) {
+        throw new IOException("Cannot find field " + cleanedCol
+            + " in Avro schema " + avroSchema);
+      }
+
+      Object avroObject = record.get(field.name());
+      Object fieldVal = AvroUtil.fromAvro(avroObject, field.schema(), columnType);
+      recordImpl.setField(cleanedCol, fieldVal);
+    }
+    return recordImpl;
+  }
+
+  private static Schema.Field getFieldIgnoreCase(Schema avroSchema,
+      String fieldName) {
+    for (Schema.Field field : avroSchema.getFields()) {
+      if (field.name().equalsIgnoreCase(fieldName)) {
+        return field;
+      }
+    }
+    return null;
+  }
+
+}

Reply via email to