Updated Branches:
  refs/heads/sqoop2 83b74f5ad -> 581f9b798

SQOOP-1196: Sqoop2: Add support for arbitrary compression codecs

(Raghav Kumar Gautam via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/581f9b79
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/581f9b79
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/581f9b79

Branch: refs/heads/sqoop2
Commit: 581f9b798d9125e05979d4794c6c07b384f80dca
Parents: 83b74f5
Author: Jarek Jarcec Cecho <[email protected]>
Authored: Sat Sep 21 16:11:31 2013 -0700
Committer: Jarek Jarcec Cecho <[email protected]>
Committed: Sat Sep 21 16:11:31 2013 -0700

----------------------------------------------------------------------
 .../sqoop/framework/FrameworkValidator.java     | 14 ++++++
 .../configuration/OutputCompression.java        |  1 +
 .../framework/configuration/OutputForm.java     |  2 +
 .../resources/framework-resources.properties    |  3 ++
 .../sqoop/framework/TestFrameworkValidator.java | 49 ++++++++++++++++++++
 .../mapreduce/MapreduceExecutionEngine.java     |  2 +
 .../mapreduce/MapreduceExecutionEngineTest.java | 26 +++++++++++
 7 files changed, 97 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/581f9b79/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java
index d72b9f8..f5f6a36 100644
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java
+++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java
@@ -21,6 +21,7 @@ import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
 import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
 import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
 import org.apache.sqoop.framework.configuration.InputForm;
+import org.apache.sqoop.framework.configuration.OutputCompression;
 import org.apache.sqoop.framework.configuration.OutputForm;
 import org.apache.sqoop.framework.configuration.ThrottlingForm;
 import org.apache.sqoop.model.MJob;
@@ -83,6 +84,19 @@ public class FrameworkValidator extends Validator {
     if(output.outputDirectory == null || output.outputDirectory.isEmpty()) {
       validation.addMessage(Status.UNACCEPTABLE, "output", "outputDirectory", "Output directory is empty");
     }
+    if(output.customCompression != null &&
+      output.customCompression.trim().length() > 0  &&
+      output.compression != OutputCompression.CUSTOM) {
+      validation.addMessage(Status.UNACCEPTABLE, "output", "compression",
+        "custom compression should be blank as " + output.compression + " is being used.");
+    }
+    if(output.compression == OutputCompression.CUSTOM &&
+      (output.customCompression == null ||
+        output.customCompression.trim().length() == 0)
+      ) {
+      validation.addMessage(Status.UNACCEPTABLE, "output", "compression",
+        "custom compression is blank.");
+    }
   }
 
   private void validateThrottingForm(Validation validation, ThrottlingForm throttling) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/581f9b79/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java b/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java
index 3b5ffc5..6cac46d 100644
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java
+++ b/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java
@@ -29,4 +29,5 @@ public enum OutputCompression {
   LZO,
   LZ4,
   SNAPPY,
+  CUSTOM,
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/581f9b79/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java b/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java
index 18eeab3..b2cdb44 100644
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java
+++ b/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java
@@ -32,5 +32,7 @@ public class OutputForm {
 
   @Input public OutputCompression compression;
 
+  @Input(size = 255) public String customCompression;
+
   @Input(size = 255) public String outputDirectory;
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/581f9b79/core/src/main/resources/framework-resources.properties
----------------------------------------------------------------------
diff --git a/core/src/main/resources/framework-resources.properties 
b/core/src/main/resources/framework-resources.properties
index a3d3330..7ecb9ae 100644
--- a/core/src/main/resources/framework-resources.properties
+++ b/core/src/main/resources/framework-resources.properties
@@ -41,6 +41,9 @@ output.outputFormat.help = Format in which data should be serialized
 output.compression.label = Compression format
 output.compression.help = Compression that should be used for the data
 
+output.customCompression.label = Custom compression format
+output.customCompression.help = Full class name of the custom compression
+
 output.outputDirectory.label = Output directory
 output.outputDirectory.help = Output directory for final data
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/581f9b79/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java b/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java
index 9e1997a..7e25d34 100644
--- a/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java
+++ b/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java
@@ -20,6 +20,7 @@ package org.apache.sqoop.framework;
 import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
 import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
 import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
+import org.apache.sqoop.framework.configuration.OutputCompression;
 import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.validation.Status;
 import org.apache.sqoop.validation.Validation;
@@ -115,5 +116,53 @@ public class TestFrameworkValidator {
     assertEquals(Status.UNACCEPTABLE, validation.getStatus());
     assertTrue(validation.getMessages().containsKey(new Validation.FormInput("throttling.extractors")));
     assertTrue(validation.getMessages().containsKey(new Validation.FormInput("throttling.loaders")));
+
+    // specifying both compression as well as customCompression is
+    // unacceptable
+    configuration = new ImportJobConfiguration();
+    configuration.output.outputDirectory = "/czech/republic";
+    configuration.throttling.extractors = 2;
+    configuration.throttling.loaders = 2;
+    configuration.output.compression = OutputCompression.BZIP2;
+    configuration.output.customCompression = "some.compression.codec";
+
+    validation = validator.validateJob(MJob.Type.IMPORT, configuration);
+    assertEquals(Status.UNACCEPTABLE, validation.getStatus());
+    assertTrue(validation.getMessages().containsKey(new Validation.FormInput("output.compression")));
+
+    // specifying a customCompression is fine
+    configuration = new ImportJobConfiguration();
+    configuration.output.outputDirectory = "/czech/republic";
+    configuration.throttling.extractors = 2;
+    configuration.throttling.loaders = 2;
+    configuration.output.compression = OutputCompression.CUSTOM;
+    configuration.output.customCompression = "some.compression.codec";
+
+    validation = validator.validateJob(MJob.Type.IMPORT, configuration);
+    assertEquals(Status.FINE, validation.getStatus());
+
+    // specifying a customCompression without codec name is unacceptable
+    configuration = new ImportJobConfiguration();
+    configuration.output.outputDirectory = "/czech/republic";
+    configuration.throttling.extractors = 2;
+    configuration.throttling.loaders = 2;
+    configuration.output.compression = OutputCompression.CUSTOM;
+    configuration.output.customCompression = "";
+
+    validation = validator.validateJob(MJob.Type.IMPORT, configuration);
+    assertEquals(Status.UNACCEPTABLE, validation.getStatus());
+    assertTrue(validation.getMessages().containsKey(new Validation.FormInput("output.compression")));
+
+    configuration = new ImportJobConfiguration();
+    configuration.output.outputDirectory = "/czech/republic";
+    configuration.throttling.extractors = 2;
+    configuration.throttling.loaders = 2;
+    configuration.output.compression = OutputCompression.CUSTOM;
+    configuration.output.customCompression = null;
+
+    validation = validator.validateJob(MJob.Type.IMPORT, configuration);
+    assertEquals(Status.UNACCEPTABLE, validation.getStatus());
+    assertTrue(validation.getMessages().containsKey(new Validation.FormInput("output.compression")));
+
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/581f9b79/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
index 392007d..5c0a027 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -123,6 +123,8 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
         return "org.apache.hadoop.io.compress.Lz4Codec";
       case SNAPPY:
         return "org.apache.hadoop.io.compress.SnappyCodec";
+      case CUSTOM:
+        return jobConf.output.customCompression.trim();
     }
     return null;
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/581f9b79/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java b/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java
index 19f5a22..39d1b53 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java
@@ -83,4 +83,30 @@ public class MapreduceExecutionEngineTest {
     assertEquals("Unexpected codec flag was returned", obtainedCodecFlag,
       expectedCompressionFlag);
   }
+
+  @Test
+  public void testCustomCompression() {
+    MapreduceExecutionEngine executionEngine = new MapreduceExecutionEngine();
+    final String customCodecName = "custom.compression";
+    SubmissionRequest request = executionEngine.createSubmissionRequest();
+    ImportJobConfiguration jobConf = new ImportJobConfiguration();
+    jobConf.output.outputFormat = OutputFormat.TEXT_FILE;
+    jobConf.output.compression = OutputCompression.CUSTOM;
+    jobConf.output.customCompression = customCodecName;
+    request.setConfigFrameworkJob(jobConf);
+    request.setConnectorCallbacks(new Importer(Initializer.class,
+      Partitioner.class, Extractor.class, Destroyer.class) {
+    });
+    executionEngine.prepareImportSubmission(request);
+
+    MutableMapContext context = request.getFrameworkContext();
+    final String obtainedCodecName = context.getString(
+      JobConstants.HADOOP_COMPRESS_CODEC);
+    final boolean obtainedCodecFlag =
+      context.getBoolean(JobConstants.HADOOP_COMPRESS, false);
+    assertEquals("Unexpected codec name was returned", obtainedCodecName,
+      customCodecName);
+    assertEquals("Unexpected codec flag was returned", obtainedCodecFlag, true);
+  }
+
 }

Reply via email to