Repository: sqoop Updated Branches: refs/heads/trunk 7e8a6fd89 -> 666700d33
SQOOP-1394: Export data from HDFS back to an RDBMS (Qian Xu via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/666700d3 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/666700d3 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/666700d3 Branch: refs/heads/trunk Commit: 666700d33fbe064b74d239188fee5f8f3a96a328 Parents: 7e8a6fd Author: Jarek Jarcec Cecho <[email protected]> Authored: Tue Sep 9 15:30:40 2014 +0200 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Tue Sep 9 15:30:40 2014 +0200 ---------------------------------------------------------------------- .../apache/sqoop/mapreduce/ExportJobBase.java | 5 +- .../apache/sqoop/mapreduce/JdbcExportJob.java | 73 ++-- .../sqoop/mapreduce/ParquetExportMapper.java | 43 ++ .../com/cloudera/sqoop/TestParquetExport.java | 389 +++++++++++++++++++ 4 files changed, 482 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/666700d3/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java index 54c27ee..100c73c 100644 --- a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java +++ b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java @@ -58,7 +58,7 @@ public class ExportJobBase extends JobBase { * The (inferred) type of a file or group of files. 
*/ public enum FileType { - SEQUENCE_FILE, AVRO_DATA_FILE, HCATALOG_MANAGED_FILE, UNKNOWN + SEQUENCE_FILE, AVRO_DATA_FILE, HCATALOG_MANAGED_FILE, PARQUET_FILE, UNKNOWN } public static final Log LOG = LogFactory.getLog( @@ -190,6 +190,9 @@ public class ExportJobBase extends JobBase { if (header[0] == 'O' && header[1] == 'b' && header[2] == 'j') { return FileType.AVRO_DATA_FILE; } + if (header[0] == 'P' && header[1] == 'A' && header[2] == 'R') { + return FileType.PARQUET_FILE; + } return FileType.UNKNOWN; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/666700d3/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java index fee78e0..93d438a 100644 --- a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java +++ b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java @@ -18,10 +18,14 @@ package org.apache.sqoop.mapreduce; -import java.io.IOException; -import java.util.Map; +import com.cloudera.sqoop.manager.ConnManager; +import com.cloudera.sqoop.manager.ExportJobContext; +import com.cloudera.sqoop.mapreduce.ExportJobBase; +import com.cloudera.sqoop.mapreduce.db.DBConfiguration; +import com.cloudera.sqoop.mapreduce.db.DBOutputFormat; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.DefaultStringifier; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; @@ -30,11 +34,10 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities; -import com.cloudera.sqoop.manager.ConnManager; -import com.cloudera.sqoop.manager.ExportJobContext; -import com.cloudera.sqoop.mapreduce.ExportJobBase; -import 
com.cloudera.sqoop.mapreduce.db.DBConfiguration; -import com.cloudera.sqoop.mapreduce.db.DBOutputFormat; +import org.kitesdk.data.mapreduce.DatasetKeyInputFormat; + +import java.io.IOException; +import java.util.Map; /** * Run an export using JDBC (JDBC-based ExportOutputFormat). @@ -72,27 +75,37 @@ public class JdbcExportJob extends ExportJobBase { return; } else if (fileType == FileType.AVRO_DATA_FILE) { LOG.debug("Configuring for Avro export"); - ConnManager connManager = context.getConnManager(); - Map<String, Integer> columnTypeInts; - if (options.getCall() == null) { - columnTypeInts = connManager.getColumnTypes( + configureGenericRecordExportInputFormat(job, tableName); + } else if (fileType == FileType.PARQUET_FILE) { + LOG.debug("Configuring for Parquet export"); + configureGenericRecordExportInputFormat(job, tableName); + FileSystem fs = FileSystem.get(job.getConfiguration()); + String uri = "dataset:" + fs.makeQualified(getInputPath()); + DatasetKeyInputFormat.configure(job).readFrom(uri); + } + } + + private void configureGenericRecordExportInputFormat(Job job, String tableName) + throws IOException { + ConnManager connManager = context.getConnManager(); + Map<String, Integer> columnTypeInts; + if (options.getCall() == null) { + columnTypeInts = connManager.getColumnTypes( tableName, options.getSqlQuery()); - } else { - columnTypeInts = connManager.getColumnTypesForProcedure( + } else { + columnTypeInts = connManager.getColumnTypesForProcedure( options.getCall()); - } - MapWritable columnTypes = new MapWritable(); - for (Map.Entry<String, Integer> e : columnTypeInts.entrySet()) { - Text columnName = new Text(e.getKey()); - Text columnText = new Text( - connManager.toJavaType(tableName, e.getKey(), e.getValue())); - columnTypes.put(columnName, columnText); - } - DefaultStringifier.store(job.getConfiguration(), columnTypes, - AvroExportMapper.AVRO_COLUMN_TYPES_MAP); } - + MapWritable columnTypes = new MapWritable(); + for (Map.Entry<String, Integer> e : 
columnTypeInts.entrySet()) { + Text columnName = new Text(e.getKey()); + Text columnText = new Text( + connManager.toJavaType(tableName, e.getKey(), e.getValue())); + columnTypes.put(columnName, columnText); + } + DefaultStringifier.store(job.getConfiguration(), columnTypes, + AvroExportMapper.AVRO_COLUMN_TYPES_MAP); } @Override @@ -101,10 +114,14 @@ public class JdbcExportJob extends ExportJobBase { if (isHCatJob) { return SqoopHCatUtilities.getInputFormatClass(); } - if (fileType == FileType.AVRO_DATA_FILE) { - return AvroInputFormat.class; + switch (fileType) { + case AVRO_DATA_FILE: + return AvroInputFormat.class; + case PARQUET_FILE: + return DatasetKeyInputFormat.class; + default: + return super.getInputFormatClass(); } - return super.getInputFormatClass(); } @Override @@ -117,6 +134,8 @@ public class JdbcExportJob extends ExportJobBase { return SequenceFileExportMapper.class; case AVRO_DATA_FILE: return AvroExportMapper.class; + case PARQUET_FILE: + return ParquetExportMapper.class; case UNKNOWN: default: return TextExportMapper.class; http://git-wip-us.apache.org/repos/asf/sqoop/blob/666700d3/src/java/org/apache/sqoop/mapreduce/ParquetExportMapper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/ParquetExportMapper.java b/src/java/org/apache/sqoop/mapreduce/ParquetExportMapper.java new file mode 100644 index 0000000..2bc0cba --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/ParquetExportMapper.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.io.NullWritable; + +import java.io.IOException; + +/** + * Exports Parquet records from a data source. + */ +public class ParquetExportMapper + extends GenericRecordExportMapper<GenericRecord, NullWritable> { + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + } + + @Override + protected void map(GenericRecord key, NullWritable val, + Context context) throws IOException, InterruptedException { + context.write(toSqoopRecord(key), NullWritable.get()); + } + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/666700d3/src/test/com/cloudera/sqoop/TestParquetExport.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/TestParquetExport.java b/src/test/com/cloudera/sqoop/TestParquetExport.java new file mode 100644 index 0000000..9065daf --- /dev/null +++ b/src/test/com/cloudera/sqoop/TestParquetExport.java @@ -0,0 +1,389 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop; + +import com.cloudera.sqoop.testutil.ExportJobTestCase; +import com.google.common.collect.Lists; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.kitesdk.data.*; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertArrayEquals; + +/** + * Test that we can export Parquet Data Files from HDFS into databases. + */ +public class TestParquetExport extends ExportJobTestCase { + + /** + * @return an argv for the CodeGenTool to use when creating tables to export. + */ + protected String [] getCodeGenArgv(String... extraArgs) { + List<String> codeGenArgv = new ArrayList<String>(); + + if (null != extraArgs) { + for (String arg : extraArgs) { + codeGenArgv.add(arg); + } + } + + codeGenArgv.add("--table"); + codeGenArgv.add(getTableName()); + codeGenArgv.add("--connect"); + codeGenArgv.add(getConnectString()); + + return codeGenArgv.toArray(new String[0]); + } + + /** When generating data for export tests, each column is generated + according to a ColumnGenerator. Methods exist for determining + what to put into Parquet objects in the files to export, as well + as what the object representation of the column as returned by + the database should look like. 
+ */ + public interface ColumnGenerator { + /** For a row with id rowNum, what should we write into that + Parquet record to export? + */ + Object getExportValue(int rowNum); + + /** Return the Parquet schema for the field. */ + Schema getColumnParquetSchema(); + + /** For a row with id rowNum, what should the database return + for the given column's value? + */ + Object getVerifyValue(int rowNum); + + /** Return the column type to put in the CREATE TABLE statement. */ + String getColumnType(); + } + + private ColumnGenerator colGenerator(final Object exportValue, + final Schema schema, final Object verifyValue, + final String columnType) { + return new ColumnGenerator() { + @Override + public Object getVerifyValue(int rowNum) { + return verifyValue; + } + @Override + public Object getExportValue(int rowNum) { + return exportValue; + } + @Override + public String getColumnType() { + return columnType; + } + @Override + public Schema getColumnParquetSchema() { + return schema; + } + }; + } + + /** + * Create a data file that gets exported to the db. + * @param fileNum the number of the file (for multi-file export) + * @param numRecords how many records to write to the file. + */ + protected void createParquetFile(int fileNum, int numRecords, + ColumnGenerator... extraCols) throws IOException { + + String uri = "dataset:file:" + getTablePath(); + Schema schema = buildSchema(extraCols); + DatasetDescriptor descriptor = new DatasetDescriptor.Builder() + .schema(schema) + .format(Formats.PARQUET) + .build(); + Dataset dataset = Datasets.create(uri, descriptor); + DatasetWriter writer = dataset.newWriter(); + try { + for (int i = 0; i < numRecords; i++) { + GenericRecord record = new GenericData.Record(schema); + record.put("id", i); + record.put("msg", getMsgPrefix() + i); + addExtraColumns(record, i, extraCols); + writer.write(record); + } + } finally { + writer.close(); + } + } + + private Schema buildSchema(ColumnGenerator... 
extraCols) { + List<Field> fields = new ArrayList<Field>(); + fields.add(buildField("id", Schema.Type.INT)); + fields.add(buildField("msg", Schema.Type.STRING)); + int colNum = 0; + for (ColumnGenerator gen : extraCols) { + if (gen.getColumnParquetSchema() != null) { + fields.add(buildParquetField(forIdx(colNum++), + gen.getColumnParquetSchema())); + } + } + Schema schema = Schema.createRecord("myschema", null, null, false); + schema.setFields(fields); + return schema; + } + + private void addExtraColumns(GenericRecord record, int rowNum, + ColumnGenerator[] extraCols) { + int colNum = 0; + for (ColumnGenerator gen : extraCols) { + if (gen.getColumnParquetSchema() != null) { + record.put(forIdx(colNum++), gen.getExportValue(rowNum)); + } + } + } + + private Field buildField(String name, Schema.Type type) { + return new Field(name, Schema.create(type), null, null); + } + + private Field buildParquetField(String name, Schema schema) { + return new Field(name, schema, null, null); + } + + /** Return the column name for a column index. + * Each table contains two columns named 'id' and 'msg', and then an + * arbitrary number of additional columns defined by ColumnGenerators. + * These columns are referenced by idx 0, 1, 2... + * @param idx the index of the ColumnGenerator in the array passed to + * createTable(). + * @return the name of the column + */ + protected String forIdx(int idx) { + return "col" + idx; + } + + /** + * Return a SQL statement that drops a table, if it exists. + * @param tableName the table to drop. + * @return the SQL statement to drop that table. + */ + protected String getDropTableStatement(String tableName) { + return "DROP TABLE " + tableName + " IF EXISTS"; + } + + /** Create the table definition to export to, removing any prior table. + By specifying ColumnGenerator arguments, you can add extra columns + to the table of arbitrary type. + */ + private void createTable(ColumnGenerator... 
extraColumns) + throws SQLException { + Connection conn = getConnection(); + PreparedStatement statement = conn.prepareStatement( + getDropTableStatement(getTableName()), + ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + try { + statement.executeUpdate(); + conn.commit(); + } finally { + statement.close(); + } + + StringBuilder sb = new StringBuilder(); + sb.append("CREATE TABLE "); + sb.append(getTableName()); + sb.append(" (id INT NOT NULL PRIMARY KEY, msg VARCHAR(64)"); + int colNum = 0; + for (ColumnGenerator gen : extraColumns) { + if (gen.getColumnType() != null) { + sb.append(", " + forIdx(colNum++) + " " + gen.getColumnType()); + } + } + sb.append(")"); + + statement = conn.prepareStatement(sb.toString(), + ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + try { + statement.executeUpdate(); + conn.commit(); + } finally { + statement.close(); + } + } + + /** Verify that on a given row, a column has a given value. + * @param id the id column specifying the row to test. + */ + private void assertColValForRowId(int id, String colName, Object expectedVal) + throws SQLException { + Connection conn = getConnection(); + LOG.info("Verifying column " + colName + " has value " + expectedVal); + + PreparedStatement statement = conn.prepareStatement( + "SELECT " + colName + " FROM " + getTableName() + " WHERE id = " + id, + ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + Object actualVal = null; + try { + ResultSet rs = statement.executeQuery(); + try { + rs.next(); + actualVal = rs.getObject(1); + } finally { + rs.close(); + } + } finally { + statement.close(); + } + + if (expectedVal != null && expectedVal instanceof byte[]) { + assertArrayEquals((byte[]) expectedVal, (byte[]) actualVal); + } else { + assertEquals("Got unexpected column value", expectedVal, actualVal); + } + } + + /** Verify that for the max and min values of the 'id' column, the values + for a given column meet the expected values. 
+ */ + protected void assertColMinAndMax(String colName, ColumnGenerator generator) + throws SQLException { + Connection conn = getConnection(); + int minId = getMinRowId(conn); + int maxId = getMaxRowId(conn); + + LOG.info("Checking min/max for column " + colName + " with type " + + generator.getColumnType()); + + Object expectedMin = generator.getVerifyValue(minId); + Object expectedMax = generator.getVerifyValue(maxId); + + assertColValForRowId(minId, colName, expectedMin); + assertColValForRowId(maxId, colName, expectedMax); + } + + public void testSupportedParquetTypes() throws IOException, SQLException { + String[] argv = {}; + final int TOTAL_RECORDS = 1 * 10; + + byte[] b = new byte[] { (byte) 1, (byte) 2 }; + Schema fixed = Schema.createFixed("myfixed", null, null, 2); + Schema enumeration = Schema.createEnum("myenum", null, null, + Lists.newArrayList("a", "b")); + + ColumnGenerator[] gens = new ColumnGenerator[] { + colGenerator(true, Schema.create(Schema.Type.BOOLEAN), true, "BIT"), + colGenerator(100, Schema.create(Schema.Type.INT), 100, "INTEGER"), + colGenerator(200L, Schema.create(Schema.Type.LONG), 200L, "BIGINT"), + // HSQLDB maps REAL to double, not float: + colGenerator(1.0f, Schema.create(Schema.Type.FLOAT), 1.0d, "REAL"), + colGenerator(2.0d, Schema.create(Schema.Type.DOUBLE), 2.0d, "DOUBLE"), + colGenerator("s", Schema.create(Schema.Type.STRING), "s", "VARCHAR(8)"), + colGenerator(ByteBuffer.wrap(b), Schema.create(Schema.Type.BYTES), + b, "VARBINARY(8)"), + colGenerator(new GenericData.Fixed(fixed, b), fixed, + b, "BINARY(2)"), + colGenerator(new GenericData.EnumSymbol(enumeration, "a"), enumeration, + "a", "VARCHAR(8)"), + }; + createParquetFile(0, TOTAL_RECORDS, gens); + createTable(gens); + runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1))); + verifyExport(TOTAL_RECORDS); + for (int i = 0; i < gens.length; i++) { + assertColMinAndMax(forIdx(i), gens[i]); + } + } + + public void testNullableField() throws IOException, 
SQLException { + String[] argv = {}; + final int TOTAL_RECORDS = 1 * 10; + + List<Schema> childSchemas = new ArrayList<Schema>(); + childSchemas.add(Schema.create(Schema.Type.STRING)); + childSchemas.add(Schema.create(Schema.Type.NULL)); + Schema schema = Schema.createUnion(childSchemas); + ColumnGenerator gen0 = colGenerator(null, schema, null, "VARCHAR(64)"); + ColumnGenerator gen1 = colGenerator("s", schema, "s", "VARCHAR(64)"); + createParquetFile(0, TOTAL_RECORDS, gen0, gen1); + createTable(gen0, gen1); + runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1))); + verifyExport(TOTAL_RECORDS); + assertColMinAndMax(forIdx(0), gen0); + assertColMinAndMax(forIdx(1), gen1); + } + + public void testParquetRecordsNotSupported() throws IOException, SQLException { + String[] argv = {}; + final int TOTAL_RECORDS = 1; + + Schema schema = Schema.createRecord("nestedrecord", null, null, false); + schema.setFields(Lists.newArrayList(buildField("myint", + Schema.Type.INT))); + GenericRecord record = new GenericData.Record(schema); + record.put("myint", 100); + // DB type is not used so can be anything: + ColumnGenerator gen = colGenerator(record, schema, null, "VARCHAR(64)"); + createParquetFile(0, TOTAL_RECORDS, gen); + createTable(gen); + try { + runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1))); + fail("Parquet records can not be exported."); + } catch (Exception e) { + // expected + assertTrue(true); + } + } + + public void testMissingDatabaseFields() throws IOException, SQLException { + String[] argv = {}; + final int TOTAL_RECORDS = 1; + + // null column type means don't create a database column + // the Parquet value will not be exported + ColumnGenerator gen = colGenerator(100, Schema.create(Schema.Type.INT), + null, null); + createParquetFile(0, TOTAL_RECORDS, gen); + createTable(gen); + runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1))); + verifyExport(TOTAL_RECORDS); + } + + public void testMissingParquetFields() throws 
IOException, SQLException { + String[] argv = {}; + final int TOTAL_RECORDS = 1; + + // null Parquet schema means don't create a Parquet field + ColumnGenerator gen = colGenerator(null, null, null, "VARCHAR(64)"); + createParquetFile(0, TOTAL_RECORDS, gen); + createTable(gen); + try { + runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1))); + fail("Missing Parquet field."); + } catch (Exception e) { + // expected + assertTrue(true); + } + } + +}
