This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8998bc3  [GOBBLIN-885] Fix orc-Compaction bug in non-dedup mode and add unit-test
8998bc3 is described below

commit 8998bc3111049509fede32bc146f774c1f5d658c
Author: autumnust <[email protected]>
AuthorDate: Mon Sep 23 10:16:28 2019 -0700

    [GOBBLIN-885] Fix orc-Compaction bug in non-dedup mode and add unit-test
    
    Closes #2738 from autumnust/dedupNullFix
---
 .../compaction/mapreduce/orc/OrcValueMapper.java   |  8 +-
 .../mapreduce/OrcCompactionTaskTest.java           | 94 ++++++++++++++++++----
 2 files changed, 80 insertions(+), 22 deletions(-)

diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
index a023977..c23ed9a 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
@@ -40,9 +40,8 @@ import org.apache.orc.mapreduce.OrcMapreduceRecordReader;
  * {@link RecordReader} with {@link NullWritable} as the key and generic type of value, the ORC Mapper will
  * read in the record as the input value.
  */
-public class OrcValueMapper extends RecordKeyMapperBase<NullWritable, OrcStruct, OrcKey, Object> {
+public class OrcValueMapper extends RecordKeyMapperBase<NullWritable, OrcStruct, Object, OrcValue> {
 
-  private OrcKey outKey;
   private OrcValue outValue;
   private TypeDescription mapperSchema;
 
@@ -50,7 +49,6 @@ public class OrcValueMapper extends RecordKeyMapperBase<NullWritable, OrcStruct,
   protected void setup(Context context)
       throws IOException, InterruptedException {
     super.setup(context);
-    this.outKey = new OrcKey();
     this.outValue = new OrcValue();
     this.mapperSchema = TypeDescription.fromString(context.getConfiguration().get(OrcConf.MAPRED_INPUT_SCHEMA.getAttribute()));
   }
@@ -60,8 +58,8 @@ public class OrcValueMapper extends RecordKeyMapperBase<NullWritable, OrcStruct,
       throws IOException, InterruptedException {
     OrcStruct upConvertedStruct = upConvertOrcStruct(orcStruct, context);
     if (context.getNumReduceTasks() == 0) {
-      this.outKey.key = upConvertedStruct;
-      context.write(this.outKey, NullWritable.get());
+      this.outValue.value = upConvertedStruct;
+      context.write(NullWritable.get(), this.outValue);
     } else {
       this.outValue.value = upConvertedStruct;
       context.write(getDedupKey(upConvertedStruct), this.outValue);
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
index da90579..f855790 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
@@ -22,6 +22,7 @@ import com.google.common.io.Files;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.gobblin.configuration.State;
@@ -50,20 +51,11 @@ import org.testng.annotations.Test;
 
 import static org.apache.gobblin.compaction.mapreduce.AvroCompactionTaskTest.*;
 import static 
org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.*;
+import static 
org.apache.gobblin.compaction.mapreduce.MRCompactor.COMPACTION_SHOULD_DEDUPLICATE;
 
 
 public class OrcCompactionTaskTest {
-
-  @Test
-  public void basicTest() throws Exception {
-    File basePath = Files.createTempDir();
-    basePath.deleteOnExit();
-
-    String minutelyPath = 
"Identity/MemberAccount/minutely/2017/04/03/10/20_30/run_2017-04-03-10-20";
-    String hourlyPath = "Identity/MemberAccount/hourly/2017/04/03/10/";
-    File jobDir = new File(basePath, minutelyPath);
-    Assert.assertTrue(jobDir.mkdirs());
-
+  private void createTestingData(File jobDir) throws Exception {
     // Write some ORC file for compaction here.
     TypeDescription schema = TypeDescription.fromString("struct<i:int,j:int>");
     OrcStruct orcStruct_0 = (OrcStruct) OrcStruct.createValue(schema);
@@ -82,24 +74,39 @@ public class OrcCompactionTaskTest {
     orcStruct_3.setFieldValue("i", new IntWritable(4));
     orcStruct_3.setFieldValue("j", new IntWritable(5));
 
-    // Writing a file with evolved schema.
+    File file_0 = new File(jobDir, "file_0");
+    File file_1 = new File(jobDir, "file_1");
+
+    writeOrcRecordsInFile(new Path(file_0.getAbsolutePath()), schema, 
ImmutableList.of(orcStruct_0, orcStruct_2));
+    writeOrcRecordsInFile(new Path(file_1.getAbsolutePath()), schema, 
ImmutableList.of(orcStruct_1, orcStruct_3));
+  }
+
+  @Test
+  public void basicTest() throws Exception {
+    File basePath = Files.createTempDir();
+    basePath.deleteOnExit();
+
+    String minutelyPath = 
"Identity/MemberAccount/minutely/2017/04/03/10/20_30/run_2017-04-03-10-20";
+    String hourlyPath = "Identity/MemberAccount/hourly/2017/04/03/10/";
+    File jobDir = new File(basePath, minutelyPath);
+    Assert.assertTrue(jobDir.mkdirs());
+
+    // Writing some basic ORC files
+    createTestingData(jobDir);
+
+    // Writing an additional file with evolved schema.
     TypeDescription evolvedSchema = 
TypeDescription.fromString("struct<i:int,j:int,k:int>");
     OrcStruct orcStruct_4 = (OrcStruct) OrcStruct.createValue(evolvedSchema);
     orcStruct_4.setFieldValue("i", new IntWritable(5));
     orcStruct_4.setFieldValue("j", new IntWritable(6));
     orcStruct_4.setFieldValue("k", new IntWritable(7));
 
-    File file_0 = new File(jobDir, "file_0");
-    File file_1 = new File(jobDir, "file_1");
     File file_2 = new File(jobDir, "file_2");
-    writeOrcRecordsInFile(new Path(file_0.getAbsolutePath()), schema, 
ImmutableList.of(orcStruct_0, orcStruct_2));
-    writeOrcRecordsInFile(new Path(file_1.getAbsolutePath()), schema, 
ImmutableList.of(orcStruct_1, orcStruct_3));
     writeOrcRecordsInFile(new Path(file_2.getAbsolutePath()), evolvedSchema, 
ImmutableList.of(orcStruct_4));
     // Make this is the newest.
     file_2.setLastModified(Long.MAX_VALUE);
 
     // Verify execution
-
     // Overwrite the job configurator factory key.
     String extensionFileName = "orcavro";
     EmbeddedGobblin embeddedGobblin = createEmbeddedGobblin("basic", 
basePath.getAbsolutePath().toString())
@@ -139,6 +146,59 @@ public class OrcCompactionTaskTest {
     Assert.assertEquals(result.get(3).getFieldValue("k"), new IntWritable(7));
   }
 
+  @Test
+  public void testNonDedup() throws Exception {
+    File basePath = Files.createTempDir();
+    basePath.deleteOnExit();
+
+    String minutelyPath = 
"Identity/MemberAccount_2/minutely/2017/04/03/10/20_30/run_2017-04-03-10-20";
+    String hourlyPath = "Identity/MemberAccount_2/hourly/2017/04/03/10/";
+    File jobDir = new File(basePath, minutelyPath);
+    Assert.assertTrue(jobDir.mkdirs());
+
+    createTestingData(jobDir);
+
+    EmbeddedGobblin embeddedGobblin_nondedup = createEmbeddedGobblin("basic", 
basePath.getAbsolutePath().toString())
+        
.setConfiguration(CompactionJobConfigurator.COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY,
+            TestCompactionOrcJobConfigurator.Factory.class.getName())
+        .setConfiguration(COMPACTION_OUTPUT_EXTENSION, "orc")
+        .setConfiguration(COMPACTION_SHOULD_DEDUPLICATE, "false");
+    JobExecutionResult execution = embeddedGobblin_nondedup.run();
+    Assert.assertTrue(execution.isSuccessful());
+
+    // Non-dedup result verification
+    File outputDir = new File(basePath, hourlyPath);
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    List<FileStatus> statuses = new ArrayList<>();
+    for (FileStatus status : fs.listStatus(new 
Path(outputDir.getAbsolutePath()), new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return FilenameUtils.isExtension(path.getName(), "orc");
+      }
+    })) {
+      statuses.add(status);
+    }
+
+    Assert.assertTrue(statuses.size() == 1);
+    List<OrcStruct> result = readOrcFile(statuses.get(0).getPath());
+    Assert.assertEquals(result.size(), 4);
+
+    result.sort(new Comparator<OrcStruct>() {
+      @Override
+      public int compare(OrcStruct o1, OrcStruct o2) {
+        return o1.compareTo(o2);
+      }
+    });
+    Assert.assertEquals(result.get(0).getFieldValue("i"), new IntWritable(1));
+    Assert.assertEquals(result.get(0).getFieldValue("j"), new IntWritable(2));
+    Assert.assertEquals(result.get(1).getFieldValue("i"), new IntWritable(1));
+    Assert.assertEquals(result.get(1).getFieldValue("j"), new IntWritable(2));
+    Assert.assertEquals(result.get(2).getFieldValue("i"), new IntWritable(2));
+    Assert.assertEquals(result.get(2).getFieldValue("j"), new IntWritable(3));
+    Assert.assertEquals(result.get(3).getFieldValue("i"), new IntWritable(4));
+    Assert.assertEquals(result.get(3).getFieldValue("j"), new IntWritable(5));
+  }
+
   /**
    * Read a output ORC compacted file into memory.
    */

Reply via email to