This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 8998bc3 [GOBBLIN-885] Fix orc-Compaction bug in non-dedup mode and
add unit-test
8998bc3 is described below
commit 8998bc3111049509fede32bc146f774c1f5d658c
Author: autumnust <[email protected]>
AuthorDate: Mon Sep 23 10:16:28 2019 -0700
[GOBBLIN-885] Fix orc-Compaction bug in non-dedup mode and add unit-test
Closes #2738 from autumnust/dedupNullFix
---
.../compaction/mapreduce/orc/OrcValueMapper.java | 8 +-
.../mapreduce/OrcCompactionTaskTest.java | 94 ++++++++++++++++++----
2 files changed, 80 insertions(+), 22 deletions(-)
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
index a023977..c23ed9a 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
@@ -40,9 +40,8 @@ import org.apache.orc.mapreduce.OrcMapreduceRecordReader;
* {@link RecordReader} with {@link NullWritable} as the key and generic type
of value, the ORC Mapper will
* read in the record as the input value.
*/
-public class OrcValueMapper extends RecordKeyMapperBase<NullWritable,
OrcStruct, OrcKey, Object> {
+public class OrcValueMapper extends RecordKeyMapperBase<NullWritable,
OrcStruct, Object, OrcValue> {
- private OrcKey outKey;
private OrcValue outValue;
private TypeDescription mapperSchema;
@@ -50,7 +49,6 @@ public class OrcValueMapper extends
RecordKeyMapperBase<NullWritable, OrcStruct,
protected void setup(Context context)
throws IOException, InterruptedException {
super.setup(context);
- this.outKey = new OrcKey();
this.outValue = new OrcValue();
this.mapperSchema =
TypeDescription.fromString(context.getConfiguration().get(OrcConf.MAPRED_INPUT_SCHEMA.getAttribute()));
}
@@ -60,8 +58,8 @@ public class OrcValueMapper extends
RecordKeyMapperBase<NullWritable, OrcStruct,
throws IOException, InterruptedException {
OrcStruct upConvertedStruct = upConvertOrcStruct(orcStruct, context);
if (context.getNumReduceTasks() == 0) {
- this.outKey.key = upConvertedStruct;
- context.write(this.outKey, NullWritable.get());
+ this.outValue.value = upConvertedStruct;
+ context.write(NullWritable.get(), this.outValue);
} else {
this.outValue.value = upConvertedStruct;
context.write(getDedupKey(upConvertedStruct), this.outValue);
diff --git
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
index da90579..f855790 100644
---
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
+++
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
@@ -22,6 +22,7 @@ import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.List;
import org.apache.commons.io.FilenameUtils;
import org.apache.gobblin.configuration.State;
@@ -50,20 +51,11 @@ import org.testng.annotations.Test;
import static org.apache.gobblin.compaction.mapreduce.AvroCompactionTaskTest.*;
import static
org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.*;
+import static
org.apache.gobblin.compaction.mapreduce.MRCompactor.COMPACTION_SHOULD_DEDUPLICATE;
public class OrcCompactionTaskTest {
-
- @Test
- public void basicTest() throws Exception {
- File basePath = Files.createTempDir();
- basePath.deleteOnExit();
-
- String minutelyPath =
"Identity/MemberAccount/minutely/2017/04/03/10/20_30/run_2017-04-03-10-20";
- String hourlyPath = "Identity/MemberAccount/hourly/2017/04/03/10/";
- File jobDir = new File(basePath, minutelyPath);
- Assert.assertTrue(jobDir.mkdirs());
-
+ private void createTestingData(File jobDir) throws Exception {
// Write some ORC file for compaction here.
TypeDescription schema = TypeDescription.fromString("struct<i:int,j:int>");
OrcStruct orcStruct_0 = (OrcStruct) OrcStruct.createValue(schema);
@@ -82,24 +74,39 @@ public class OrcCompactionTaskTest {
orcStruct_3.setFieldValue("i", new IntWritable(4));
orcStruct_3.setFieldValue("j", new IntWritable(5));
- // Writing a file with evolved schema.
+ File file_0 = new File(jobDir, "file_0");
+ File file_1 = new File(jobDir, "file_1");
+
+ writeOrcRecordsInFile(new Path(file_0.getAbsolutePath()), schema,
ImmutableList.of(orcStruct_0, orcStruct_2));
+ writeOrcRecordsInFile(new Path(file_1.getAbsolutePath()), schema,
ImmutableList.of(orcStruct_1, orcStruct_3));
+ }
+
+ @Test
+ public void basicTest() throws Exception {
+ File basePath = Files.createTempDir();
+ basePath.deleteOnExit();
+
+ String minutelyPath =
"Identity/MemberAccount/minutely/2017/04/03/10/20_30/run_2017-04-03-10-20";
+ String hourlyPath = "Identity/MemberAccount/hourly/2017/04/03/10/";
+ File jobDir = new File(basePath, minutelyPath);
+ Assert.assertTrue(jobDir.mkdirs());
+
+ // Writing some basic ORC files
+ createTestingData(jobDir);
+
+ // Writing an additional file with evolved schema.
TypeDescription evolvedSchema =
TypeDescription.fromString("struct<i:int,j:int,k:int>");
OrcStruct orcStruct_4 = (OrcStruct) OrcStruct.createValue(evolvedSchema);
orcStruct_4.setFieldValue("i", new IntWritable(5));
orcStruct_4.setFieldValue("j", new IntWritable(6));
orcStruct_4.setFieldValue("k", new IntWritable(7));
- File file_0 = new File(jobDir, "file_0");
- File file_1 = new File(jobDir, "file_1");
File file_2 = new File(jobDir, "file_2");
- writeOrcRecordsInFile(new Path(file_0.getAbsolutePath()), schema,
ImmutableList.of(orcStruct_0, orcStruct_2));
- writeOrcRecordsInFile(new Path(file_1.getAbsolutePath()), schema,
ImmutableList.of(orcStruct_1, orcStruct_3));
writeOrcRecordsInFile(new Path(file_2.getAbsolutePath()), evolvedSchema,
ImmutableList.of(orcStruct_4));
// Make this the newest.
file_2.setLastModified(Long.MAX_VALUE);
// Verify execution
-
// Overwrite the job configurator factory key.
String extensionFileName = "orcavro";
EmbeddedGobblin embeddedGobblin = createEmbeddedGobblin("basic",
basePath.getAbsolutePath().toString())
@@ -139,6 +146,59 @@ public class OrcCompactionTaskTest {
Assert.assertEquals(result.get(3).getFieldValue("k"), new IntWritable(7));
}
+ @Test
+ public void testNonDedup() throws Exception {
+ File basePath = Files.createTempDir();
+ basePath.deleteOnExit();
+
+ String minutelyPath =
"Identity/MemberAccount_2/minutely/2017/04/03/10/20_30/run_2017-04-03-10-20";
+ String hourlyPath = "Identity/MemberAccount_2/hourly/2017/04/03/10/";
+ File jobDir = new File(basePath, minutelyPath);
+ Assert.assertTrue(jobDir.mkdirs());
+
+ createTestingData(jobDir);
+
+ EmbeddedGobblin embeddedGobblin_nondedup = createEmbeddedGobblin("basic",
basePath.getAbsolutePath().toString())
+
.setConfiguration(CompactionJobConfigurator.COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY,
+ TestCompactionOrcJobConfigurator.Factory.class.getName())
+ .setConfiguration(COMPACTION_OUTPUT_EXTENSION, "orc")
+ .setConfiguration(COMPACTION_SHOULD_DEDUPLICATE, "false");
+ JobExecutionResult execution = embeddedGobblin_nondedup.run();
+ Assert.assertTrue(execution.isSuccessful());
+
+ // Non-dedup result verification
+ File outputDir = new File(basePath, hourlyPath);
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+ List<FileStatus> statuses = new ArrayList<>();
+ for (FileStatus status : fs.listStatus(new
Path(outputDir.getAbsolutePath()), new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ return FilenameUtils.isExtension(path.getName(), "orc");
+ }
+ })) {
+ statuses.add(status);
+ }
+
+ Assert.assertTrue(statuses.size() == 1);
+ List<OrcStruct> result = readOrcFile(statuses.get(0).getPath());
+ Assert.assertEquals(result.size(), 4);
+
+ result.sort(new Comparator<OrcStruct>() {
+ @Override
+ public int compare(OrcStruct o1, OrcStruct o2) {
+ return o1.compareTo(o2);
+ }
+ });
+ Assert.assertEquals(result.get(0).getFieldValue("i"), new IntWritable(1));
+ Assert.assertEquals(result.get(0).getFieldValue("j"), new IntWritable(2));
+ Assert.assertEquals(result.get(1).getFieldValue("i"), new IntWritable(1));
+ Assert.assertEquals(result.get(1).getFieldValue("j"), new IntWritable(2));
+ Assert.assertEquals(result.get(2).getFieldValue("i"), new IntWritable(2));
+ Assert.assertEquals(result.get(2).getFieldValue("j"), new IntWritable(3));
+ Assert.assertEquals(result.get(3).getFieldValue("i"), new IntWritable(4));
+ Assert.assertEquals(result.get(3).getFieldValue("j"), new IntWritable(5));
+ }
+
/**
* Read an output ORC compacted file into memory.
*/