This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 54bc6e6  [GOBBLIN-702] Compaction fix for reuse of OrcStruct
54bc6e6 is described below

commit 54bc6e69e66aa408914db6cd12da343c6acdc617
Author: autumnust <[email protected]>
AuthorDate: Fri Mar 15 22:05:55 2019 -0700

    [GOBBLIN-702] Compaction fix for reuse of OrcStruct
    
    Closes #2572 from autumnust/compactionRefactor
---
 .../compaction/mapreduce/orc/OrcKeyComparator.java | 13 +++++-----
 .../mapreduce/OrcCompactionTaskTest.java           | 29 +++++++++++++++++++---
 gobblin-modules/gobblin-orc-dep/build.gradle       |  1 +
 3 files changed, 33 insertions(+), 10 deletions(-)

diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyComparator.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyComparator.java
index efcd1f5..2699e7c 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyComparator.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyComparator.java
@@ -47,7 +47,8 @@ public class OrcKeyComparator extends Configured implements RawComparator<OrcKey
       // output from the map phase, so use the schema defined for the map 
output key
       // and the data model non-raw compare() implementation.
       schema = 
TypeDescription.fromString(conf.get(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute()));
-      OrcStruct orcRecordModel = (OrcStruct) OrcStruct.createValue(schema);
+      OrcStruct orcRecordModel1 = (OrcStruct) OrcStruct.createValue(schema);
+      OrcStruct orcRecordModel2 = (OrcStruct) OrcStruct.createValue(schema);
 
       if (key1 == null) {
         key1 = new OrcKey();
@@ -59,25 +60,25 @@ public class OrcKeyComparator extends Configured implements RawComparator<OrcKey
         buffer = new DataInputBuffer();
       }
 
-      key1.key = orcRecordModel;
-      key2.key = orcRecordModel;
+      key1.key = orcRecordModel1;
+      key2.key = orcRecordModel2;
     }
   }
 
   @Override
   public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
     try {
-      buffer.reset(b1, s1, l1);                   // parse key1
+      buffer.reset(b1, s1, l1);      // parse key1
       key1.readFields(buffer);
 
-      buffer.reset(b2, s2, l2);                   // parse key2
+      buffer.reset(b2, s2, l2);      // parse key2
       key2.readFields(buffer);
 
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
 
-    return compare(key1, key2);                   // compare them
+    return compare(key1, key2);     // compare them
   }
 
   @Override
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
index 5e86dd3..2d76549 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
@@ -74,10 +74,18 @@ public class OrcCompactionTaskTest {
     orcStruct_1.setFieldValue("i", new IntWritable(1));
     orcStruct_1.setFieldValue("j", new IntWritable(2));
 
+    OrcStruct orcStruct_2 = (OrcStruct) OrcStruct.createValue(schema);
+    orcStruct_2.setFieldValue("i", new IntWritable(2));
+    orcStruct_2.setFieldValue("j", new IntWritable(3));
+
+    OrcStruct orcStruct_3 = (OrcStruct) OrcStruct.createValue(schema);
+    orcStruct_3.setFieldValue("i", new IntWritable(4));
+    orcStruct_3.setFieldValue("j", new IntWritable(5));
+
     File file_0 = new File(jobDir, "file_0");
     File file_1 = new File(jobDir, "file_1");
-    writeOrcRecordsInFile(new Path(file_0.getAbsolutePath()), schema, 
ImmutableList.of(orcStruct_0));
-    writeOrcRecordsInFile(new Path(file_1.getAbsolutePath()), schema, 
ImmutableList.of(orcStruct_1));
+    writeOrcRecordsInFile(new Path(file_0.getAbsolutePath()), schema, 
ImmutableList.of(orcStruct_0, orcStruct_2));
+    writeOrcRecordsInFile(new Path(file_1.getAbsolutePath()), schema, 
ImmutableList.of(orcStruct_1, orcStruct_3));
 
     // Verify execution
 
@@ -105,9 +113,13 @@ public class OrcCompactionTaskTest {
 
     Assert.assertTrue(statuses.size() == 1);
     List<OrcStruct> result = readOrcFile(statuses.get(0).getPath());
-    Assert.assertEquals(result.size(), 1);
+    Assert.assertEquals(result.size(), 3);
     Assert.assertEquals(result.get(0).getFieldValue("i"), new IntWritable(1));
     Assert.assertEquals(result.get(0).getFieldValue("j"), new IntWritable(2));
+    Assert.assertEquals(result.get(1).getFieldValue("i"), new IntWritable(2));
+    Assert.assertEquals(result.get(1).getFieldValue("j"), new IntWritable(3));
+    Assert.assertEquals(result.get(2).getFieldValue("i"), new IntWritable(4));
+    Assert.assertEquals(result.get(2).getFieldValue("j"), new IntWritable(5));
   }
 
   /**
@@ -122,12 +134,21 @@ public class OrcCompactionTaskTest {
     List<OrcStruct> result = new ArrayList<>();
 
     while (recordReader.nextKeyValue()) {
-      result.add((OrcStruct) recordReader.getCurrentValue());
+      result.add(copyIntOrcStruct((OrcStruct) recordReader.getCurrentValue()));
     }
 
     return result;
   }
 
+  private OrcStruct copyIntOrcStruct(OrcStruct record) {
+    OrcStruct result = new OrcStruct(record.getSchema());
+    for (int i = 0 ; i < record.getNumFields() ; i ++ ) {
+      IntWritable newCopy = new IntWritable(((IntWritable) 
record.getFieldValue(i)).get());
+      result.setFieldValue(i, newCopy);
+    }
+    return result;
+  }
+
   public void writeOrcRecordsInFile(Path path, TypeDescription schema, 
List<OrcStruct> orcStructs) throws Exception {
     Configuration configuration = new Configuration();
     OrcFile.WriterOptions options = 
OrcFile.writerOptions(configuration).setSchema(schema);
diff --git a/gobblin-modules/gobblin-orc-dep/build.gradle b/gobblin-modules/gobblin-orc-dep/build.gradle
index 705eaaa..d611c0f 100644
--- a/gobblin-modules/gobblin-orc-dep/build.gradle
+++ b/gobblin-modules/gobblin-orc-dep/build.gradle
@@ -46,6 +46,7 @@ configurations {
     exclude group: "commons-cli"
     exclude group: "org.apache.avro"
     exclude group: "org.apache.httpcomponents"
+    exclude group: "org.apache.hadoop"
   }
 }
 

Reply via email to