Repository: sqoop
Updated Branches:
  refs/heads/trunk 27cdcef4e -> ad13ad081
SQOOP-1391: Compression codec handling (Qian Xu via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/ad13ad08
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/ad13ad08
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/ad13ad08

Branch: refs/heads/trunk
Commit: ad13ad08106fe907c76fa3df4c7f5123874952fa
Parents: 27cdcef
Author: Jarek Jarcec Cecho <[email protected]>
Authored: Sun Nov 9 23:25:46 2014 -0800
Committer: Jarek Jarcec Cecho <[email protected]>
Committed: Sun Nov 9 23:25:46 2014 -0800

----------------------------------------------------------------------
 .../apache/sqoop/mapreduce/ImportJobBase.java   |  6 ++--
 .../org/apache/sqoop/mapreduce/ParquetJob.java  | 31 ++++++++++++++++----
 .../com/cloudera/sqoop/TestParquetImport.java   | 20 +++++++++++--
 3 files changed, 46 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/ad13ad08/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
index dab5606..04d60fd 100644
--- a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
@@ -145,10 +145,8 @@ public class ImportJobBase extends JobBase {
     if (codecName != null) {
       Configuration conf = job.getConfiguration();
       String shortName = CodecMap.getCodecShortNameByName(codecName, conf);
-      if (!shortName.equalsIgnoreCase("default") &&
-          !shortName.equalsIgnoreCase("snappy")) {
-        // TODO: SQOOP-1391 More compression codec support
-        LOG.warn("Will use snappy as compression codec instead");
+      if (!shortName.equalsIgnoreCase("default")) {
+        conf.set(ParquetJob.CONF_OUTPUT_CODEC, shortName);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ad13ad08/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ParquetJob.java b/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
index 6ef29a1..bea74c3 100644
--- a/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
@@ -20,7 +20,10 @@ package org.apache.sqoop.mapreduce;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.kitesdk.data.CompressionType;
 import org.kitesdk.data.Dataset;
 import org.kitesdk.data.DatasetDescriptor;
 import org.kitesdk.data.DatasetNotFoundException;
@@ -36,19 +39,35 @@ import java.io.IOException;
  */
 public final class ParquetJob {
 
+  public static final Log LOG = LogFactory.getLog(ParquetJob.class.getName());
+
   private ParquetJob() {
   }
 
-  private static final String CONF_AVRO_SCHEMA = "avro.schema";
+  private static final String CONF_AVRO_SCHEMA = "parquetjob.avro.schema";
+  static final String CONF_OUTPUT_CODEC = "parquetjob.output.codec";
 
   public static Schema getAvroSchema(Configuration conf) {
     return new Schema.Parser().parse(conf.get(CONF_AVRO_SCHEMA));
   }
 
+  public static CompressionType getCompressionType(Configuration conf) {
+    CompressionType defaults = Formats.PARQUET.getDefaultCompressionType();
+    String codec = conf.get(CONF_OUTPUT_CODEC, defaults.getName());
+    try {
+      return CompressionType.forName(codec);
+    } catch (IllegalArgumentException ex) {
+      LOG.warn(String.format(
+          "Unsupported compression type '%s'. Fallback to '%s'.",
+          codec, defaults));
+    }
+    return defaults;
+  }
+
   /**
    * Configure the import job. The import process will use a Kite dataset to
    * write data records into Parquet format internally. The input key class is
-   * {@link org.apache.sqoop.lib.SqoopAvroRecord}. The output key is
+   * {@link org.apache.sqoop.lib.SqoopRecord}. The output key is
   * {@link org.apache.avro.generic.GenericRecord}.
    */
   public static void configureImportJob(Configuration conf, Schema schema,
@@ -58,7 +77,7 @@ public final class ParquetJob {
       try {
         dataset = Datasets.load(uri);
       } catch (DatasetNotFoundException ex) {
-        dataset = createDataset(schema, uri);
+        dataset = createDataset(schema, getCompressionType(conf), uri);
       }
       Schema writtenWith = dataset.getDescriptor().getSchema();
       if (!SchemaValidationUtil.canRead(writtenWith, schema)) {
@@ -67,16 +86,18 @@ public final class ParquetJob {
             writtenWith, schema));
       }
     } else {
-      dataset = createDataset(schema, uri);
+      dataset = createDataset(schema, getCompressionType(conf), uri);
     }
     conf.set(CONF_AVRO_SCHEMA, schema.toString());
     DatasetKeyOutputFormat.configure(conf).writeTo(dataset);
   }
 
-  private static Dataset createDataset(Schema schema, String uri) {
+  private static Dataset createDataset(Schema schema,
+      CompressionType compressionType, String uri) {
     DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
         .schema(schema)
         .format(Formats.PARQUET)
+        .compressionType(compressionType)
         .build();
     return Datasets.create(uri, descriptor, GenericRecord.class);
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ad13ad08/src/test/com/cloudera/sqoop/TestParquetImport.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/TestParquetImport.java b/src/test/com/cloudera/sqoop/TestParquetImport.java
index 2224719..192da17 100644
--- a/src/test/com/cloudera/sqoop/TestParquetImport.java
+++ b/src/test/com/cloudera/sqoop/TestParquetImport.java
@@ -27,6 +27,7 @@ import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.kitesdk.data.CompressionType;
 import org.kitesdk.data.Dataset;
 import org.kitesdk.data.DatasetReader;
 import org.kitesdk.data.Datasets;
@@ -75,13 +76,24 @@ public class TestParquetImport extends ImportJobTestCase {
     return args.toArray(new String[args.size()]);
   }
 
-  public void testParquetImport() throws IOException {
+  public void testSnappyCompression() throws IOException {
+    runParquetImportTest("snappy");
+  }
+
+  public void testDeflateCompression() throws IOException {
+    runParquetImportTest("deflate");
+  }
+
+  private void runParquetImportTest(String codec) throws IOException {
     String[] types = {"BIT", "INTEGER", "BIGINT", "REAL", "DOUBLE",
         "VARCHAR(6)", "VARBINARY(2)",};
     String[] vals = {"true", "100", "200", "1.0", "2.0", "'s'", "'0102'", };
     createTableWithColTypes(types, vals);
 
-    runImport(getOutputArgv(true, null));
+    String [] extraArgs = { "--compression-codec", codec};
+    runImport(getOutputArgv(true, extraArgs));
+
+    assertEquals(CompressionType.forName(codec), getCompressionType());
 
     Schema schema = getSchema();
     assertEquals(Type.RECORD, schema.getType());
@@ -177,6 +189,10 @@ public class TestParquetImport extends ImportJobTestCase {
     }
   }
 
+  private CompressionType getCompressionType() {
+    return getDataset().getDescriptor().getCompressionType();
+  }
+
   private Schema getSchema() {
     return getDataset().getDescriptor().getSchema();
   }
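
The heart of the change is ParquetJob.getCompressionType(): ImportJobBase now
records the user's codec short name under parquetjob.output.codec, and
ParquetJob resolves it to a Kite CompressionType, falling back to the Parquet
default when Kite does not recognize the name. A minimal standalone sketch of
that resolution path, assuming only the Kite SDK and Hadoop Configuration on
the classpath (the class name and the sample codec value are illustrative,
not part of the commit):

  import org.apache.hadoop.conf.Configuration;
  import org.kitesdk.data.CompressionType;
  import org.kitesdk.data.Formats;

  public class CodecFallbackSketch {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      // ImportJobBase sets this key whenever the requested codec
      // is anything other than "default".
      conf.set("parquetjob.output.codec", "deflate");

      CompressionType defaults = Formats.PARQUET.getDefaultCompressionType();
      String codec = conf.get("parquetjob.output.codec", defaults.getName());
      CompressionType resolved;
      try {
        // Kite maps the short name onto its CompressionType enum; an
        // unrecognized name throws IllegalArgumentException.
        resolved = CompressionType.forName(codec);
      } catch (IllegalArgumentException ex) {
        // Same policy as ParquetJob.getCompressionType(): warn and fall
        // back to the Parquet default rather than failing the import.
        resolved = defaults;
      }
      System.out.println(resolved.getName()); // "deflate"
    }
  }

Net effect versus the removed code: "deflate" is now honored instead of being
silently replaced by snappy, and only genuinely unknown codec names degrade
to the Parquet default.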
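The other half of the change threads the resolved codec into the Kite dataset
descriptor through the new createDataset() signature. A sketch of the
equivalent standalone Kite call follows; the one-column schema and the local
dataset URI are made-up placeholders:

  import org.apache.avro.Schema;
  import org.apache.avro.generic.GenericRecord;
  import org.kitesdk.data.CompressionType;
  import org.kitesdk.data.Dataset;
  import org.kitesdk.data.DatasetDescriptor;
  import org.kitesdk.data.Datasets;
  import org.kitesdk.data.Formats;

  public class ParquetDatasetSketch {
    public static void main(String[] args) {
      // Placeholder one-column record schema.
      Schema schema = new Schema.Parser().parse(
          "{\"type\":\"record\",\"name\":\"Row\","
          + "\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}");

      // Mirrors ParquetJob.createDataset(): the compression type is now an
      // explicit part of the descriptor instead of Kite's implicit default.
      DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
          .schema(schema)
          .format(Formats.PARQUET)
          .compressionType(CompressionType.forName("deflate"))
          .build();

      Dataset<GenericRecord> dataset = Datasets.create(
          "dataset:file:/tmp/parquet-sketch/rows", descriptor,
          GenericRecord.class);
      // The codec can be read back from the descriptor afterwards.
      System.out.println(dataset.getDescriptor().getCompressionType());
    }
  }

This descriptor round trip is exactly what the updated TestParquetImport
asserts: after an import with --compression-codec, the created dataset's
descriptor reports the requested CompressionType.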
