Repository: sqoop
Updated Branches:
  refs/heads/trunk 500b75ae4 -> 41724c31d


SQOOP-1491: Remove SqoopAvroRecord

(Qian Xu via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/41724c31
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/41724c31
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/41724c31

Branch: refs/heads/trunk
Commit: 41724c31d4c9e56fc7b979b95707bf9ed2e752c1
Parents: 500b75a
Author: Jarek Jarcec Cecho <[email protected]>
Authored: Fri Sep 5 08:13:14 2014 +0200
Committer: Jarek Jarcec Cecho <[email protected]>
Committed: Fri Sep 5 08:13:14 2014 +0200

----------------------------------------------------------------------
 src/java/org/apache/sqoop/avro/AvroUtil.java    | 24 +++++++--
 .../org/apache/sqoop/lib/SqoopAvroRecord.java   | 57 --------------------
 .../sqoop/mapreduce/AvroImportMapper.java       | 27 ++++------
 .../sqoop/mapreduce/DataDrivenImportJob.java    | 19 ++++---
 .../sqoop/mapreduce/ParquetImportMapper.java    | 22 +++++---
 .../org/apache/sqoop/mapreduce/ParquetJob.java  |  7 +++
 src/java/org/apache/sqoop/orm/ClassWriter.java  | 57 ++------------------
 7 files changed, 69 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/41724c31/src/java/org/apache/sqoop/avro/AvroUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/avro/AvroUtil.java 
b/src/java/org/apache/sqoop/avro/AvroUtil.java
index 4b37d58..811c240 100644
--- a/src/java/org/apache/sqoop/avro/AvroUtil.java
+++ b/src/java/org/apache/sqoop/avro/AvroUtil.java
@@ -17,6 +17,9 @@
  */
 package org.apache.sqoop.avro;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.sqoop.lib.BlobRef;
 import org.apache.sqoop.lib.ClobRef;
@@ -26,6 +29,7 @@ import java.nio.ByteBuffer;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.Map;
 
 /**
  * The service class provides methods for creating and converting Avro objects.
@@ -33,14 +37,13 @@ import java.sql.Timestamp;
 public final class AvroUtil {
 
   /**
-   * Convert the Avro representation of a Java type (that has already been
-   * converted from the SQL equivalent). Note that the method is taken from
-   * {@link org.apache.sqoop.mapreduce.AvroImportMapper}
+   * Convert a Sqoop's Java representation to Avro representation.
    */
   public static Object toAvro(Object o, boolean bigDecimalFormatString) {
     if (o instanceof BigDecimal) {
       if (bigDecimalFormatString) {
-        return ((BigDecimal)o).toPlainString();
+        // Returns a string representation of this without an exponent field.
+        return ((BigDecimal) o).toPlainString();
       } else {
         return o.toString();
       }
@@ -66,4 +69,17 @@ public final class AvroUtil {
     return o;
   }
 
+  /**
+   * Manipulate a GenericRecord instance.
+   */
+  public static GenericRecord toGenericRecord(Map<String, Object> fieldMap,
+      Schema schema, boolean bigDecimalFormatString) {
+    GenericRecord record = new GenericData.Record(schema);
+    for (Map.Entry<String, Object> entry : fieldMap.entrySet()) {
+      Object avroObject = toAvro(entry.getValue(), bigDecimalFormatString);
+      record.put(entry.getKey(), avroObject);
+    }
+    return record;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/41724c31/src/java/org/apache/sqoop/lib/SqoopAvroRecord.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/lib/SqoopAvroRecord.java 
b/src/java/org/apache/sqoop/lib/SqoopAvroRecord.java
deleted file mode 100644
index 80875d2..0000000
--- a/src/java/org/apache/sqoop/lib/SqoopAvroRecord.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.lib;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.sqoop.avro.AvroUtil;
-
-/**
- * The abstract class extends {@link org.apache.sqoop.lib.SqoopRecord}. It also
- * implements the interface GenericRecord which is a generic instance of an 
Avro
- * record schema. Fields are accessible by name as well as by index.
- */
-public abstract class SqoopAvroRecord extends SqoopRecord implements 
GenericRecord {
-
-  public abstract boolean getBigDecimalFormatString();
-
-  @Override
-  public void put(String key, Object v) {
-    getFieldMap().put(key, v);
-  }
-
-  @Override
-  public Object get(String key) {
-    Object o = getFieldMap().get(key);
-    return AvroUtil.toAvro(o, getBigDecimalFormatString());
-  }
-
-  @Override
-  public void put(int i, Object v) {
-    put(getFieldNameByIndex(i), v);
-  }
-
-  @Override
-  public Object get(int i) {
-    return get(getFieldNameByIndex(i));
-  }
-
-  private String getFieldNameByIndex(int i) {
-    return getSchema().getFields().get(i).name();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/41724c31/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java 
b/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java
index 6adad79..1454ead 100644
--- a/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java
@@ -18,22 +18,21 @@
 
 package org.apache.sqoop.mapreduce;
 
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.Map;
+import com.cloudera.sqoop.lib.LargeObjectLoader;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.AvroWrapper;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import com.cloudera.sqoop.lib.LargeObjectLoader;
-import com.cloudera.sqoop.lib.SqoopRecord;
-import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
 import org.apache.sqoop.avro.AvroUtil;
 
+import java.io.IOException;
+import java.sql.SQLException;
+
 /**
  * Imports records by transforming them to Avro records in an Avro data file.
  */
@@ -69,7 +68,9 @@ public class AvroImportMapper
       throw new IOException(sqlE);
     }
 
-    wrapper.datum(toGenericRecord(val));
+    GenericRecord outKey = AvroUtil.toGenericRecord(val.getFieldMap(),
+        schema, bigDecimalFormatString);
+    wrapper.datum(outKey);
     context.write(wrapper, NullWritable.get());
   }
 
@@ -80,14 +81,4 @@ public class AvroImportMapper
     }
   }
 
-  private GenericRecord toGenericRecord(SqoopRecord val) {
-    Map<String, Object> fieldMap = val.getFieldMap();
-    GenericRecord record = new GenericData.Record(schema);
-    for (Map.Entry<String, Object> entry : fieldMap.entrySet()) {
-      Object avro = AvroUtil.toAvro(entry.getValue(), bigDecimalFormatString);
-      record.put(entry.getKey(), avro);
-    }
-    return record;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/41724c31/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java 
b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
index 905ba8c..19ec542 100644
--- a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
@@ -23,6 +23,8 @@ import java.io.IOException;
 import java.sql.SQLException;
 
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -86,11 +88,7 @@ public class DataDrivenImportJob extends ImportJobBase {
       job.setOutputValueClass(NullWritable.class);
     } else if (options.getFileLayout()
         == SqoopOptions.FileLayout.AvroDataFile) {
-      ConnManager connManager = getContext().getConnManager();
-      AvroSchemaGenerator generator = new AvroSchemaGenerator(options,
-          connManager, tableName);
-      Schema schema = generator.generate();
-
+      Schema schema = generateArvoSchema(tableName);
       try {
         writeAvroSchema(schema);
       } catch (final IOException e) {
@@ -103,8 +101,8 @@ public class DataDrivenImportJob extends ImportJobBase {
       Configuration conf = job.getConfiguration();
       // An Avro schema is required for creating a dataset that manages
       // Parquet data records. The import will fail, if schema is invalid.
-      Schema schema = new Schema.Parser().parse(conf.get("avro.schema"));
-      String uri = "";
+      Schema schema = generateArvoSchema(tableName);
+      String uri;
       if (options.doHiveImport()) {
         uri = "dataset:hive?dataset=" + options.getTableName();
       } else {
@@ -117,6 +115,13 @@ public class DataDrivenImportJob extends ImportJobBase {
     job.setMapperClass(getMapperClass());
   }
 
+  private Schema generateArvoSchema(String tableName) throws IOException {
+    ConnManager connManager = getContext().getConnManager();
+    AvroSchemaGenerator generator = new AvroSchemaGenerator(options,
+        connManager, tableName);
+    return generator.generate();
+  }
+
   private void writeAvroSchema(final Schema schema) throws IOException {
     // Generate schema in JAR output directory.
     final File schemaFile = new File(options.getJarOutputDir(), 
schema.getName() + ".avsc");

http://git-wip-us.apache.org/repos/asf/sqoop/blob/41724c31/src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java 
b/src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java
index effbadd..fb6af2c 100644
--- a/src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java
@@ -19,13 +19,14 @@
 package org.apache.sqoop.mapreduce;
 
 import com.cloudera.sqoop.lib.LargeObjectLoader;
+import com.cloudera.sqoop.lib.SqoopRecord;
 import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
+import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.sqoop.lib.SqoopAvroRecord;
+import org.apache.sqoop.avro.AvroUtil;
 
 import java.io.IOException;
 import java.sql.SQLException;
@@ -34,19 +35,26 @@ import java.sql.SQLException;
  * Imports records by writing them to a Parquet File.
  */
 public class ParquetImportMapper
-    extends AutoProgressMapper<LongWritable, SqoopAvroRecord,
+    extends AutoProgressMapper<LongWritable, SqoopRecord,
         GenericRecord, NullWritable> {
 
+  private Schema schema = null;
+  private boolean bigDecimalFormatString = true;
   private LargeObjectLoader lobLoader = null;
 
   @Override
   protected void setup(Context context)
       throws IOException, InterruptedException {
-    lobLoader = new LargeObjectLoader(context.getConfiguration());
+    Configuration conf = context.getConfiguration();
+    schema = ParquetJob.getAvroSchema(conf);
+    bigDecimalFormatString = conf.getBoolean(
+        ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT,
+        ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
+    lobLoader = new LargeObjectLoader(conf);
   }
 
   @Override
-  protected void map(LongWritable key, SqoopAvroRecord val, Context context)
+  protected void map(LongWritable key, SqoopRecord val, Context context)
       throws IOException, InterruptedException {
     try {
       // Loading of LOBs was delayed until we have a Context.
@@ -55,7 +63,9 @@ public class ParquetImportMapper
       throw new IOException(sqlE);
     }
 
-    context.write(val, null);
+    GenericRecord outKey = AvroUtil.toGenericRecord(val.getFieldMap(), schema,
+        bigDecimalFormatString);
+    context.write(outKey, null);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/sqoop/blob/41724c31/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ParquetJob.java 
b/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
index a74432a..6ef29a1 100644
--- a/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
@@ -39,6 +39,12 @@ public final class ParquetJob {
   private ParquetJob() {
   }
 
+  private static final String CONF_AVRO_SCHEMA = "avro.schema";
+
+  public static Schema getAvroSchema(Configuration conf) {
+    return new Schema.Parser().parse(conf.get(CONF_AVRO_SCHEMA));
+  }
+
   /**
    * Configure the import job. The import process will use a Kite dataset to
    * write data records into Parquet format internally. The input key class is
@@ -63,6 +69,7 @@ public final class ParquetJob {
     } else {
       dataset = createDataset(schema, uri);
     }
+    conf.set(CONF_AVRO_SCHEMA, schema.toString());
     DatasetKeyOutputFormat.configure(conf).writeTo(dataset);
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/41724c31/src/java/org/apache/sqoop/orm/ClassWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/orm/ClassWriter.java 
b/src/java/org/apache/sqoop/orm/ClassWriter.java
index 4f9dedd..94ff576 100644
--- a/src/java/org/apache/sqoop/orm/ClassWriter.java
+++ b/src/java/org/apache/sqoop/orm/ClassWriter.java
@@ -30,11 +30,9 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
-import org.apache.avro.Schema;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.sqoop.lib.SqoopAvroRecord;
 import org.apache.sqoop.mapreduce.ImportJobBase;
 
 import com.cloudera.sqoop.SqoopOptions;
@@ -1110,26 +1108,6 @@ public class ClassWriter {
     }
   }
 
-  private void generateSqoopAvroRecordMethods(String className, Schema schema, 
StringBuilder sb) {
-    // Define shared immutable attributes as static
-    sb.append("  private final static boolean bigDecimalFormatString;\n");
-    sb.append("  private final static Schema schema;\n");
-    sb.append("  static {\n");
-    sb.append("    bigDecimalFormatString = " + bigDecimalFormatString + 
";\n");
-    sb.append("    schema = new Schema.Parser().parse(\"");
-    sb.append(schema.toString().replaceAll("\"", "\\\\\""));
-    sb.append("\");\n");
-    sb.append("  }\n");
-    sb.append("  @Override\n");
-    sb.append("  public boolean getBigDecimalFormatString() {\n");
-    sb.append("    return bigDecimalFormatString;\n");
-    sb.append("  }\n");
-    sb.append("  @Override\n");
-    sb.append("  public Schema getSchema() {\n");
-    sb.append("    return schema;\n");
-    sb.append("  }\n");
-  }
-
   /**
    * Generate the setField() method.
    * @param columnTypes - mapping from column names to sql types
@@ -1750,15 +1728,9 @@ public class ClassWriter {
       }
     }
 
-    Schema schema = null;
-    if (options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile) {
-      schema = generateAvroSchemaForTable(tableName);
-      options.getConf().set("avro.schema", schema.toString());
-    }
-
     // Generate the Java code.
     StringBuilder sb = generateClassForColumns(columnTypes,
-        cleanedColNames, cleanedDbWriteColNames, schema);
+        cleanedColNames, cleanedDbWriteColNames);
     // Write this out to a file in the jar output directory.
     // We'll move it to the user-visible CodeOutputDir after compiling.
     String codeOutDir = options.getJarOutputDir();
@@ -1816,12 +1788,6 @@ public class ClassWriter {
     }
   }
 
-  private Schema generateAvroSchemaForTable(String tableName) throws 
IOException {
-    AvroSchemaGenerator generator = new AvroSchemaGenerator(options,
-        connManager, tableName);
-    return generator.generate();
-  }
-
   protected String[] getColumnNames(Map<String, Integer> columnTypes) {
     String [] colNames = options.getColumns();
     if (null == colNames) {
@@ -1872,18 +1838,15 @@ public class ClassWriter {
    * @param colNames - ordered list of column names for table.
    * @param dbWriteColNames - ordered list of column names for the db
    * write() method of the class.
-   * @param schema - If a valid Avro schema is specified, the base class will
-   * be SqoopAvroRecord
    * @return - A StringBuilder that contains the text of the class code.
    */
   private StringBuilder generateClassForColumns(
       Map<String, Integer> columnTypes,
-      String [] colNames, String [] dbWriteColNames, Schema schema) {
+      String [] colNames, String [] dbWriteColNames) {
     if (colNames.length ==0) {
       throw new IllegalArgumentException("Attempted to generate class with "
           + "no columns!");
     }
-
     StringBuilder sb = new StringBuilder();
     sb.append("// ORM class for table '" + tableName + "'\n");
     sb.append("// WARNING: This class is AUTO-GENERATED. "
@@ -1915,13 +1878,7 @@ public class ClassWriter {
     sb.append("import " + BlobRef.class.getCanonicalName() + ";\n");
     sb.append("import " + ClobRef.class.getCanonicalName() + ";\n");
     sb.append("import " + LargeObjectLoader.class.getCanonicalName() + ";\n");
-
-    Class baseClass = SqoopRecord.class;
-    if (null != schema) {
-      sb.append("import org.apache.avro.Schema;\n");
-      baseClass = SqoopAvroRecord.class;
-    }
-    sb.append("import " + baseClass.getCanonicalName() + ";\n");
+    sb.append("import " + SqoopRecord.class.getCanonicalName() + ";\n");
     sb.append("import java.sql.PreparedStatement;\n");
     sb.append("import java.sql.ResultSet;\n");
     sb.append("import java.sql.SQLException;\n");
@@ -1941,8 +1898,8 @@ public class ClassWriter {
     sb.append("\n");
 
     String className = tableNameInfo.getShortClassForTable(tableName);
-    sb.append("public class " + className + " extends " + 
baseClass.getSimpleName()
-          + " implements DBWritable, Writable {\n");
+    sb.append("public class " + className + " extends SqoopRecord "
+        + " implements DBWritable, Writable {\n");
     sb.append("  private final int PROTOCOL_VERSION = "
         + CLASS_WRITER_VERSION + ";\n");
     sb.append(
@@ -1961,10 +1918,6 @@ public class ClassWriter {
     generateGetFieldMap(columnTypes, colNames, sb);
     generateSetField(columnTypes, colNames, sb);
 
-    if (baseClass == SqoopAvroRecord.class) {
-      generateSqoopAvroRecordMethods(className, schema, sb);
-    }
-
     // TODO(aaron): Generate hashCode(), compareTo(), equals() so it can be a
     // WritableComparable
 

Reply via email to