Repository: sqoop Updated Branches: refs/heads/trunk e23d1571b -> 6b1925477
SQOOP-1492: Move Avro and GenericRecord related common code to AvroUtil (Qian Xu via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/6b192547 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/6b192547 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/6b192547 Branch: refs/heads/trunk Commit: 6b1925477d434f6104ede98c13eac2b89472dd8d Parents: e23d157 Author: Jarek Jarcec Cecho <[email protected]> Authored: Fri Sep 5 09:13:39 2014 +0200 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Fri Sep 5 09:13:39 2014 +0200 ---------------------------------------------------------------------- src/java/org/apache/sqoop/avro/AvroUtil.java | 81 +++++++++ .../sqoop/mapreduce/AvroExportMapper.java | 172 +------------------ .../mapreduce/GenericRecordExportMapper.java | 110 ++++++++++++ 3 files changed, 194 insertions(+), 169 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/6b192547/src/java/org/apache/sqoop/avro/AvroUtil.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/avro/AvroUtil.java b/src/java/org/apache/sqoop/avro/AvroUtil.java index 811c240..2fdf263 100644 --- a/src/java/org/apache/sqoop/avro/AvroUtil.java +++ b/src/java/org/apache/sqoop/avro/AvroUtil.java @@ -19,6 +19,7 @@ package org.apache.sqoop.avro; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericFixed; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.io.BytesWritable; import org.apache.sqoop.lib.BlobRef; @@ -29,6 +30,7 @@ import java.nio.ByteBuffer; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; +import java.util.List; import java.util.Map; /** @@ -82,4 +84,83 @@ public final class AvroUtil { return record; } + private static final String TIMESTAMP_TYPE = "java.sql.Timestamp"; + private static final String TIME_TYPE = "java.sql.Time"; + private static final String DATE_TYPE = "java.sql.Date"; + private static final String BIG_DECIMAL_TYPE = "java.math.BigDecimal"; + private static final String BLOB_REF_TYPE = "com.cloudera.sqoop.lib.BlobRef"; + + /** + * Convert from Avro type to Sqoop's java representation of the SQL type + * see SqlManager#toJavaType + */ + public static Object fromAvro(Object avroObject, Schema schema, String type) { + if (avroObject == null) { + return null; + } + + switch (schema.getType()) { + case NULL: + return null; + case BOOLEAN: + case INT: + case FLOAT: + case DOUBLE: + return avroObject; + case LONG: + if (type.equals(DATE_TYPE)) { + return new Date((Long) avroObject); + } else if (type.equals(TIME_TYPE)) { + return new Time((Long) avroObject); + } else if (type.equals(TIMESTAMP_TYPE)) { + return new Timestamp((Long) avroObject); + } + return avroObject; + case BYTES: + ByteBuffer bb = (ByteBuffer) avroObject; + BytesWritable bw = new BytesWritable(); + bw.set(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining()); + if (type.equals(BLOB_REF_TYPE)) { + // TODO: Should convert BytesWritable to BlobRef properly. (SQOOP-991) + throw new UnsupportedOperationException("BlobRef not supported"); + } + return bw; + case STRING: + if (type.equals(BIG_DECIMAL_TYPE)) { + return new BigDecimal(avroObject.toString()); + } else if (type.equals(DATE_TYPE)) { + return Date.valueOf(avroObject.toString()); + } else if (type.equals(TIME_TYPE)) { + return Time.valueOf(avroObject.toString()); + } else if (type.equals(TIMESTAMP_TYPE)) { + return Timestamp.valueOf(avroObject.toString()); + } + return avroObject.toString(); + case ENUM: + return avroObject.toString(); + case UNION: + List<Schema> types = schema.getTypes(); + if (types.size() != 2) { + throw new IllegalArgumentException("Only support union with null"); + } + Schema s1 = types.get(0); + Schema s2 = types.get(1); + if (s1.getType() == Schema.Type.NULL) { + return fromAvro(avroObject, s2, type); + } else if (s2.getType() == Schema.Type.NULL) { + return fromAvro(avroObject, s1, type); + } else { + throw new IllegalArgumentException("Only support union with null"); + } + case FIXED: + return new BytesWritable(((GenericFixed) avroObject).bytes()); + case RECORD: + case ARRAY: + case MAP: + default: + throw new IllegalArgumentException("Cannot convert Avro type " + + schema.getType()); + } + } + } http://git-wip-us.apache.org/repos/asf/sqoop/blob/6b192547/src/java/org/apache/sqoop/mapreduce/AvroExportMapper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/AvroExportMapper.java b/src/java/org/apache/sqoop/mapreduce/AvroExportMapper.java index 1f0cb6a..20f056a 100644 --- a/src/java/org/apache/sqoop/mapreduce/AvroExportMapper.java +++ b/src/java/org/apache/sqoop/mapreduce/AvroExportMapper.java @@ -18,85 +18,17 @@ package org.apache.sqoop.mapreduce; -import java.io.IOException; -import java.math.BigDecimal; -import java.nio.ByteBuffer; -import java.sql.Date; -import java.sql.Time; -import java.sql.Timestamp; -import java.util.List; -import java.util.Map; -import org.apache.avro.Schema; -import org.apache.avro.Schema.Field; -import org.apache.avro.generic.GenericEnumSymbol; -import org.apache.avro.generic.GenericFixed; import org.apache.avro.generic.GenericRecord; import org.apache.avro.mapred.AvroWrapper; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.DefaultStringifier; -import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.util.ReflectionUtils; -import com.cloudera.sqoop.lib.SqoopRecord; -import com.cloudera.sqoop.mapreduce.AutoProgressMapper; -import com.cloudera.sqoop.orm.ClassWriter; + +import java.io.IOException; /** * Exports records from an Avro data file. */ public class AvroExportMapper - extends AutoProgressMapper<AvroWrapper<GenericRecord>, NullWritable, - SqoopRecord, NullWritable> { - - private static final String TIMESTAMP_TYPE = "java.sql.Timestamp"; - - private static final String TIME_TYPE = "java.sql.Time"; - - private static final String DATE_TYPE = "java.sql.Date"; - - private static final String BIG_DECIMAL_TYPE = "java.math.BigDecimal"; - - public static final String AVRO_COLUMN_TYPES_MAP = - "sqoop.avro.column.types.map"; - - private MapWritable columnTypes; - private SqoopRecord recordImpl; - - @Override - protected void setup(Context context) - throws IOException, InterruptedException { - - super.setup(context); - - Configuration conf = context.getConfiguration(); - - // Instantiate a copy of the user's class to hold and parse the record. - String recordClassName = conf.get( - ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY); - if (null == recordClassName) { - throw new IOException("Export table class name (" - + ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY - + ") is not set!"); - } - - try { - Class cls = Class.forName(recordClassName, true, - Thread.currentThread().getContextClassLoader()); - recordImpl = (SqoopRecord) ReflectionUtils.newInstance(cls, conf); - } catch (ClassNotFoundException cnfe) { - throw new IOException(cnfe); - } - - if (null == recordImpl) { - throw new IOException("Could not instantiate object of type " - + recordClassName); - } - - columnTypes = DefaultStringifier.load(conf, AVRO_COLUMN_TYPES_MAP, - MapWritable.class); - } + extends GenericRecordExportMapper<AvroWrapper<GenericRecord>, NullWritable> { @Override protected void map(AvroWrapper<GenericRecord> key, NullWritable value, @@ -104,102 +36,4 @@ public class AvroExportMapper context.write(toSqoopRecord(key.datum()), NullWritable.get()); } - private SqoopRecord toSqoopRecord(GenericRecord record) throws IOException { - Schema avroSchema = record.getSchema(); - for (Map.Entry<Writable, Writable> e : columnTypes.entrySet()) { - String columnName = e.getKey().toString(); - String columnType = e.getValue().toString(); - String cleanedCol = ClassWriter.toIdentifier(columnName); - Field field = getField(avroSchema, cleanedCol, record); - if (field == null) { - throw new IOException("Cannot find field " + cleanedCol - + " in Avro schema " + avroSchema); - } else { - Object avroObject = record.get(field.name()); - Object fieldVal = fromAvro(avroObject, field.schema(), columnType); - recordImpl.setField(cleanedCol, fieldVal); - } - } - return recordImpl; - } - - private Field getField(Schema avroSchema, String fieldName, - GenericRecord record) { - for (Field field : avroSchema.getFields()) { - if (field.name().equalsIgnoreCase(fieldName)) { - return field; - } - } - return null; - } - - private Object fromAvro(Object avroObject, Schema fieldSchema, - String columnType) { - // map from Avro type to Sqoop's Java representation of the SQL type - // see SqlManager#toJavaType - - if (avroObject == null) { - return null; - } - - switch (fieldSchema.getType()) { - case NULL: - return null; - case BOOLEAN: - case INT: - case FLOAT: - case DOUBLE: - return avroObject; - case LONG: - if (columnType.equals(DATE_TYPE)) { - return new Date((Long) avroObject); - } else if (columnType.equals(TIME_TYPE)) { - return new Time((Long) avroObject); - } else if (columnType.equals(TIMESTAMP_TYPE)) { - return new Timestamp((Long) avroObject); - } - return avroObject; - case BYTES: - ByteBuffer bb = (ByteBuffer) avroObject; - BytesWritable bw = new BytesWritable(); - bw.set(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining()); - return bw; - case STRING: - if (columnType.equals(BIG_DECIMAL_TYPE)) { - return new BigDecimal(avroObject.toString()); - } else if (columnType.equals(DATE_TYPE)) { - return Date.valueOf(avroObject.toString()); - } else if (columnType.equals(TIME_TYPE)) { - return Time.valueOf(avroObject.toString()); - } else if (columnType.equals(TIMESTAMP_TYPE)) { - return Timestamp.valueOf(avroObject.toString()); - } - return avroObject.toString(); - case ENUM: - return ((GenericEnumSymbol) avroObject).toString(); - case UNION: - List<Schema> types = fieldSchema.getTypes(); - if (types.size() != 2) { - throw new IllegalArgumentException("Only support union with null"); - } - Schema s1 = types.get(0); - Schema s2 = types.get(1); - if (s1.getType() == Schema.Type.NULL) { - return fromAvro(avroObject, s2, columnType); - } else if (s2.getType() == Schema.Type.NULL) { - return fromAvro(avroObject, s1, columnType); - } else { - throw new IllegalArgumentException("Only support union with null"); - } - case FIXED: - return new BytesWritable(((GenericFixed) avroObject).bytes()); - case RECORD: - case ARRAY: - case MAP: - default: - throw new IllegalArgumentException("Cannot convert Avro type " - + fieldSchema.getType()); - } - } - } http://git-wip-us.apache.org/repos/asf/sqoop/blob/6b192547/src/java/org/apache/sqoop/mapreduce/GenericRecordExportMapper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/GenericRecordExportMapper.java b/src/java/org/apache/sqoop/mapreduce/GenericRecordExportMapper.java new file mode 100644 index 0000000..ab263c1 --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/GenericRecordExportMapper.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce; + +import com.cloudera.sqoop.lib.SqoopRecord; +import com.cloudera.sqoop.mapreduce.AutoProgressMapper; +import com.cloudera.sqoop.orm.ClassWriter; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DefaultStringifier; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.sqoop.avro.AvroUtil; + +import java.io.IOException; +import java.util.Map; + +/** + * Exports records (type GenericRecord) from a data source. + */ +public class GenericRecordExportMapper<K, V> + extends AutoProgressMapper<K, V, SqoopRecord, NullWritable> { + + public static final String AVRO_COLUMN_TYPES_MAP = "sqoop.avro.column.types.map"; + + protected MapWritable columnTypes; + + private SqoopRecord recordImpl; + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + + Configuration conf = context.getConfiguration(); + + // Instantiate a copy of the user's class to hold and parse the record. + String recordClassName = conf.get( + ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY); + if (null == recordClassName) { + throw new IOException("Export table class name (" + + ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY + + ") is not set!"); + } + + try { + Class cls = Class.forName(recordClassName, true, + Thread.currentThread().getContextClassLoader()); + recordImpl = (SqoopRecord) ReflectionUtils.newInstance(cls, conf); + } catch (ClassNotFoundException cnfe) { + throw new IOException(cnfe); + } + + if (null == recordImpl) { + throw new IOException("Could not instantiate object of type " + + recordClassName); + } + + columnTypes = DefaultStringifier.load(conf, AVRO_COLUMN_TYPES_MAP, + MapWritable.class); + } + + protected SqoopRecord toSqoopRecord(GenericRecord record) throws IOException { + Schema avroSchema = record.getSchema(); + for (Map.Entry<Writable, Writable> e : columnTypes.entrySet()) { + String columnName = e.getKey().toString(); + String columnType = e.getValue().toString(); + String cleanedCol = ClassWriter.toIdentifier(columnName); + Schema.Field field = getFieldIgnoreCase(avroSchema, cleanedCol); + if (null == field) { + throw new IOException("Cannot find field " + cleanedCol + + " in Avro schema " + avroSchema); + } + + Object avroObject = record.get(field.name()); + Object fieldVal = AvroUtil.fromAvro(avroObject, field.schema(), columnType); + recordImpl.setField(cleanedCol, fieldVal); + } + return recordImpl; + } + + private static Schema.Field getFieldIgnoreCase(Schema avroSchema, + String fieldName) { + for (Schema.Field field : avroSchema.getFields()) { + if (field.name().equalsIgnoreCase(fieldName)) { + return field; + } + } + return null; + } + +}
