Updated Branches: refs/heads/sqoop2 894d21ded -> 08a829fd6
SQOOP-1182: Expose compression options for Sqoop2 import (Raghav Kumar Gautam via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/08a829fd Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/08a829fd Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/08a829fd Branch: refs/heads/sqoop2 Commit: 08a829fd6ca8476280066e973311fb346664d62b Parents: 894d21d Author: Jarek Jarcec Cecho <[email protected]> Authored: Mon Sep 9 01:33:16 2013 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Mon Sep 9 01:33:16 2013 -0700 ---------------------------------------------------------------------- .../configuration/OutputCompression.java | 32 ++++++++ .../framework/configuration/OutputForm.java | 2 + .../resources/framework-resources.properties | 3 + .../mapreduce/MapreduceExecutionEngine.java | 29 +++++++ .../mapreduce/MapreduceExecutionEngineTest.java | 86 ++++++++++++++++++++ 5 files changed, 152 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/08a829fd/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java b/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java new file mode 100644 index 0000000..3b5ffc5 --- /dev/null +++ b/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.framework.configuration; + +/** + * Supported compressions + */ +public enum OutputCompression { + NONE, + DEFAULT, + DEFLATE, + GZIP, + BZIP2, + LZO, + LZ4, + SNAPPY, +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/08a829fd/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java b/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java index 3cb9499..18eeab3 100644 --- a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java +++ b/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java @@ -30,5 +30,7 @@ public class OutputForm { @Input public OutputFormat outputFormat; + @Input public OutputCompression compression; + @Input(size = 255) public String outputDirectory; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/08a829fd/core/src/main/resources/framework-resources.properties ---------------------------------------------------------------------- diff --git a/core/src/main/resources/framework-resources.properties b/core/src/main/resources/framework-resources.properties index cebc90e..a3d3330 100644 --- a/core/src/main/resources/framework-resources.properties +++ b/core/src/main/resources/framework-resources.properties @@ -38,6 +38,9 @@ output.storageType.help = Target on Hadoop ecosystem where to store data output.outputFormat.label = Output format output.outputFormat.help = Format in which data should be serialized +output.compression.label = Compression format +output.compression.help = Compression that should be used for the data + output.outputDirectory.label = Output directory output.outputDirectory.help = Output directory for final data http://git-wip-us.apache.org/repos/asf/sqoop/blob/08a829fd/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java index 767080c..392007d 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java @@ -96,6 +96,35 @@ public class MapreduceExecutionEngine extends ExecutionEngine { throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0024, "Format: " + jobConf.output.outputFormat); } + if(getCompressionCodecName(jobConf) != null) { + context.setString(JobConstants.HADOOP_COMPRESS_CODEC, + getCompressionCodecName(jobConf)); + context.setBoolean(JobConstants.HADOOP_COMPRESS, true); + } + } + + private String getCompressionCodecName(ImportJobConfiguration jobConf) { + if(jobConf.output.compression == null) + return null; + switch(jobConf.output.compression) { + case NONE: + return null; + case DEFAULT: + return "org.apache.hadoop.io.compress.DefaultCodec"; + case DEFLATE: + return "org.apache.hadoop.io.compress.DeflateCodec"; + case GZIP: + return "org.apache.hadoop.io.compress.GzipCodec"; + case BZIP2: + return "org.apache.hadoop.io.compress.BZip2Codec"; + case LZO: + return "com.hadoop.compression.lzo.LzoCodec"; + case LZ4: + return "org.apache.hadoop.io.compress.Lz4Codec"; + case SNAPPY: + return "org.apache.hadoop.io.compress.SnappyCodec"; + } + return null; } /** http://git-wip-us.apache.org/repos/asf/sqoop/blob/08a829fd/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java b/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java new file mode 100644 index 0000000..19f5a22 --- /dev/null +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.execution.mapreduce; + +import org.apache.sqoop.common.MutableMapContext; +import org.apache.sqoop.framework.SubmissionRequest; +import org.apache.sqoop.framework.configuration.ImportJobConfiguration; +import org.apache.sqoop.framework.configuration.OutputCompression; +import org.apache.sqoop.framework.configuration.OutputFormat; +import org.apache.sqoop.job.JobConstants; +import org.apache.sqoop.job.etl.Destroyer; +import org.apache.sqoop.job.etl.Extractor; +import org.apache.sqoop.job.etl.Importer; +import org.apache.sqoop.job.etl.Initializer; +import org.apache.sqoop.job.etl.Partitioner; +import org.junit.Test; + +import static junit.framework.TestCase.assertEquals; + +public class MapreduceExecutionEngineTest { + @Test + public void testImportCompression() throws Exception { + testImportCompressionInner(OutputCompression.NONE, + null, false); + + testImportCompressionInner(OutputCompression.DEFAULT, + "org.apache.hadoop.io.compress.DefaultCodec", true); + + testImportCompressionInner(OutputCompression.GZIP, + "org.apache.hadoop.io.compress.GzipCodec", true); + + testImportCompressionInner(OutputCompression.BZIP2, + "org.apache.hadoop.io.compress.BZip2Codec", true); + + testImportCompressionInner(OutputCompression.LZO, + "com.hadoop.compression.lzo.LzoCodec", true); + + testImportCompressionInner(OutputCompression.LZ4, + "org.apache.hadoop.io.compress.Lz4Codec", true); + + testImportCompressionInner(OutputCompression.SNAPPY, + "org.apache.hadoop.io.compress.SnappyCodec", true); + + testImportCompressionInner(null, + null, false); + } + + private void testImportCompressionInner(OutputCompression comprssionFormat, + String expectedCodecName, boolean expectedCompressionFlag) { + MapreduceExecutionEngine executionEngine = new MapreduceExecutionEngine(); + SubmissionRequest request = executionEngine.createSubmissionRequest(); + ImportJobConfiguration jobConf = new ImportJobConfiguration(); + jobConf.output.outputFormat = OutputFormat.TEXT_FILE; + jobConf.output.compression = comprssionFormat; + request.setConfigFrameworkJob(jobConf); + request.setConnectorCallbacks(new Importer(Initializer.class, + Partitioner.class, Extractor.class, Destroyer.class) { + }); + executionEngine.prepareImportSubmission(request); + + MutableMapContext context = request.getFrameworkContext(); + final String obtainedCodecName = context.getString( + JobConstants.HADOOP_COMPRESS_CODEC); + final boolean obtainedCodecFlag = + context.getBoolean(JobConstants.HADOOP_COMPRESS, false); + assertEquals("Unexpected codec name was returned", obtainedCodecName, + expectedCodecName); + assertEquals("Unexpected codec flag was returned", obtainedCodecFlag, + expectedCompressionFlag); + } +}
