This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 54bc6e6 [GOBBLIN-702] Compaction fix for reuse of OrcStruct
54bc6e6 is described below
commit 54bc6e69e66aa408914db6cd12da343c6acdc617
Author: autumnust <[email protected]>
AuthorDate: Fri Mar 15 22:05:55 2019 -0700
[GOBBLIN-702] Compaction fix for reuse of OrcStruct
Closes #2572 from autumnust/compactionRefactor
---
.../compaction/mapreduce/orc/OrcKeyComparator.java | 13 +++++-----
.../mapreduce/OrcCompactionTaskTest.java | 29 +++++++++++++++++++---
gobblin-modules/gobblin-orc-dep/build.gradle | 1 +
3 files changed, 33 insertions(+), 10 deletions(-)
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyComparator.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyComparator.java
index efcd1f5..2699e7c 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyComparator.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyComparator.java
@@ -47,7 +47,8 @@ public class OrcKeyComparator extends Configured implements RawComparator<OrcKey
// output from the map phase, so use the schema defined for the map output key
// and the data model non-raw compare() implementation.
schema = TypeDescription.fromString(conf.get(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute()));
- OrcStruct orcRecordModel = (OrcStruct) OrcStruct.createValue(schema);
+ OrcStruct orcRecordModel1 = (OrcStruct) OrcStruct.createValue(schema);
+ OrcStruct orcRecordModel2 = (OrcStruct) OrcStruct.createValue(schema);
if (key1 == null) {
key1 = new OrcKey();
@@ -59,25 +60,25 @@ public class OrcKeyComparator extends Configured implements RawComparator<OrcKey
buffer = new DataInputBuffer();
}
- key1.key = orcRecordModel;
- key2.key = orcRecordModel;
+ key1.key = orcRecordModel1;
+ key2.key = orcRecordModel2;
}
}
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
try {
- buffer.reset(b1, s1, l1); // parse key1
+ buffer.reset(b1, s1, l1); // parse key1
key1.readFields(buffer);
- buffer.reset(b2, s2, l2); // parse key2
+ buffer.reset(b2, s2, l2); // parse key2
key2.readFields(buffer);
} catch (IOException e) {
throw new RuntimeException(e);
}
- return compare(key1, key2); // compare them
+ return compare(key1, key2); // compare them
}
@Override
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
index 5e86dd3..2d76549 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
@@ -74,10 +74,18 @@ public class OrcCompactionTaskTest {
orcStruct_1.setFieldValue("i", new IntWritable(1));
orcStruct_1.setFieldValue("j", new IntWritable(2));
+ OrcStruct orcStruct_2 = (OrcStruct) OrcStruct.createValue(schema);
+ orcStruct_2.setFieldValue("i", new IntWritable(2));
+ orcStruct_2.setFieldValue("j", new IntWritable(3));
+
+ OrcStruct orcStruct_3 = (OrcStruct) OrcStruct.createValue(schema);
+ orcStruct_3.setFieldValue("i", new IntWritable(4));
+ orcStruct_3.setFieldValue("j", new IntWritable(5));
+
File file_0 = new File(jobDir, "file_0");
File file_1 = new File(jobDir, "file_1");
- writeOrcRecordsInFile(new Path(file_0.getAbsolutePath()), schema, ImmutableList.of(orcStruct_0));
- writeOrcRecordsInFile(new Path(file_1.getAbsolutePath()), schema, ImmutableList.of(orcStruct_1));
+ writeOrcRecordsInFile(new Path(file_0.getAbsolutePath()), schema, ImmutableList.of(orcStruct_0, orcStruct_2));
+ writeOrcRecordsInFile(new Path(file_1.getAbsolutePath()), schema, ImmutableList.of(orcStruct_1, orcStruct_3));
// Verify execution
@@ -105,9 +113,13 @@ public class OrcCompactionTaskTest {
Assert.assertTrue(statuses.size() == 1);
List<OrcStruct> result = readOrcFile(statuses.get(0).getPath());
- Assert.assertEquals(result.size(), 1);
+ Assert.assertEquals(result.size(), 3);
Assert.assertEquals(result.get(0).getFieldValue("i"), new IntWritable(1));
Assert.assertEquals(result.get(0).getFieldValue("j"), new IntWritable(2));
+ Assert.assertEquals(result.get(1).getFieldValue("i"), new IntWritable(2));
+ Assert.assertEquals(result.get(1).getFieldValue("j"), new IntWritable(3));
+ Assert.assertEquals(result.get(2).getFieldValue("i"), new IntWritable(4));
+ Assert.assertEquals(result.get(2).getFieldValue("j"), new IntWritable(5));
}
/**
@@ -122,12 +134,21 @@ public class OrcCompactionTaskTest {
List<OrcStruct> result = new ArrayList<>();
while (recordReader.nextKeyValue()) {
- result.add((OrcStruct) recordReader.getCurrentValue());
+ result.add(copyIntOrcStruct((OrcStruct) recordReader.getCurrentValue()));
}
return result;
}
+ private OrcStruct copyIntOrcStruct(OrcStruct record) {
+ OrcStruct result = new OrcStruct(record.getSchema());
+ for (int i = 0 ; i < record.getNumFields() ; i ++ ) {
+ IntWritable newCopy = new IntWritable(((IntWritable) record.getFieldValue(i)).get());
+ result.setFieldValue(i, newCopy);
+ }
+ return result;
+ }
+
public void writeOrcRecordsInFile(Path path, TypeDescription schema, List<OrcStruct> orcStructs) throws Exception {
Configuration configuration = new Configuration();
OrcFile.WriterOptions options = OrcFile.writerOptions(configuration).setSchema(schema);
diff --git a/gobblin-modules/gobblin-orc-dep/build.gradle b/gobblin-modules/gobblin-orc-dep/build.gradle
index 705eaaa..d611c0f 100644
--- a/gobblin-modules/gobblin-orc-dep/build.gradle
+++ b/gobblin-modules/gobblin-orc-dep/build.gradle
@@ -46,6 +46,7 @@ configurations {
exclude group: "commons-cli"
exclude group: "org.apache.avro"
exclude group: "org.apache.httpcomponents"
+ exclude group: "org.apache.hadoop"
}
}