Updated Branches: refs/heads/sqoop2 83b74f5ad -> 581f9b798
SQOOP-1196: Sqoop2: Add support for arbitrary compression codecs (Raghav Kumar Gautam via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/581f9b79 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/581f9b79 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/581f9b79 Branch: refs/heads/sqoop2 Commit: 581f9b798d9125e05979d4794c6c07b384f80dca Parents: 83b74f5 Author: Jarek Jarcec Cecho <[email protected]> Authored: Sat Sep 21 16:11:31 2013 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Sat Sep 21 16:11:31 2013 -0700 ---------------------------------------------------------------------- .../sqoop/framework/FrameworkValidator.java | 14 ++++++ .../configuration/OutputCompression.java | 1 + .../framework/configuration/OutputForm.java | 2 + .../resources/framework-resources.properties | 3 ++ .../sqoop/framework/TestFrameworkValidator.java | 49 ++++++++++++++++++++ .../mapreduce/MapreduceExecutionEngine.java | 2 + .../mapreduce/MapreduceExecutionEngineTest.java | 26 +++++++++++ 7 files changed, 97 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/581f9b79/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java index d72b9f8..f5f6a36 100644 --- a/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java +++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java @@ -21,6 +21,7 @@ import org.apache.sqoop.framework.configuration.ConnectionConfiguration; import org.apache.sqoop.framework.configuration.ExportJobConfiguration; import org.apache.sqoop.framework.configuration.ImportJobConfiguration; import org.apache.sqoop.framework.configuration.InputForm; +import org.apache.sqoop.framework.configuration.OutputCompression; import org.apache.sqoop.framework.configuration.OutputForm; import org.apache.sqoop.framework.configuration.ThrottlingForm; import org.apache.sqoop.model.MJob; @@ -83,6 +84,19 @@ public class FrameworkValidator extends Validator { if(output.outputDirectory == null || output.outputDirectory.isEmpty()) { validation.addMessage(Status.UNACCEPTABLE, "output", "outputDirectory", "Output directory is empty"); } + if(output.customCompression != null && + output.customCompression.trim().length() > 0 && + output.compression != OutputCompression.CUSTOM) { + validation.addMessage(Status.UNACCEPTABLE, "output", "compression", + "custom compression should be blank as " + output.compression + " is being used."); + } + if(output.compression == OutputCompression.CUSTOM && + (output.customCompression == null || + output.customCompression.trim().length() == 0) + ) { + validation.addMessage(Status.UNACCEPTABLE, "output", "compression", + "custom compression is blank."); + } } private void validateThrottingForm(Validation validation, ThrottlingForm throttling) { http://git-wip-us.apache.org/repos/asf/sqoop/blob/581f9b79/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java b/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java index 3b5ffc5..6cac46d 100644 --- a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java +++ b/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java @@ -29,4 +29,5 @@ public enum OutputCompression { LZO, LZ4, SNAPPY, + CUSTOM, } http://git-wip-us.apache.org/repos/asf/sqoop/blob/581f9b79/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java b/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java index 18eeab3..b2cdb44 100644 --- a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java +++ b/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java @@ -32,5 +32,7 @@ public class OutputForm { @Input public OutputCompression compression; + @Input(size = 255) public String customCompression; + @Input(size = 255) public String outputDirectory; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/581f9b79/core/src/main/resources/framework-resources.properties ---------------------------------------------------------------------- diff --git a/core/src/main/resources/framework-resources.properties b/core/src/main/resources/framework-resources.properties index a3d3330..7ecb9ae 100644 --- a/core/src/main/resources/framework-resources.properties +++ b/core/src/main/resources/framework-resources.properties @@ -41,6 +41,9 @@ output.outputFormat.help = Format in which data should be serialized output.compression.label = Compression format output.compression.help = Compression that should be used for the data +output.customCompression.label = Custom compression format +output.customCompression.help = Full class name of the custom compression + output.outputDirectory.label = Output directory output.outputDirectory.help = Output directory for final data http://git-wip-us.apache.org/repos/asf/sqoop/blob/581f9b79/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java b/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java index 9e1997a..7e25d34 100644 --- a/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java +++ b/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java @@ -20,6 +20,7 @@ package org.apache.sqoop.framework; import org.apache.sqoop.framework.configuration.ConnectionConfiguration; import org.apache.sqoop.framework.configuration.ExportJobConfiguration; import org.apache.sqoop.framework.configuration.ImportJobConfiguration; +import org.apache.sqoop.framework.configuration.OutputCompression; import org.apache.sqoop.model.MJob; import org.apache.sqoop.validation.Status; import org.apache.sqoop.validation.Validation; @@ -115,5 +116,53 @@ public class TestFrameworkValidator { assertEquals(Status.UNACCEPTABLE, validation.getStatus()); assertTrue(validation.getMessages().containsKey(new Validation.FormInput("throttling.extractors"))); assertTrue(validation.getMessages().containsKey(new Validation.FormInput("throttling.loaders"))); + + // specifying both compression as well as customCompression is + // unacceptable + configuration = new ImportJobConfiguration(); + configuration.output.outputDirectory = "/czech/republic"; + configuration.throttling.extractors = 2; + configuration.throttling.loaders = 2; + configuration.output.compression = OutputCompression.BZIP2; + configuration.output.customCompression = "some.compression.codec"; + + validation = validator.validateJob(MJob.Type.IMPORT, configuration); + assertEquals(Status.UNACCEPTABLE, validation.getStatus()); + assertTrue(validation.getMessages().containsKey(new Validation.FormInput("output.compression"))); + + // specifying a customCompression is fine + configuration = new ImportJobConfiguration(); + configuration.output.outputDirectory = "/czech/republic"; + configuration.throttling.extractors = 2; + configuration.throttling.loaders = 2; + configuration.output.compression = OutputCompression.CUSTOM; + configuration.output.customCompression = "some.compression.codec"; + + validation = validator.validateJob(MJob.Type.IMPORT, configuration); + assertEquals(Status.FINE, validation.getStatus()); + + // specifying a customCompression without codec name is unacceptable + configuration = new ImportJobConfiguration(); + configuration.output.outputDirectory = "/czech/republic"; + configuration.throttling.extractors = 2; + configuration.throttling.loaders = 2; + configuration.output.compression = OutputCompression.CUSTOM; + configuration.output.customCompression = ""; + + validation = validator.validateJob(MJob.Type.IMPORT, configuration); + assertEquals(Status.UNACCEPTABLE, validation.getStatus()); + assertTrue(validation.getMessages().containsKey(new Validation.FormInput("output.compression"))); + + configuration = new ImportJobConfiguration(); + configuration.output.outputDirectory = "/czech/republic"; + configuration.throttling.extractors = 2; + configuration.throttling.loaders = 2; + configuration.output.compression = OutputCompression.CUSTOM; + configuration.output.customCompression = null; + + validation = validator.validateJob(MJob.Type.IMPORT, configuration); + assertEquals(Status.UNACCEPTABLE, validation.getStatus()); + assertTrue(validation.getMessages().containsKey(new Validation.FormInput("output.compression"))); + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/581f9b79/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java index 392007d..5c0a027 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java @@ -123,6 +123,8 @@ public class MapreduceExecutionEngine extends ExecutionEngine { return "org.apache.hadoop.io.compress.Lz4Codec"; case SNAPPY: return "org.apache.hadoop.io.compress.SnappyCodec"; + case CUSTOM: + return jobConf.output.customCompression.trim(); } return null; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/581f9b79/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java b/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java index 19f5a22..39d1b53 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java @@ -83,4 +83,30 @@ public class MapreduceExecutionEngineTest { assertEquals("Unexpected codec flag was returned", obtainedCodecFlag, expectedCompressionFlag); } + + @Test + public void testCustomCompression() { + MapreduceExecutionEngine executionEngine = new MapreduceExecutionEngine(); + final String customCodecName = "custom.compression"; + SubmissionRequest request = executionEngine.createSubmissionRequest(); + ImportJobConfiguration jobConf = new ImportJobConfiguration(); + jobConf.output.outputFormat = OutputFormat.TEXT_FILE; + jobConf.output.compression = OutputCompression.CUSTOM; + jobConf.output.customCompression = customCodecName; + request.setConfigFrameworkJob(jobConf); + request.setConnectorCallbacks(new Importer(Initializer.class, + Partitioner.class, Extractor.class, Destroyer.class) { + }); + executionEngine.prepareImportSubmission(request); + + MutableMapContext context = request.getFrameworkContext(); + final String obtainedCodecName = context.getString( + JobConstants.HADOOP_COMPRESS_CODEC); + final boolean obtainedCodecFlag = + context.getBoolean(JobConstants.HADOOP_COMPRESS, false); + assertEquals("Unexpected codec name was returned", obtainedCodecName, + customCodecName); + assertEquals("Unexpected codec flag was returned", obtainedCodecFlag, true); + } + }
