This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new f589c73 [GOBBLIN-699] Orc compaction impl.
f589c73 is described below
commit f589c732e298735039e4f2be8058bfdbbe04526b
Author: Lei Sun <[email protected]>
AuthorDate: Thu Mar 14 16:07:35 2019 -0700
[GOBBLIN-699] Orc compaction impl.
Closes #2570 from autumnust/orcCompactionImpl
---
.../gobblin/binary_creation/AvroTestTools.java | 1 -
.../gobblin/binary_creation/OrcTestTools.java | 2 +-
gobblin-compaction/build.gradle | 8 +-
.../CompactionCompleteFileOperationAction.java | 4 +-
.../compaction/event/CompactionSlaEventHelper.java | 25 +-
.../HiveMetadataForCompactionExtractor.java | 3 +-
.../HiveMetadataForCompactionExtractorFactory.java | 3 +-
.../mapreduce/CompactionAvroJobConfigurator.java | 6 +-
....java => CompactionCombineFileInputFormat.java} | 31 +-
.../mapreduce/CompactionJobConfigurator.java | 15 +-
.../mapreduce/CompactionOrcJobConfigurator.java | 80 +++++
...ommitter.java => CompactorOutputCommitter.java} | 33 +-
.../mapreduce/RecordKeyDedupReducerBase.java | 108 +++++++
.../compaction/mapreduce/RecordKeyMapperBase.java | 23 +-
.../avro/AvroKeyCompactorOutputFormat.java | 3 +-
.../mapreduce/avro/AvroKeyDedupReducer.java | 84 ++----
.../compaction/mapreduce/avro/AvroKeyMapper.java | 11 +-
.../AvroKeyRecursiveCombineFileInputFormat.java | 69 +----
.../mapreduce/orc/OrcKeyCompactorOutputFormat.java | 70 +++++
.../compaction/mapreduce/orc/OrcKeyComparator.java | 90 ++++++
.../mapreduce/orc/OrcKeyDedupReducer.java | 50 +++
.../gobblin/compaction/mapreduce/orc/OrcUtils.java | 72 +++++
.../orc/OrcValueCombineFileInputFormat.java | 37 +++
.../orc/OrcValueCombineFileRecordReader.java | 64 ++++
.../compaction/mapreduce/orc/OrcValueMapper.java | 77 +++++
...onTaskTest.java => AvroCompactionTaskTest.java} | 8 +-
...upReducerTest.java => KeyDedupReducerTest.java} | 37 +--
.../mapreduce/OrcCompactionTaskTest.java | 160 ++++++++++
.../mapreduce/orc/OrcKeyComparatorTest.java | 334 +++++++++++++++++++++
gobblin-modules/gobblin-orc-dep/build.gradle | 57 ++++
.../recordcount/CompactionRecordCountProvider.java | 6 +-
.../CompactionRecordCountProviderTest.java | 2 +-
32 files changed, 1324 insertions(+), 249 deletions(-)
diff --git
a/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/AvroTestTools.java
b/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/AvroTestTools.java
index 9a7bb1d..7943cc6 100644
---
a/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/AvroTestTools.java
+++
b/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/AvroTestTools.java
@@ -29,7 +29,6 @@ import java.util.Objects;
import java.util.Set;
import java.util.Spliterators;
import java.util.TreeMap;
-import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
diff --git
a/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/OrcTestTools.java
b/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/OrcTestTools.java
index 465fb89..e0d02cd 100644
---
a/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/OrcTestTools.java
+++
b/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/OrcTestTools.java
@@ -387,7 +387,7 @@ public class OrcTestTools extends
DataTestTools<OrcTestTools.OrcRowIterator, Typ
}
/**
- * An iterator over {@link GenericRecord} which is also aware of schema.
+ * An iterator over {@link OrcStruct} which is also aware of the schema
(represented in {@link TypeInfo}).
*/
@AllArgsConstructor
public static class OrcRowIterator implements Iterator<Writable> {
diff --git a/gobblin-compaction/build.gradle b/gobblin-compaction/build.gradle
index d6446d1..88bc161 100644
--- a/gobblin-compaction/build.gradle
+++ b/gobblin-compaction/build.gradle
@@ -27,13 +27,16 @@ dependencies {
compile project(":gobblin-runtime")
compile project(":gobblin-modules:gobblin-kafka-common")
+ // Given orc-mapreduce depends on hive version of hive-storage-api(2.4.0)
and conflicts
+ // with hive-exec-core in older version(1.0.1), we need to shadow
orc-mapreduce's transitive deps.
+ compile project(path: ":gobblin-modules:gobblin-orc-dep",
configuration:"shadow")
+
compile externalDependency.calciteCore
compile externalDependency.calciteAvatica
compile externalDependency.jhyde
compile externalDependency.avro
compile externalDependency.commonsLang
compile externalDependency.commonsMath
- compile externalDependency.hiveExec
compile externalDependency.mockito
compile externalDependency.testng
@@ -58,7 +61,8 @@ dependencies {
configurations {
- compile { transitive = true }
+ compile { transitive = true }
+ all*.exclude group: "org.apache.hadoop", module: 'hive-exec'
}
ext.classification="library"
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
index 8dc6324..eef8474 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
@@ -28,7 +28,7 @@ import
org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
import org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator;
import org.apache.gobblin.compaction.mapreduce.MRCompactor;
import org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner;
-import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyMapper;
+import org.apache.gobblin.compaction.mapreduce.RecordKeyMapperBase;
import org.apache.gobblin.compaction.parser.CompactionPathParser;
import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
import org.apache.gobblin.configuration.State;
@@ -130,7 +130,7 @@ public class CompactionCompleteFileOperationAction
implements CompactionComplete
// We don't get record count from file name because tracking which
files are actually involved in the MR execution can
// be hard. This is due to new minutely data is rolled up to hourly
folder but from daily compaction perspective we are not
// able to tell which file are newly added (because we simply pass all
hourly folders to MR job instead of individual files).
- Counter counter =
job.getCounters().findCounter(AvroKeyMapper.EVENT_COUNTER.RECORD_COUNT);
+ Counter counter =
job.getCounters().findCounter(RecordKeyMapperBase.EVENT_COUNTER.RECORD_COUNT);
newTotalRecords = counter.getValue();
}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java
index 042c5a4..763f375 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java
@@ -17,8 +17,16 @@
package org.apache.gobblin.compaction.event;
+import com.google.common.base.Optional;
import java.io.IOException;
-
+import org.apache.gobblin.compaction.dataset.Dataset;
+import org.apache.gobblin.compaction.mapreduce.MRCompactor;
+import org.apache.gobblin.compaction.mapreduce.RecordKeyDedupReducerBase;
+import org.apache.gobblin.compaction.mapreduce.RecordKeyMapperBase;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.event.sla.SlaEventKeys;
+import org.apache.gobblin.metrics.event.sla.SlaEventSubmitter;
+import
org.apache.gobblin.metrics.event.sla.SlaEventSubmitter.SlaEventSubmitterBuilder;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Counter;
@@ -27,17 +35,6 @@ import org.apache.hadoop.mapreduce.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Optional;
-
-import org.apache.gobblin.compaction.dataset.Dataset;
-import org.apache.gobblin.compaction.mapreduce.MRCompactor;
-import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyDedupReducer;
-import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyMapper;
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.metrics.event.sla.SlaEventKeys;
-import org.apache.gobblin.metrics.event.sla.SlaEventSubmitter;
-import
org.apache.gobblin.metrics.event.sla.SlaEventSubmitter.SlaEventSubmitterBuilder;
-
/**
* Helper class to build compaction sla event metadata.
@@ -142,13 +139,13 @@ public class CompactionSlaEventHelper {
return -1l;
}
- Counter recordCounter =
counters.findCounter(AvroKeyDedupReducer.EVENT_COUNTER.RECORD_COUNT);
+ Counter recordCounter =
counters.findCounter(RecordKeyDedupReducerBase.EVENT_COUNTER.RECORD_COUNT);
if (recordCounter != null && recordCounter.getValue() != 0) {
return recordCounter.getValue();
}
- recordCounter =
counters.findCounter(AvroKeyMapper.EVENT_COUNTER.RECORD_COUNT);
+ recordCounter =
counters.findCounter(RecordKeyMapperBase.EVENT_COUNTER.RECORD_COUNT);
if (recordCounter != null && recordCounter.getValue() != 0) {
return recordCounter.getValue();
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractor.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractor.java
index 020da2d..a125479 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractor.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractor.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.thrift.TException;
import com.google.common.base.Splitter;
@@ -52,7 +51,7 @@ public class HiveMetadataForCompactionExtractor extends
HiveBaseExtractor<Void,
private MRCompactionEntity compactionEntity;
private boolean extracted = false;
- public HiveMetadataForCompactionExtractor(WorkUnitState state, FileSystem
fs) throws IOException, TException, HiveException {
+ public HiveMetadataForCompactionExtractor(WorkUnitState state, FileSystem
fs) throws IOException, TException {
super(state);
if
(Boolean.valueOf(state.getPropAsBoolean(PartitionLevelWatermarker.IS_WATERMARK_WORKUNIT_KEY)))
{
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractorFactory.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractorFactory.java
index ab89df5..9f71b8b 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractorFactory.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractorFactory.java
@@ -20,7 +20,6 @@ package org.apache.gobblin.compaction.hivebasedconstructs;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.thrift.TException;
import org.apache.gobblin.configuration.WorkUnitState;
@@ -33,7 +32,7 @@ import
org.apache.gobblin.data.management.conversion.hive.extractor.HiveBaseExtr
*/
public class HiveMetadataForCompactionExtractorFactory implements
HiveBaseExtractorFactory {
public HiveBaseExtractor createExtractor(WorkUnitState state, FileSystem
sourceFs)
- throws IOException, TException, HiveException {
+ throws IOException, TException {
return new HiveMetadataForCompactionExtractor(state, sourceFs);
}
}
\ No newline at end of file
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
index 14d23d5..65fdce5 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
@@ -30,9 +30,7 @@ import
org.apache.gobblin.compaction.mapreduce.avro.AvroKeyDedupReducer;
import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyMapper;
import
org.apache.gobblin.compaction.mapreduce.avro.AvroKeyRecursiveCombineFileInputFormat;
import
org.apache.gobblin.compaction.mapreduce.avro.MRCompactorAvroKeyDedupJobRunner;
-import org.apache.gobblin.compaction.suite.CompactionSuiteBase;
import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.dataset.Dataset;
import org.apache.gobblin.dataset.FileSystemDataset;
import org.apache.gobblin.hive.policy.HiveRegistrationPolicy;
import org.apache.gobblin.util.AvroUtils;
@@ -85,7 +83,7 @@ public class CompactionAvroJobConfigurator extends
CompactionJobConfigurator {
/**
* Refer to MRCompactorAvroKeyDedupJobRunner#getKeySchema(Job, Schema)
*/
- private Schema getKeySchema(Job job, Schema topicSchema) throws IOException {
+ private Schema getDedupKeySchema(Schema topicSchema) {
boolean keySchemaFileSpecified =
this.state.contains(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_AVRO_KEY_SCHEMA_LOC);
@@ -133,7 +131,7 @@ public class CompactionAvroJobConfigurator extends
CompactionJobConfigurator {
if
(this.state.getPropAsBoolean(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_AVRO_SINGLE_INPUT_SCHEMA,
true)) {
AvroJob.setInputKeySchema(job, newestSchema);
}
- AvroJob.setMapOutputKeySchema(job, this.shouldDeduplicate ?
getKeySchema(job, newestSchema) : newestSchema);
+ AvroJob.setMapOutputKeySchema(job, this.shouldDeduplicate ?
getDedupKeySchema(newestSchema) : newestSchema);
AvroJob.setMapOutputValueSchema(job, newestSchema);
AvroJob.setOutputKeySchema(job, newestSchema);
}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyRecursiveCombineFileInputFormat.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionCombineFileInputFormat.java
similarity index 76%
copy from
gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyRecursiveCombineFileInputFormat.java
copy to
gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionCombineFileInputFormat.java
index 5f864cb..6b9bfc4 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyRecursiveCombineFileInputFormat.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionCombineFileInputFormat.java
@@ -15,40 +15,23 @@
* limitations under the License.
*/
-package org.apache.gobblin.compaction.mapreduce.avro;
+package org.apache.gobblin.compaction.mapreduce;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.VersionInfo;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-
-/**
- * A subclass of {@link
org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat} for Avro
inputfiles.
- * This class is able to handle the case where the input path has subdirs
which contain data files, which
- * is not the case with {@link
org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat}.
- *
- * @author Ziyang Liu
- */
-public class AvroKeyRecursiveCombineFileInputFormat
- extends CombineFileInputFormat<AvroKey<GenericRecord>, NullWritable> {
+public abstract class CompactionCombineFileInputFormat<KI, KO> extends
CombineFileInputFormat<KI, KO> {
private static final String COMPACTION_JOB_PREFIX = "compaction.job.";
/**
@@ -103,10 +86,4 @@ public class AvroKeyRecursiveCombineFileInputFormat
}
return cleanedSplits;
}
-
- @Override
- public RecordReader<AvroKey<GenericRecord>, NullWritable>
createRecordReader(InputSplit split, TaskAttemptContext cx)
- throws IOException {
- return new CombineFileRecordReader<>((CombineFileSplit) split, cx,
AvroKeyCombineFileRecordReader.class);
- }
}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java
index d88ad95..a96a99b 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java
@@ -17,8 +17,6 @@
package org.apache.gobblin.compaction.mapreduce;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import java.io.IOException;
@@ -67,7 +65,6 @@ public abstract class CompactionJobConfigurator {
public static final String DEFAULT_COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS
=
"org.apache.gobblin.compaction.mapreduce.CompactionAvroJobConfigurator$Factory";
-
@Getter
@AllArgsConstructor
protected enum EXTENSION {
@@ -119,7 +116,6 @@ public abstract class CompactionJobConfigurator {
}
public abstract String getFileExtension();
-
/**
* Customized MR job creation for Avro.
*
@@ -186,6 +182,7 @@ public abstract class CompactionJobConfigurator {
/**
* Refer to {@link MRCompactorAvroKeyDedupJobRunner#setNumberOfReducers(Job)}
+ * Note that this method is not format specific.
*/
protected void setNumberOfReducers(Job job) throws IOException {
@@ -337,13 +334,15 @@ public abstract class CompactionJobConfigurator {
}
private static List<TaskCompletionEvent>
getUnsuccessfulTaskCompletionEvent(Job completedJob) {
- return
getAllTaskCompletionEvent(completedJob).stream().filter(te->te.getStatus() !=
TaskCompletionEvent.Status.SUCCEEDED).collect(
- Collectors.toList());
+ return getAllTaskCompletionEvent(completedJob).stream()
+ .filter(te -> te.getStatus() != TaskCompletionEvent.Status.SUCCEEDED)
+ .collect(Collectors.toList());
}
private static boolean isFailedPath(Path path, List<TaskCompletionEvent>
failedEvents) {
return path.toString().contains("_temporary") || failedEvents.stream()
- .anyMatch(event -> path.toString().contains(Path.SEPARATOR +
event.getTaskAttemptId().toString() + Path.SEPARATOR));
+ .anyMatch(
+ event -> path.toString().contains(Path.SEPARATOR +
event.getTaskAttemptId().toString() + Path.SEPARATOR));
}
/**
@@ -364,7 +363,7 @@ public abstract class CompactionJobConfigurator {
List<Path> allFilePaths = DatasetHelper.getApplicableFilePaths(fs,
tmpPath, acceptableExtension);
List<Path> goodPaths = new ArrayList<>();
- for (Path filePath: allFilePaths) {
+ for (Path filePath : allFilePaths) {
if (isFailedPath(filePath, failedEvents)) {
fs.delete(filePath, false);
log.error("{} is a bad path so it was deleted", filePath);
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionOrcJobConfigurator.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionOrcJobConfigurator.java
new file mode 100644
index 0000000..ba56047
--- /dev/null
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionOrcJobConfigurator.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.mapreduce;
+
+import java.io.IOException;
+import org.apache.gobblin.compaction.mapreduce.orc.OrcKeyCompactorOutputFormat;
+import org.apache.gobblin.compaction.mapreduce.orc.OrcKeyComparator;
+import org.apache.gobblin.compaction.mapreduce.orc.OrcKeyDedupReducer;
+import org.apache.gobblin.compaction.mapreduce.orc.OrcUtils;
+import org.apache.gobblin.compaction.mapreduce.orc.OrcValueMapper;
+import
org.apache.gobblin.compaction.mapreduce.orc.OrcValueCombineFileInputFormat;
+import org.apache.gobblin.configuration.State;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.orc.OrcConf;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.mapred.OrcKey;
+import org.apache.orc.mapred.OrcValue;
+
+import static
org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.*;
+
+
+public class CompactionOrcJobConfigurator extends CompactionJobConfigurator {
+ public static class Factory implements
CompactionJobConfigurator.ConfiguratorFactory {
+ @Override
+ public CompactionJobConfigurator createConfigurator(State state) throws
IOException {
+ return new CompactionOrcJobConfigurator(state);
+ }
+ }
+
+ public CompactionOrcJobConfigurator(State state) throws IOException {
+ super(state);
+ }
+
+ @Override
+ public String getFileExtension() {
+ return this.state.getProp(COMPACTION_OUTPUT_EXTENSION,
EXTENSION.ORC.getExtensionString());
+ }
+
+ protected void configureSchema(Job job) throws IOException {
+ TypeDescription schema = OrcUtils.getNewestSchemaFromSource(job, this.fs);
+
+ job.getConfiguration().set(OrcConf.MAPRED_INPUT_SCHEMA.getAttribute(),
schema.toString());
+
job.getConfiguration().set(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute(),
schema.toString());
+
job.getConfiguration().set(OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.getAttribute(),
schema.toString());
+ job.getConfiguration().set(OrcConf.MAPRED_OUTPUT_SCHEMA.getAttribute(),
schema.toString());
+ }
+
+ protected void configureMapper(Job job) {
+ job.setInputFormatClass(OrcValueCombineFileInputFormat.class);
+ job.setMapperClass(OrcValueMapper.class);
+ job.setMapOutputKeyClass(OrcKey.class);
+ job.setMapOutputValueClass(OrcValue.class);
+ job.setGroupingComparatorClass(OrcKeyComparator.class);
+ job.setSortComparatorClass(OrcKeyComparator.class);
+ }
+
+ protected void configureReducer(Job job) throws IOException {
+ job.setReducerClass(OrcKeyDedupReducer.class);
+ job.setOutputFormatClass(OrcKeyCompactorOutputFormat.class);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(OrcValue.class);
+ setNumberOfReducers(job);
+ }
+}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyCompactorOutputCommitter.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactorOutputCommitter.java
similarity index 73%
rename from
gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyCompactorOutputCommitter.java
rename to
gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactorOutputCommitter.java
index cca9d4f..5968095 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyCompactorOutputCommitter.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactorOutputCommitter.java
@@ -15,12 +15,13 @@
* limitations under the License.
*/
-package org.apache.gobblin.compaction.mapreduce.avro;
+package org.apache.gobblin.compaction.mapreduce;
import java.io.IOException;
import java.lang.reflect.Method;
-
import org.apache.commons.io.FilenameUtils;
+import
org.apache.gobblin.compaction.mapreduce.avro.MRCompactorAvroKeyDedupJobRunner;
+import org.apache.gobblin.util.recordcount.CompactionRecordCountProvider;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -31,22 +32,29 @@ import
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.gobblin.util.recordcount.CompactionRecordCountProvider;
-
/**
* Class used with {@link MRCompactorAvroKeyDedupJobRunner} to rename files as
they
* are being committed. In addition to moving files from their working
directory to
* the commit output directory, the files are named to include a timestamp and
a
* count of how many records the file contains, in the format
- * {recordCount}.{timestamp}.avro.
+ * {recordCount}.{timestamp}.<extensionName>(avro, orc, etc.).
*/
-public class AvroKeyCompactorOutputCommitter extends FileOutputCommitter {
+public class CompactorOutputCommitter extends FileOutputCommitter {
+ /**
+ * Note that the value of this key does not include the leading dot (e.g. "avro", not ".avro").
+ */
+ public static final String COMPACTION_OUTPUT_EXTENSION =
"compaction.output.extension";
+ public static final String DEFAULT_COMPACTION_OUTPUT_EXTENSION = "avro";
+
+ private static final Logger LOG =
LoggerFactory.getLogger(CompactorOutputCommitter.class);
- private static final Logger LOG =
LoggerFactory.getLogger(AvroKeyCompactorOutputCommitter.class);
+ private final String compactionFileExtension;
- public AvroKeyCompactorOutputCommitter(Path output, TaskAttemptContext
context) throws IOException {
+ public CompactorOutputCommitter(Path output, TaskAttemptContext context)
throws IOException {
super(output, context);
+ compactionFileExtension =
context.getConfiguration().get(COMPACTION_OUTPUT_EXTENSION,
+ DEFAULT_COMPACTION_OUTPUT_EXTENSION);
}
/**
@@ -62,23 +70,24 @@ public class AvroKeyCompactorOutputCommitter extends
FileOutputCommitter {
FileSystem fs = workPath.getFileSystem(context.getConfiguration());
if (fs.exists(workPath)) {
- long recordCount = getRecordCountFromCounter(context,
AvroKeyDedupReducer.EVENT_COUNTER.RECORD_COUNT);
+ long recordCount = getRecordCountFromCounter(context,
RecordKeyDedupReducerBase.EVENT_COUNTER.RECORD_COUNT);
String fileNamePrefix;
if (recordCount == 0) {
// recordCount == 0 indicates that it is a map-only, non-dedup job,
and thus record count should
// be obtained from mapper counter.
fileNamePrefix = CompactionRecordCountProvider.M_OUTPUT_FILE_PREFIX;
- recordCount = getRecordCountFromCounter(context,
AvroKeyMapper.EVENT_COUNTER.RECORD_COUNT);
+ recordCount = getRecordCountFromCounter(context,
RecordKeyMapperBase.EVENT_COUNTER.RECORD_COUNT);
} else {
fileNamePrefix = CompactionRecordCountProvider.MR_OUTPUT_FILE_PREFIX;
}
- String fileName =
CompactionRecordCountProvider.constructFileName(fileNamePrefix, recordCount);
+ String fileName =
CompactionRecordCountProvider.constructFileName(fileNamePrefix,
+ "." + compactionFileExtension, recordCount);
for (FileStatus status : fs.listStatus(workPath, new PathFilter() {
@Override
public boolean accept(Path path) {
- return FilenameUtils.isExtension(path.getName(), "avro");
+ return FilenameUtils.isExtension(path.getName(),
compactionFileExtension);
}
})) {
Path newPath = new Path(status.getPath().getParent(), fileName);
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/RecordKeyDedupReducerBase.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/RecordKeyDedupReducerBase.java
new file mode 100644
index 0000000..adcb603
--- /dev/null
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/RecordKeyDedupReducerBase.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.mapreduce;
+
+import com.google.common.base.Optional;
+import java.io.IOException;
+import java.util.Comparator;
+import lombok.Getter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+
+
+/**
+ * A base implementation of deduplication reducer that is format-unaware.
+ */
+public abstract class RecordKeyDedupReducerBase<KI, VI, KO, VO> extends
Reducer<KI, VI, KO, VO> {
+ public enum EVENT_COUNTER {
+ MORE_THAN_1,
+ DEDUPED,
+ RECORD_COUNT
+ }
+
+ /**
+ * In most cases, one of the following fields will be {@link NullWritable}.
+ */
+ @Getter
+ protected KO outKey;
+
+ @Getter
+ protected VO outValue;
+
+
+ protected Optional<Comparator<VI>> deltaComparatorOptional;
+
+
+ protected abstract void initReusableObject();
+
+ /**
+ * Assign the retained value into the reusable output key object.
+ * @param valueToRetain the value determined to be kept after the dedup process.
+ */
+ protected abstract void setOutKey(VI valueToRetain);
+
+ /**
+ * Added to avoid loss of flexibility to put output value in key/value.
+ * Usually for compaction job, either implement {@link #setOutKey} or this.
+ */
+ protected abstract void setOutValue(VI valueToRetain);
+
+ protected abstract void initDeltaComparator(Configuration conf);
+
+
+ @Override
+ protected void setup(Context context) {
+ initReusableObject();
+ initDeltaComparator(context.getConfiguration());
+ }
+
+ @Override
+ protected void reduce(KI key, Iterable<VI> values, Context context)
+ throws IOException, InterruptedException {
+ int numVals = 0;
+
+ VI valueToRetain = null;
+
+ for (VI value : values) {
+ if (valueToRetain == null) {
+ valueToRetain = value;
+ } else if (deltaComparatorOptional.isPresent()) {
+ valueToRetain = deltaComparatorOptional.get().compare(valueToRetain,
value) >= 0 ? valueToRetain : value;
+ }
+ numVals++;
+ }
+
+ setOutKey(valueToRetain);
+ setOutValue(valueToRetain);
+
+ if (numVals > 1) {
+ context.getCounter(EVENT_COUNTER.MORE_THAN_1).increment(1);
+ context.getCounter(EVENT_COUNTER.DEDUPED).increment(numVals - 1);
+ }
+
+ context.getCounter(EVENT_COUNTER.RECORD_COUNT).increment(1);
+
+ // Safety check
+ if (outKey == null || outValue == null) {
+ throw new IllegalStateException("Either outKey or outValue is not being
properly initialized");
+ }
+
+ context.write(this.outKey, this.outValue);
+ }
+}
diff --git
a/gobblin-utility/src/test/java/org/apache/gobblin/util/recordcount/CompactionRecordCountProviderTest.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/RecordKeyMapperBase.java
similarity index 50%
copy from
gobblin-utility/src/test/java/org/apache/gobblin/util/recordcount/CompactionRecordCountProviderTest.java
copy to
gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/RecordKeyMapperBase.java
index d78fc79..1f71942 100644
---
a/gobblin-utility/src/test/java/org/apache/gobblin/util/recordcount/CompactionRecordCountProviderTest.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/RecordKeyMapperBase.java
@@ -15,26 +15,13 @@
* limitations under the License.
*/
-package org.apache.gobblin.util.recordcount;
+package org.apache.gobblin.compaction.mapreduce;
-import java.util.regex.Pattern;
+import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.fs.Path;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-/**
- * Tests for {@link CompactionRecordCountProvider}.
- */
-@Test(groups = { "gobblin.util.recordcount" })
-public class CompactionRecordCountProviderTest {
- @Test
- public void testFileNameRecordCountProvider() {
- CompactionRecordCountProvider filenameRecordCountProvider = new
CompactionRecordCountProvider();
-
- Pattern pattern =
Pattern.compile("part\\-r\\-123\\.[\\d]*\\.[\\d]*\\.avro");
-
Assert.assertTrue(pattern.matcher(CompactionRecordCountProvider.constructFileName("part-r-",
123)).matches());
- Assert.assertEquals(filenameRecordCountProvider.getRecordCount(new
Path("part-r-123.1.2.avro")), 123);
/**
 * Base class for compaction mappers (Avro / ORC flavors). Here it only defines
 * the counter enum shared by the concrete mappers; record extraction and key
 * construction are left entirely to subclasses.
 */
public abstract class RecordKeyMapperBase<KI, VI, KO, VO> extends Mapper<KI, VI, KO, VO> {

  // Counters reported to the MapReduce framework by concrete mappers.
  public enum EVENT_COUNTER {
    RECORD_COUNT
  }
}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyCompactorOutputFormat.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyCompactorOutputFormat.java
index f2d5c00..8a03aa5 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyCompactorOutputFormat.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyCompactorOutputFormat.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.compaction.mapreduce.avro;
import java.io.IOException;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
+import org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
@@ -38,7 +39,7 @@ public class AvroKeyCompactorOutputFormat<T> extends
AvroKeyOutputFormat<T> {
@Override
public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext
context) throws IOException {
if (this.committer == null) {
- this.committer = new
AvroKeyCompactorOutputCommitter(FileOutputFormat.getOutputPath(context),
context);
+ this.committer = new
CompactorOutputCommitter(FileOutputFormat.getOutputPath(context), context);
}
return this.committer;
}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyDedupReducer.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyDedupReducer.java
index 130e544..81f2ca5 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyDedupReducer.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyDedupReducer.java
@@ -17,20 +17,16 @@
package org.apache.gobblin.compaction.mapreduce.avro;
-import java.io.IOException;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
import java.util.Comparator;
-
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
+import org.apache.gobblin.compaction.mapreduce.RecordKeyDedupReducerBase;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Reducer;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-
-import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
/**
@@ -40,61 +36,40 @@ import
org.apache.gobblin.util.reflection.GobblinConstructorUtils;
*
* @author Ziyang Liu
*/
-public class AvroKeyDedupReducer extends Reducer<AvroKey<GenericRecord>,
AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {
-
- public enum EVENT_COUNTER {
- MORE_THAN_1,
- DEDUPED,
- RECORD_COUNT
- }
+public class AvroKeyDedupReducer extends
RecordKeyDedupReducerBase<AvroKey<GenericRecord>, AvroValue<GenericRecord>,
+ AvroKey<GenericRecord>, NullWritable> {
public static final String DELTA_SCHEMA_PROVIDER =
"org.apache.gobblin.compaction." +
AvroKeyDedupReducer.class.getSimpleName() + ".deltaFieldsProvider";
- private AvroKey<GenericRecord> outKey;
- private Optional<AvroValueDeltaSchemaComparator> deltaComparatorOptional;
- private AvroDeltaFieldNameProvider deltaFieldNamesProvider;
@Override
- protected void setup(Context context)
- throws IOException, InterruptedException {
- this.outKey = new AvroKey<>();
- this.deltaComparatorOptional = Optional.absent();
- Configuration conf = context.getConfiguration();
- String deltaSchemaProviderClassName = conf.get(DELTA_SCHEMA_PROVIDER);
- if (deltaSchemaProviderClassName != null) {
- this.deltaFieldNamesProvider =
-
GobblinConstructorUtils.invokeConstructor(AvroDeltaFieldNameProvider.class,
deltaSchemaProviderClassName, conf);
- this.deltaComparatorOptional = Optional.of(new
AvroValueDeltaSchemaComparator(deltaFieldNamesProvider));
- }
+ protected void initReusableObject() {
+ outKey = new AvroKey<>();
+ outValue = NullWritable.get();
}
@Override
- protected void reduce(AvroKey<GenericRecord> key,
Iterable<AvroValue<GenericRecord>> values, Context context)
- throws IOException, InterruptedException {
- int numVals = 0;
-
- AvroValue<GenericRecord> valueToRetain = null;
+ protected void setOutKey(AvroValue<GenericRecord> valueToRetain) {
+ outKey.datum(valueToRetain.datum());
+ }
- for (AvroValue<GenericRecord> value : values) {
- if (valueToRetain == null) {
- valueToRetain = value;
- } else if (this.deltaComparatorOptional.isPresent()) {
- valueToRetain =
this.deltaComparatorOptional.get().compare(valueToRetain, value) >= 0 ?
valueToRetain : value;
- }
- numVals++;
- }
- this.outKey.datum(valueToRetain.datum());
+ @Override
+ protected void setOutValue(AvroValue<GenericRecord> valueToRetain) {
+ // do nothing since initReusableObject has assigned value for outValue.
+ }
- if (numVals > 1) {
- context.getCounter(EVENT_COUNTER.MORE_THAN_1).increment(1);
- context.getCounter(EVENT_COUNTER.DEDUPED).increment(numVals - 1);
+ @Override
+ protected void initDeltaComparator(Configuration conf) {
+ deltaComparatorOptional = Optional.absent();
+ String deltaSchemaProviderClassName = conf.get(DELTA_SCHEMA_PROVIDER);
+ if (deltaSchemaProviderClassName != null) {
+ deltaComparatorOptional = Optional.of(new AvroValueDeltaSchemaComparator(
+
GobblinConstructorUtils.invokeConstructor(AvroDeltaFieldNameProvider.class,
deltaSchemaProviderClassName,
+ conf)));
}
-
- context.getCounter(EVENT_COUNTER.RECORD_COUNT).increment(1);
-
- context.write(this.outKey, NullWritable.get());
}
+
@VisibleForTesting
protected static class AvroValueDeltaSchemaComparator implements
Comparator<AvroValue<GenericRecord>> {
private final AvroDeltaFieldNameProvider deltaSchemaProvider;
@@ -105,21 +80,16 @@ public class AvroKeyDedupReducer extends
Reducer<AvroKey<GenericRecord>, AvroVal
@Override
public int compare(AvroValue<GenericRecord> o1, AvroValue<GenericRecord>
o2) {
- GenericRecord record1= o1.datum();
+ GenericRecord record1 = o1.datum();
GenericRecord record2 = o2.datum();
for (String deltaFieldName :
this.deltaSchemaProvider.getDeltaFieldNames(record1)) {
if (record1.get(deltaFieldName).equals(record2.get(deltaFieldName))) {
continue;
}
- return
((Comparable)record1.get(deltaFieldName)).compareTo(record2.get(deltaFieldName));
+ return ((Comparable)
record1.get(deltaFieldName)).compareTo(record2.get(deltaFieldName));
}
return 0;
}
}
-
- @VisibleForTesting
- protected AvroKey<GenericRecord> getOutKey() {
- return this.outKey;
- }
}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyMapper.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyMapper.java
index 6f16d33..e4edc82 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyMapper.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyMapper.java
@@ -28,6 +28,7 @@ import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.compaction.mapreduce.RecordKeyMapperBase;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;
@@ -44,12 +45,8 @@ import
org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
*
* @author Ziyang Liu
*/
-public class AvroKeyMapper extends Mapper<AvroKey<GenericRecord>,
NullWritable, AvroKey<GenericRecord>, Object> {
-
- public enum EVENT_COUNTER {
- RECORD_COUNT
- }
-
+public class AvroKeyMapper extends
+ RecordKeyMapperBase<AvroKey<GenericRecord>,
NullWritable, AvroKey<GenericRecord>, Object> {
private AvroKey<GenericRecord> outKey;
private AvroValue<GenericRecord> outValue;
private Schema keySchema;
@@ -86,7 +83,7 @@ public class AvroKeyMapper extends
Mapper<AvroKey<GenericRecord>, NullWritable,
* Target record's schema cannot have MAP, ARRAY or ENUM fields, or UNION
fields that
* contain these fields.
*/
- private static void populateComparableKeyRecord(GenericRecord source,
GenericRecord target) {
+ private void populateComparableKeyRecord(GenericRecord source, GenericRecord
target) {
for (Field field : target.getSchema().getFields()) {
if (field.schema().getType() == Schema.Type.UNION) {
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyRecursiveCombineFileInputFormat.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyRecursiveCombineFileInputFormat.java
index 5f864cb..30de419 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyRecursiveCombineFileInputFormat.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyRecursiveCombineFileInputFormat.java
@@ -18,25 +18,15 @@
package org.apache.gobblin.compaction.mapreduce.avro;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
+import
org.apache.gobblin.compaction.mapreduce.CompactionCombineFileInputFormat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.util.VersionInfo;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
/**
@@ -47,62 +37,7 @@ import com.google.common.collect.Lists;
* @author Ziyang Liu
*/
public class AvroKeyRecursiveCombineFileInputFormat
- extends CombineFileInputFormat<AvroKey<GenericRecord>, NullWritable> {
-
- private static final String COMPACTION_JOB_PREFIX = "compaction.job.";
-
- /**
- * Properties related to the input format of the compaction job of a dataset.
- */
- private static final String COMPACTION_JOB_MAPRED_MAX_SPLIT_SIZE =
COMPACTION_JOB_PREFIX + "mapred.max.split.size";
- private static final long DEFAULT_COMPACTION_JOB_MAPRED_MAX_SPLIT_SIZE =
268435456;
- private static final String COMPACTION_JOB_MAPRED_MIN_SPLIT_SIZE =
COMPACTION_JOB_PREFIX + "mapred.min.split.size";
- private static final long DEFAULT_COMPACTION_JOB_MAPRED_MIN_SPLIT_SIZE =
268435456;
-
- private static final int SPLIT_MAX_NUM_LOCATIONS = 10;
-
- @Override
- public List<InputSplit> getSplits(JobContext cx) throws IOException {
- Job modifiedJob = Job.getInstance(cx.getConfiguration());
- setSplitSize(modifiedJob);
- FileInputFormat.setInputDirRecursive(modifiedJob, true);
- return cleanSplits(super.getSplits(modifiedJob));
- }
-
- private void setSplitSize(JobContext cx) {
-
super.setMaxSplitSize(cx.getConfiguration().getLong(COMPACTION_JOB_MAPRED_MAX_SPLIT_SIZE,
- DEFAULT_COMPACTION_JOB_MAPRED_MAX_SPLIT_SIZE));
-
super.setMinSplitSizeNode(cx.getConfiguration().getLong(COMPACTION_JOB_MAPRED_MIN_SPLIT_SIZE,
- DEFAULT_COMPACTION_JOB_MAPRED_MIN_SPLIT_SIZE));
- }
-
- /**
- * Set the number of locations in the split to SPLIT_MAX_NUM_LOCATIONS if it
is larger than
- * SPLIT_MAX_NUM_LOCATIONS (MAPREDUCE-5186).
- */
- private static List<InputSplit> cleanSplits(List<InputSplit> splits) throws
IOException {
- if (VersionInfo.getVersion().compareTo("2.3.0") >= 0) {
- // This issue was fixed in 2.3.0, if newer version, no need to clean up
splits
- return splits;
- }
-
- List<InputSplit> cleanedSplits = Lists.newArrayList();
-
- for (int i = 0; i < splits.size(); i++) {
- CombineFileSplit oldSplit = (CombineFileSplit) splits.get(i);
- String[] locations = oldSplit.getLocations();
-
- Preconditions.checkNotNull(locations, "CombineFileSplit.getLocations()
returned null");
-
- if (locations.length > SPLIT_MAX_NUM_LOCATIONS) {
- locations = Arrays.copyOf(locations, SPLIT_MAX_NUM_LOCATIONS);
- }
-
- cleanedSplits.add(new CombineFileSplit(oldSplit.getPaths(),
oldSplit.getStartOffsets(), oldSplit.getLengths(),
- locations));
- }
- return cleanedSplits;
- }
+ extends CompactionCombineFileInputFormat<AvroKey<GenericRecord>,
NullWritable> {
@Override
public RecordReader<AvroKey<GenericRecord>, NullWritable>
createRecordReader(InputSplit split, TaskAttemptContext cx)
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyCompactorOutputFormat.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyCompactorOutputFormat.java
new file mode 100644
index 0000000..9c94224
--- /dev/null
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyCompactorOutputFormat.java
@@ -0,0 +1,70 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gobblin.compaction.mapreduce.orc;

import java.io.IOException;
import org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator;
import org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.orc.OrcFile;
import org.apache.orc.Writer;
import org.apache.orc.mapreduce.OrcMapreduceRecordWriter;
import org.apache.orc.mapreduce.OrcOutputFormat;

import static org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.*;


/**
 * Extension of {@link OrcOutputFormat} for customized {@link CompactorOutputCommitter}
 */
public class OrcKeyCompactorOutputFormat extends OrcOutputFormat {

  // Lazily created and cached; getOutputCommitter is synchronized so all callers
  // within this task receive the same committer instance.
  private FileOutputCommitter committer = null;

  /** Returns (creating once) the compaction-specific {@link CompactorOutputCommitter}. */
  @Override
  public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
    if (this.committer == null) {
      this.committer = new CompactorOutputCommitter(FileOutputFormat.getOutputPath(context), context);
    }
    return this.committer;
  }

  /**
   * Required for extension since super method hard-coded file extension as ".orc". To keep flexibility
   * of extension name, we made it configuration driven.
   * @param taskAttemptContext The source of configuration that determines the file extension
   * @return The {@link RecordWriter} that write out Orc object.
   * @throws IOException
   */
  @Override
  public RecordWriter getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException {
    Configuration conf = taskAttemptContext.getConfiguration();
    // COMPACTION_OUTPUT_EXTENSION comes from the static import of CompactorOutputCommitter;
    // ".orc" is the fallback when the job did not configure one.
    String extension = "." + conf.get(COMPACTION_OUTPUT_EXTENSION, "orc");

    Path filename = getDefaultWorkFile(taskAttemptContext, extension);
    Writer writer = OrcFile.createWriter(filename,
        org.apache.orc.mapred.OrcOutputFormat.buildOptions(conf));
    return new OrcMapreduceRecordWriter(writer);
  }
}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyComparator.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyComparator.java
new file mode 100644
index 0000000..efcd1f5
--- /dev/null
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyComparator.java
@@ -0,0 +1,90 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gobblin.compaction.mapreduce.orc;

import java.io.DataInput;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;
import org.apache.orc.OrcConf;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcKey;
import org.apache.orc.mapred.OrcStruct;


/**
 * Compare {@link OrcKey} in shuffle of MapReduce.
 * Delegate byte decoding to underlying {@link OrcStruct#readFields(DataInput)} method to simplify comparison.
 */
public class OrcKeyComparator extends Configured implements RawComparator<OrcKey> {
  // Shuffle-key schema, parsed from OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.
  private TypeDescription schema;
  // Reusable deserialization targets for the raw byte-level compare.
  private OrcKey key1;
  private OrcKey key2;
  private DataInputBuffer buffer;

  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);
    if (null != conf) {
      // The MapReduce framework will be using this comparator to sort OrcKey objects
      // output from the map phase, so use the schema defined for the map output key
      // and the data model non-raw compare() implementation.
      schema = TypeDescription.fromString(conf.get(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute()));

      // BUGFIX: each key must have its OWN backing OrcStruct. readFields()
      // deserializes in place, so sharing a single struct between key1 and key2
      // would make the second parse clobber the first and every raw compare
      // return 0 (comparing a record with itself), corrupting the shuffle sort.
      OrcStruct orcRecordModel1 = (OrcStruct) OrcStruct.createValue(schema);
      OrcStruct orcRecordModel2 = (OrcStruct) OrcStruct.createValue(schema);

      if (key1 == null) {
        key1 = new OrcKey();
      }
      if (key2 == null) {
        key2 = new OrcKey();
      }
      if (buffer == null) {
        buffer = new DataInputBuffer();
      }

      key1.key = orcRecordModel1;
      key2.key = orcRecordModel2;
    }
  }

  /** Raw-bytes comparison: deserialize both serialized keys, then delegate to {@link #compare(OrcKey, OrcKey)}. */
  @Override
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    try {
      buffer.reset(b1, s1, l1); // parse key1
      key1.readFields(buffer);

      buffer.reset(b2, s2, l2); // parse key2
      key2.readFields(buffer);

    } catch (IOException e) {
      throw new RuntimeException(e);
    }

    return compare(key1, key2); // compare them
  }

  @Override
  public int compare(OrcKey o1, OrcKey o2) {
    // The shuffle key is always an OrcStruct here (see setConf); anything else
    // indicates a mis-configured job.
    if (!(o1.key instanceof OrcStruct) || !(o2.key instanceof OrcStruct)) {
      throw new IllegalStateException("OrcKey should have its key value be instance of OrcStruct");
    }
    return ((OrcStruct) o1.key).compareTo((OrcStruct) o2.key);
  }
}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyDedupReducer.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyDedupReducer.java
new file mode 100644
index 0000000..ad5c4dd
--- /dev/null
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyDedupReducer.java
@@ -0,0 +1,50 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gobblin.compaction.mapreduce.orc;

import com.google.common.base.Optional;
import org.apache.gobblin.compaction.mapreduce.RecordKeyDedupReducerBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.orc.mapred.OrcKey;
import org.apache.orc.mapred.OrcValue;


/**
 * ORC flavor of {@link RecordKeyDedupReducerBase}: the retained record is emitted
 * on the value side ({@link OrcValue}) with a {@link NullWritable} output key.
 */
public class OrcKeyDedupReducer extends RecordKeyDedupReducerBase<OrcKey, OrcValue, NullWritable, OrcValue> {
  @Override
  protected void setOutValue(OrcValue valueToRetain) {
    // NOTE(review): this reassigns the reference rather than copying the record.
    // Safe only if the framework does not recycle valueToRetain's buffer before
    // context.write() — TODO confirm; a deep copy would be more defensive.
    outValue.value = valueToRetain.value;
  }

  @Override
  protected void setOutKey(OrcValue valueToRetain) {
    // do nothing since initReusableObject has assigned value for outKey.
  }

  @Override
  protected void initDeltaComparator(Configuration conf) {
    // No delta-field comparison for ORC yet: the first record of each group wins.
    deltaComparatorOptional = Optional.absent();
  }

  @Override
  protected void initReusableObject() {
    outKey = NullWritable.get();
    outValue = new OrcValue();
  }
}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
new file mode 100644
index 0000000..7f82690
--- /dev/null
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
@@ -0,0 +1,72 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gobblin.compaction.mapreduce.orc;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.gobblin.compaction.mapreduce.avro.MRCompactorAvroKeyDedupJobRunner;
import org.apache.gobblin.util.FileListUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.TypeDescription;


/** Static helpers for reading ORC file metadata (schema, readers) during compaction. */
public class OrcUtils {
  // For Util class to prevent initialization
  private OrcUtils() {

  }

  /** Reads the {@link TypeDescription} (ORC schema) from the given file's footer. */
  public static TypeDescription getTypeDescriptionFromFile(Configuration conf, Path orcFilePath) throws IOException {
    return getRecordReaderFromFile(conf, orcFilePath).getSchema();
  }

  /**
   * Opens a file-level ORC {@link Reader} for the given path.
   * NOTE(review): despite the name, this returns an {@link OrcFile} Reader
   * (footer/metadata level), not a row-level record reader; callers invoke
   * {@code .rows()} on it when they need rows.
   */
  public static Reader getRecordReaderFromFile(Configuration conf, Path orcFilePath) throws IOException {
    return OrcFile.createReader(orcFilePath, new OrcFile.ReaderOptions(conf));
  }

  /**
   * Returns the schema of the "newest" file under the job's input paths: files are
   * sorted with {@link MRCompactorAvroKeyDedupJobRunner.LastModifiedDescComparator}
   * (presumably most-recently-modified first, per its name — confirm against that
   * comparator) and the first file yielding a non-null schema wins.
   *
   * @throws IllegalStateException if no input file provides a schema.
   */
  public static TypeDescription getNewestSchemaFromSource(Job job, FileSystem fs) throws IOException {
    Path[] sourceDirs = FileInputFormat.getInputPaths(job);

    List<FileStatus> files = new ArrayList<FileStatus>();

    for (Path sourceDir : sourceDirs) {
      files.addAll(FileListUtils.listFilesRecursively(fs, sourceDir));
    }
    Collections.sort(files, new MRCompactorAvroKeyDedupJobRunner.LastModifiedDescComparator());

    TypeDescription resultSchema;
    for (FileStatus status : files) {
      resultSchema = getTypeDescriptionFromFile(job.getConfiguration(), status.getPath());
      if (resultSchema != null) {
        return resultSchema;
      }
    }

    throw new IllegalStateException(
        String.format("There's no file carrying orc file schema in %s list", sourceDirs));
  }
}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueCombineFileInputFormat.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueCombineFileInputFormat.java
new file mode 100644
index 0000000..0a55567
--- /dev/null
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueCombineFileInputFormat.java
@@ -0,0 +1,37 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gobblin.compaction.mapreduce.orc;

import java.io.IOException;
import org.apache.gobblin.compaction.mapreduce.CompactionCombineFileInputFormat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.orc.mapred.OrcValue;

/**
 * ORC input format for compaction: combines many small ORC files into large splits
 * (see {@link CompactionCombineFileInputFormat}) and reads each sub-file through
 * {@link OrcValueCombineFileRecordReader}.
 */
public class OrcValueCombineFileInputFormat extends CompactionCombineFileInputFormat<NullWritable, OrcValue> {

  @Override
  public RecordReader<NullWritable, OrcValue> createRecordReader(InputSplit split, TaskAttemptContext context)
      throws IOException {
    // Raw-typed CombineFileRecordReader (unchecked): it instantiates one
    // OrcValueCombineFileRecordReader per file chunk inside the combined split.
    return new CombineFileRecordReader((CombineFileSplit) split, context, OrcValueCombineFileRecordReader.class);
  }
}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueCombineFileRecordReader.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueCombineFileRecordReader.java
new file mode 100644
index 0000000..3976e53
--- /dev/null
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueCombineFileRecordReader.java
@@ -0,0 +1,64 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gobblin.compaction.mapreduce.orc;

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapreduce.OrcMapreduceRecordReader;


/**
 * Reads one file (identified by {@code splitIdx}) out of a {@link CombineFileSplit},
 * delegating actual row decoding to {@link OrcMapreduceRecordReader}. Instantiated
 * reflectively by Hadoop's CombineFileRecordReader via the 3-arg constructor.
 */
public class OrcValueCombineFileRecordReader extends OrcMapreduceRecordReader {
  // The combined split and the index of the file within it that this reader covers.
  private final CombineFileSplit split;
  private final Integer splitIdx;

  public OrcValueCombineFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer idx)
      throws IOException {
    // NOTE(review): opens the underlying ORC file twice (once for rows, once for
    // the schema) — a single Reader could serve both; confirm footer-read cost.
    this(getRecordReaderFromFile(split, context, idx), getSchema(split, context, idx), split, idx);
  }

  public OrcValueCombineFileRecordReader(RecordReader reader, TypeDescription schema, CombineFileSplit split,
      Integer splitIdx) throws IOException {
    super(reader, schema);
    this.split = split;
    this.splitIdx = splitIdx;
  }

  @Override
  public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) {
    // Deliberately ignores the passed-in inputSplit: re-targets the parent reader
    // at the splitIdx-th file slice of the stored combined split.
    super.initialize(new FileSplit(this.split.getPath(this.splitIdx), this.split.getOffset(this.splitIdx),
        this.split.getLength(this.splitIdx), null), taskAttemptContext);
  }

  /** Reads the ORC schema of the idx-th file in the combined split. */
  private static TypeDescription getSchema(CombineFileSplit split, TaskAttemptContext context, Integer idx)
      throws IOException {
    Path path = split.getPath(idx);
    return OrcUtils.getTypeDescriptionFromFile(context.getConfiguration(), path);
  }

  /** Opens a row-level ORC reader for the idx-th file in the combined split. */
  private static RecordReader getRecordReaderFromFile(CombineFileSplit split, TaskAttemptContext context, Integer idx)
      throws IOException {
    Path path = split.getPath(idx);
    return OrcUtils.getRecordReaderFromFile(context.getConfiguration(), path).rows();
  }
}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
new file mode 100644
index 0000000..0dd3b5c
--- /dev/null
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.mapreduce.orc;
+
+import java.io.IOException;
+import org.apache.gobblin.compaction.mapreduce.RecordKeyMapperBase;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.orc.mapred.OrcKey;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
+import org.apache.orc.mapreduce.OrcMapreduceRecordReader;
+
+
+/**
+ * To keep consistent with {@link OrcMapreduceRecordReader}'s decision on
implementing
+ * {@link RecordReader} with {@link NullWritable} as the key and generic type
of value, the ORC Mapper will
+ * read in the record as the input value.
+ */
+public class OrcValueMapper extends RecordKeyMapperBase<NullWritable,
OrcStruct, OrcKey, Object> {
+
+ private OrcKey outKey;
+ private OrcValue outValue;
+
+ @Override
+ protected void setup(Context context) throws IOException,
InterruptedException {
+ super.setup(context);
+ this.outKey = new OrcKey();
+ this.outValue = new OrcValue();
+ }
+
+ @Override
+ protected void map(NullWritable key, OrcStruct orcStruct, Context context)
throws IOException, InterruptedException {
+ if (context.getNumReduceTasks() == 0) {
+ this.outKey.key = orcStruct;
+ context.write(this.outKey, NullWritable.get());
+ } else {
+ this.outValue.value = orcStruct;
+ context.write(getDedupKey(orcStruct), this.outValue);
+ }
+
+ context.getCounter(EVENT_COUNTER.RECORD_COUNT).increment(1);
+ }
+
+ /**
+ * By default, dedup key contains the whole ORC record, except MAP since
{@link org.apache.orc.mapred.OrcMap} is
+ * an implementation of {@link java.util.TreeMap} which doesn't accept
difference of records within the map in comparison.
+ */
+ protected OrcKey getDedupKey(OrcStruct originalRecord) {
+ return convertOrcStructToOrcKey(originalRecord);
+ }
+
+ /**
+ * The output key of mapper needs to be comparable. In the scenarios that we
need the orc record itself
+ * to be the output key, this conversion will be necessary.
+ */
+ protected OrcKey convertOrcStructToOrcKey(OrcStruct struct) {
+ OrcKey orcKey = new OrcKey();
+ orcKey.key = struct;
+ return orcKey;
+ }
+}
diff --git
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskTest.java
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
similarity index 99%
rename from
gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskTest.java
rename to
gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
index 51fe866..909d97c 100644
---
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskTest.java
+++
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
@@ -52,7 +52,7 @@ import org.apache.gobblin.runtime.embedded.EmbeddedGobblin;
@Slf4j
-public class MRCompactionTaskTest {
+public class AvroCompactionTaskTest {
protected FileSystem getFileSystem()
throws IOException {
@@ -163,7 +163,7 @@ public class MRCompactionTaskTest {
writer.close();
}
- private EmbeddedGobblin createEmbeddedGobblin (String name, String basePath)
{
+ static EmbeddedGobblin createEmbeddedGobblin (String name, String basePath) {
String pattern = new Path(basePath, "*/*/minutely/*/*/*/*").toString();
return new EmbeddedGobblin(name)
@@ -201,7 +201,7 @@ public class MRCompactionTaskTest {
.setConfiguration(SimpleDatasetHierarchicalPrioritizer.TIER_KEY +
".2", "BizProfile");
}
- @Test
+ @Test
public void testWorkUnitStream () throws Exception {
File basePath = Files.createTempDir();
basePath.deleteOnExit();
@@ -221,7 +221,7 @@ public class MRCompactionTaskTest {
Assert.assertTrue(result.isSuccessful());
}
- @Test
+ @Test
public void testWorkUnitStreamForAllFailures () throws Exception {
File basePath = Files.createTempDir();
basePath.deleteOnExit();
diff --git
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyDedupReducerTest.java
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/KeyDedupReducerTest.java
similarity index 84%
rename from
gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyDedupReducerTest.java
rename to
gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/KeyDedupReducerTest.java
index 2de03d1..6183f35 100644
---
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyDedupReducerTest.java
+++
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/KeyDedupReducerTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.gobblin.compaction.mapreduce.avro;
+package org.apache.gobblin.compaction.mapreduce;
import java.io.IOException;
import java.util.List;
@@ -25,6 +25,8 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
+import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyDedupReducer;
+import
org.apache.gobblin.compaction.mapreduce.avro.FieldAttributeBasedDeltaFieldsProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Counter;
@@ -42,17 +44,18 @@ import static org.mockito.Mockito.when;
/**
- * Test class for {@link AvroKeyDedupReducer}.
+ * Test class for {@link
org.apache.gobblin.compaction.mapreduce.RecordKeyDedupReducerBase}.
+ * Tests are implemented separately for both Avro and ORC.
*/
-public class AvroKeyDedupReducerTest {
- private static final String KEY_SCHEMA =
+public class KeyDedupReducerTest {
+ private static final String AVRO_KEY_SCHEMA =
"{ \"type\" : \"record\", \"name\" : \"etl\",\"namespace\" :
\"reducerTest\", \"fields\" : [ { \"name\" : "
+ "\"key\", \"type\" : {\"type\" : \"record\", \"name\" :
\"key_name\", \"namespace\" : \"key_namespace\", "
+ "\"fields\" : [ {\"name\" : \"partitionKey\", \"type\" : \"long\",
\"doc\" : \"\"}, { \"name\" : \"environment"
+ "\", \"type\" : \"string\",\"doc\" : \"\"}, {\"name\" :
\"subKey\",\"type\" : \"string\", \"doc\" : \"\"} ]}, "
+ "\"doc\" : \"\", \"attributes_json\" :
\"{\\\"delta\\\":false,\\\"pk\\\":true}\" }]}";
- private static final String FULL_SCHEMA =
+ private static final String AVRO_FULL_SCHEMA =
"{ \"type\" : \"record\", \"name\" : \"etl\",\"namespace\" :
\"reducerTest\", \"fields\" : [ { \"name\" : "
+ "\"key\", \"type\" : {\"type\" : \"record\", \"name\" :
\"key_name\", \"namespace\" : \"key_namespace\", "
+ "\"fields\" : [ {\"name\" : \"partitionKey\", \"type\" : \"long\",
\"doc\" : \"\"}, { \"name\" : \"environment"
@@ -63,7 +66,7 @@ public class AvroKeyDedupReducerTest {
+ " , {\"name\" : \"scn\", \"type\": \"long\", \"doc\" : \"\",
\"attributes_json\" : \"{\\\"nullable\\\":false,\\\"delta"
+ "\\\":true,\\\"pk\\\":false,\\\"type\\\":\\\"NUMBER\\\"}\"}]}";
- private static final String FULL_SCHEMA_WITH_TWO_DELTA_FIELDS =
+ private static final String AVRO_FULL_SCHEMA_WITH_TWO_DELTA_FIELDS =
"{ \"type\" : \"record\", \"name\" : \"etl\",\"namespace\" :
\"reducerTest\", \"fields\" : [ { \"name\" : "
+ "\"key\", \"type\" : {\"type\" : \"record\", \"name\" :
\"key_name\", \"namespace\" : \"key_namespace\", "
+ "\"fields\" : [ {\"name\" : \"partitionKey\", \"type\" : \"long\",
\"doc\" : \"\"}, { \"name\" : \"environment"
@@ -75,9 +78,9 @@ public class AvroKeyDedupReducerTest {
+ "\\\":true,\\\"pk\\\":false,\\\"type\\\":\\\"NUMBER\\\"}\"}]}";
@Test
- public void testReduce()
+ public void testAvroReduce()
throws IOException, InterruptedException {
- Schema keySchema = new Schema.Parser().parse(KEY_SCHEMA);
+ Schema keySchema = new Schema.Parser().parse(AVRO_KEY_SCHEMA);
GenericRecordBuilder keyRecordBuilder = new
GenericRecordBuilder(keySchema.getField("key").schema());
keyRecordBuilder.set("partitionKey", 1);
keyRecordBuilder.set("environment", "test");
@@ -88,7 +91,7 @@ public class AvroKeyDedupReducerTest {
GenericRecord keyRecord = keyRecordBuilder.build();
// Test reducer with delta field "scn"
- Schema fullSchema = new Schema.Parser().parse(FULL_SCHEMA);
+ Schema fullSchema = new Schema.Parser().parse(AVRO_FULL_SCHEMA);
AvroValue<GenericRecord> fullRecord1 = new AvroValue<>();
AvroValue<GenericRecord> fullRecord2 = new AvroValue<>();
AvroValue<GenericRecord> fullRecord3 = new AvroValue<>();
@@ -117,18 +120,19 @@ public class AvroKeyDedupReducerTest {
when(conf.get(FieldAttributeBasedDeltaFieldsProvider.DELTA_PROP_NAME,
FieldAttributeBasedDeltaFieldsProvider.DEFAULT_DELTA_PROP_NAME))
.thenReturn(FieldAttributeBasedDeltaFieldsProvider.DEFAULT_DELTA_PROP_NAME);
- AvroKeyDedupReducer reducer = new AvroKeyDedupReducer();
+ RecordKeyDedupReducerBase<AvroKey<GenericRecord>, AvroValue<GenericRecord>,
+ AvroKey<GenericRecord>, NullWritable> reducer = new
AvroKeyDedupReducer();
WrappedReducer.Context reducerContext = mock(WrappedReducer.Context.class);
when(reducerContext.getConfiguration()).thenReturn(conf);
Counter moreThan1Counter = new GenericCounter();
-
when(reducerContext.getCounter(AvroKeyDedupReducer.EVENT_COUNTER.MORE_THAN_1)).thenReturn(moreThan1Counter);
+
when(reducerContext.getCounter(RecordKeyDedupReducerBase.EVENT_COUNTER.MORE_THAN_1)).thenReturn(moreThan1Counter);
Counter dedupedCounter = new GenericCounter();
-
when(reducerContext.getCounter(AvroKeyDedupReducer.EVENT_COUNTER.DEDUPED)).thenReturn(dedupedCounter);
+
when(reducerContext.getCounter(RecordKeyDedupReducerBase.EVENT_COUNTER.DEDUPED)).thenReturn(dedupedCounter);
Counter recordCounter = new GenericCounter();
-
when(reducerContext.getCounter(AvroKeyDedupReducer.EVENT_COUNTER.RECORD_COUNT)).thenReturn(recordCounter);
+
when(reducerContext.getCounter(RecordKeyDedupReducerBase.EVENT_COUNTER.RECORD_COUNT)).thenReturn(recordCounter);
reducer.setup(reducerContext);
doNothing().when(reducerContext).write(any(AvroKey.class),
any(NullWritable.class));
@@ -144,13 +148,14 @@ public class AvroKeyDedupReducerTest {
Configuration conf2 = mock(Configuration.class);
when(conf2.get(AvroKeyDedupReducer.DELTA_SCHEMA_PROVIDER)).thenReturn(null);
when(reducerContext.getConfiguration()).thenReturn(conf2);
- AvroKeyDedupReducer reducer2 = new AvroKeyDedupReducer();
+ RecordKeyDedupReducerBase<AvroKey<GenericRecord>, AvroValue<GenericRecord>,
+ AvroKey<GenericRecord>, NullWritable> reducer2 = new
AvroKeyDedupReducer();
reducer2.setup(reducerContext);
reducer2.reduce(key, valueIterable, reducerContext);
Assert.assertEquals(reducer2.getOutKey().datum(), fullRecord1.datum());
// Test reducer with compound delta key.
- Schema fullSchema2 = new
Schema.Parser().parse(FULL_SCHEMA_WITH_TWO_DELTA_FIELDS);
+ Schema fullSchema2 = new
Schema.Parser().parse(AVRO_FULL_SCHEMA_WITH_TWO_DELTA_FIELDS);
GenericRecordBuilder fullRecordBuilder2 = new
GenericRecordBuilder(fullSchema2);
fullRecordBuilder2.set("key", record);
fullRecordBuilder2.set("scn", 123);
@@ -170,7 +175,5 @@ public class AvroKeyDedupReducerTest {
reducer.reduce(key, valueIterable2, reducerContext);
Assert.assertEquals(reducer.getOutKey().datum(), fullRecord3.datum());
-
-
}
}
diff --git
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
new file mode 100644
index 0000000..5e86dd3
--- /dev/null
+++
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.mapreduce;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.runtime.api.JobExecutionResult;
+import org.apache.gobblin.runtime.embedded.EmbeddedGobblin;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.apache.orc.impl.ReaderImpl;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapreduce.OrcMapreduceRecordReader;
+import org.apache.orc.mapreduce.OrcMapreduceRecordWriter;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.apache.gobblin.compaction.mapreduce.AvroCompactionTaskTest.*;
+import static
org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.*;
+
+
+public class OrcCompactionTaskTest {
+
+ @Test
+ public void basicTest() throws Exception {
+ File basePath = Files.createTempDir();
+ basePath.deleteOnExit();
+
+ String minutelyPath =
"Identity/MemberAccount/minutely/2017/04/03/10/20_30/run_2017-04-03-10-20";
+ String hourlyPath = "Identity/MemberAccount/hourly/2017/04/03/10/";
+ File jobDir = new File(basePath, minutelyPath);
+ Assert.assertTrue(jobDir.mkdirs());
+
+ // Write some ORC file for compaction here.
+ TypeDescription schema = TypeDescription.fromString("struct<i:int,j:int>");
+ OrcStruct orcStruct_0 = (OrcStruct) OrcStruct.createValue(schema);
+ orcStruct_0.setFieldValue("i", new IntWritable(1));
+ orcStruct_0.setFieldValue("j", new IntWritable(2));
+
+ OrcStruct orcStruct_1 = (OrcStruct) OrcStruct.createValue(schema);
+ orcStruct_1.setFieldValue("i", new IntWritable(1));
+ orcStruct_1.setFieldValue("j", new IntWritable(2));
+
+ File file_0 = new File(jobDir, "file_0");
+ File file_1 = new File(jobDir, "file_1");
+ writeOrcRecordsInFile(new Path(file_0.getAbsolutePath()), schema,
ImmutableList.of(orcStruct_0));
+ writeOrcRecordsInFile(new Path(file_1.getAbsolutePath()), schema,
ImmutableList.of(orcStruct_1));
+
+ // Verify execution
+
+ // Overwrite the job configurator factory key.
+ String extensionFileName = "orcavro";
+ EmbeddedGobblin embeddedGobblin = createEmbeddedGobblin("basic",
basePath.getAbsolutePath().toString())
+
.setConfiguration(CompactionJobConfigurator.COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY,
+ TestCompactionOrcJobConfigurator.Factory.class.getName())
+ .setConfiguration(COMPACTION_OUTPUT_EXTENSION, extensionFileName);
+ JobExecutionResult execution = embeddedGobblin.run();
+ Assert.assertTrue(execution.isSuccessful());
+
+ // Result verification
+ File outputDir = new File(basePath, hourlyPath);
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+ List<FileStatus> statuses = new ArrayList<>();
+ for (FileStatus status : fs.listStatus(new
Path(outputDir.getAbsolutePath()), new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ return FilenameUtils.isExtension(path.getName(), extensionFileName);
+ }
+ })) {
+ statuses.add(status);
+ }
+
+ Assert.assertTrue(statuses.size() == 1);
+ List<OrcStruct> result = readOrcFile(statuses.get(0).getPath());
+ Assert.assertEquals(result.size(), 1);
+ Assert.assertEquals(result.get(0).getFieldValue("i"), new IntWritable(1));
+ Assert.assertEquals(result.get(0).getFieldValue("j"), new IntWritable(2));
+ }
+
+ /**
+ * Read a output ORC compacted file into memory.
+ */
+ public List<OrcStruct> readOrcFile(Path orcFilePath)
+ throws IOException, InterruptedException {
+ ReaderImpl orcReader = new ReaderImpl(orcFilePath, new
OrcFile.ReaderOptions(new Configuration()));
+
+ Reader.Options options = new
Reader.Options().schema(orcReader.getSchema());
+ OrcMapreduceRecordReader recordReader = new
OrcMapreduceRecordReader(orcReader, options);
+ List<OrcStruct> result = new ArrayList<>();
+
+ while (recordReader.nextKeyValue()) {
+ result.add((OrcStruct) recordReader.getCurrentValue());
+ }
+
+ return result;
+ }
+
+ public void writeOrcRecordsInFile(Path path, TypeDescription schema,
List<OrcStruct> orcStructs) throws Exception {
+ Configuration configuration = new Configuration();
+ OrcFile.WriterOptions options =
OrcFile.writerOptions(configuration).setSchema(schema);
+
+ Writer writer = OrcFile.createWriter(path, options);
+ OrcMapreduceRecordWriter recordWriter = new
OrcMapreduceRecordWriter(writer);
+ for (OrcStruct orcRecord : orcStructs) {
+ recordWriter.write(NullWritable.get(), orcRecord);
+ }
+ recordWriter.close(new TaskAttemptContextImpl(configuration, new
TaskAttemptID()));
+ }
+
+ private static class TestCompactionOrcJobConfigurator extends
CompactionOrcJobConfigurator {
+ public static class Factory implements
CompactionJobConfigurator.ConfiguratorFactory {
+ @Override
+ public TestCompactionOrcJobConfigurator createConfigurator(State state)
throws IOException {
+ return new TestCompactionOrcJobConfigurator(state);
+ }
+ }
+
+ @Override
+ protected void setNumberOfReducers(Job job) throws IOException {
+ job.setNumReduceTasks(1);
+ }
+
+ public TestCompactionOrcJobConfigurator(State state) throws IOException {
+ super(state);
+ }
+ }
+}
diff --git
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyComparatorTest.java
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyComparatorTest.java
new file mode 100644
index 0000000..d26dc6f
--- /dev/null
+++
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyComparatorTest.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.mapreduce.orc;
+
+import java.util.TreeMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.orc.OrcConf;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.mapred.OrcKey;
+import org.apache.orc.mapred.OrcList;
+import org.apache.orc.mapred.OrcMap;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcUnion;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Mainly to test {@link OrcKeyComparator} is behaving as expected when it is
comparing two {@link OrcStruct}.
+ * It covers basic(primitive) type of {@link OrcStruct} and those contain
complex type (MAP, LIST, UNION, Struct)
+ *
+ * Reference: https://orc.apache.org/docs/types.html
+ */
+public class OrcKeyComparatorTest {
+ @Test
+ public void testSimpleComparator() throws Exception {
+ OrcKeyComparator comparator = new OrcKeyComparator();
+ Configuration conf = new Configuration();
+ String orcSchema = "struct<i:int,j:int>";
+ TypeDescription schema = TypeDescription.fromString(orcSchema);
+ conf.set(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute(), orcSchema);
+
Assert.assertEquals(conf.get(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute()),
orcSchema);
+ comparator.setConf(conf);
+
+ OrcStruct record0 = createSimpleOrcStruct(schema, 1, 2);
+ OrcStruct record1 = createSimpleOrcStruct(schema, 3, 4);
+ OrcStruct record2 = createSimpleOrcStruct(schema, 3, 4);
+
+ OrcKey orcKey0 = new OrcKey();
+ orcKey0.key = record0;
+ OrcKey orcKey1 = new OrcKey();
+ orcKey1.key = record1;
+ OrcKey orcKey2 = new OrcKey();
+ orcKey2.key = record2;
+
+ Assert.assertTrue(comparator.compare(orcKey0, orcKey1) < 0);
+ Assert.assertTrue(comparator.compare(orcKey1, orcKey2) == 0);
+ Assert.assertTrue(comparator.compare(orcKey1, orcKey0) > 0);
+ }
+
+ @Test
+ public void testComplexRecordArray() throws Exception {
+ OrcKeyComparator comparator = new OrcKeyComparator();
+ Configuration conf = new Configuration();
+
+ TypeDescription listSchema =
TypeDescription.createList(TypeDescription.createString());
+ TypeDescription schema =
+ TypeDescription.createStruct().addField("a",
TypeDescription.createInt()).addField("b", listSchema);
+
+ conf.set(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute(),
schema.toString());
+
Assert.assertEquals(conf.get(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute()),
schema.toString());
+ comparator.setConf(conf);
+
+ // base record
+ OrcStruct record0 = (OrcStruct) OrcStruct.createValue(schema);
+ record0.setFieldValue("a", new IntWritable(1));
+ OrcList orcList0 = createOrcList(3, listSchema, 3);
+ record0.setFieldValue("b", orcList0);
+
+ // the same as base but different object, expecting equal to each other.
+ OrcStruct record1 = (OrcStruct) OrcStruct.createValue(schema);
+ record1.setFieldValue("a", new IntWritable(1));
+ OrcList orcList1 = createOrcList(3, listSchema, 3);
+ record1.setFieldValue("b", orcList1);
+
+ // Diff in int field
+ OrcStruct record2 = (OrcStruct) OrcStruct.createValue(schema);
+ record2.setFieldValue("a", new IntWritable(2));
+ OrcList orcList2 = createOrcList(3, listSchema, 3);
+ record2.setFieldValue("b", orcList2);
+
+ // Diff in array field: 1
+ OrcStruct record3 = (OrcStruct) OrcStruct.createValue(schema);
+ record3.setFieldValue("a", new IntWritable(1));
+ OrcList orcList3 = createOrcList(3, listSchema, 5);
+ record3.setFieldValue("b", orcList3);
+
+ // Diff in array field: 2
+ OrcStruct record4 = (OrcStruct) OrcStruct.createValue(schema);
+ record4.setFieldValue("a", new IntWritable(1));
+ OrcList orcList4 = createOrcList(4, listSchema, 3);
+ record4.setFieldValue("b", orcList4);
+
+ OrcKey orcKey0 = new OrcKey();
+ orcKey0.key = record0;
+ OrcKey orcKey1 = new OrcKey();
+ orcKey1.key = record1;
+ OrcKey orcKey2 = new OrcKey();
+ orcKey2.key = record2;
+ OrcKey orcKey3 = new OrcKey();
+ orcKey3.key = record3;
+ OrcKey orcKey4 = new OrcKey();
+ orcKey4.key = record4;
+
+ Assert.assertTrue(comparator.compare(orcKey0, orcKey1) == 0);
+ Assert.assertTrue(comparator.compare(orcKey1, orcKey2) < 0);
+ Assert.assertTrue(comparator.compare(orcKey1, orcKey3) < 0);
+ Assert.assertTrue(comparator.compare(orcKey1, orcKey4) < 0);
+ }
+
+ @Test
+ public void testComplexRecordMap() throws Exception {
+ OrcKeyComparator comparator = new OrcKeyComparator();
+ Configuration conf = new Configuration();
+ TypeDescription mapFieldSchema =
+ TypeDescription.createMap(TypeDescription.createString(),
TypeDescription.createString());
+ TypeDescription schema =
+ TypeDescription.createStruct().addField("a",
TypeDescription.createInt()).addField("b", mapFieldSchema);
+
+ conf.set(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute(),
schema.toString());
+
Assert.assertEquals(conf.get(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute()),
schema.toString());
+ comparator.setConf(conf);
+
+ // base record
+ OrcStruct record0 = (OrcStruct) OrcStruct.createValue(schema);
+ record0.setFieldValue("a", new IntWritable(1));
+ OrcMap orcMap = createSimpleOrcMap(new Text("key"), new Text("value"),
mapFieldSchema);
+ record0.setFieldValue("b", orcMap);
+
+ // key value both differ
+ OrcStruct record1 = (OrcStruct) OrcStruct.createValue(schema);
+ record1.setFieldValue("a", new IntWritable(1));
+ OrcMap orcMap1 = createSimpleOrcMap(new Text("key_key"), new
Text("value_value"), mapFieldSchema);
+ record1.setFieldValue("b", orcMap1);
+
+ // Key same, value differ
+ OrcStruct record2 = (OrcStruct) OrcStruct.createValue(schema);
+ record2.setFieldValue("a", new IntWritable(1));
+ OrcMap orcMap2 = createSimpleOrcMap(new Text("key"), new
Text("value_value"), mapFieldSchema);
+ record2.setFieldValue("b", orcMap2);
+
+ // Same as base
+ OrcStruct record3 = (OrcStruct) OrcStruct.createValue(schema);
+ record3.setFieldValue("a", new IntWritable(1));
+ OrcMap orcMap3 = createSimpleOrcMap(new Text("key"), new Text("value"),
mapFieldSchema);
+ record3.setFieldValue("b", orcMap3);
+
+ // Differ in other field.
+ OrcStruct record4 = (OrcStruct) OrcStruct.createValue(schema);
+ record4.setFieldValue("a", new IntWritable(2));
+ record4.setFieldValue("b", orcMap);
+
+ // Record with map containing multiple entries but inserted in different
order.
+ OrcStruct record6 = (OrcStruct) OrcStruct.createValue(schema);
+ record6.setFieldValue("a", new IntWritable(1));
+ OrcMap orcMap6 = createSimpleOrcMap(new Text("key"), new Text("value"),
mapFieldSchema);
+ orcMap6.put(new Text("keyLater"), new Text("valueLater"));
+ record6.setFieldValue("b", orcMap6);
+
+ OrcStruct record7 = (OrcStruct) OrcStruct.createValue(schema);
+ record7.setFieldValue("a", new IntWritable(1));
+ OrcMap orcMap7 = createSimpleOrcMap(new Text("keyLater"), new
Text("valueLater"), mapFieldSchema);
+ orcMap7.put(new Text("key"), new Text("value"));
+ record7.setFieldValue("b", orcMap7);
+
+ OrcKey orcKey0 = new OrcKey();
+ orcKey0.key = record0;
+ OrcKey orcKey1 = new OrcKey();
+ orcKey1.key = record1;
+ OrcKey orcKey2 = new OrcKey();
+ orcKey2.key = record2;
+ OrcKey orcKey3 = new OrcKey();
+ orcKey3.key = record3;
+ OrcKey orcKey4 = new OrcKey();
+ orcKey4.key = record4;
+
+ OrcKey orcKey6 = new OrcKey();
+ orcKey6.key = record6;
+ OrcKey orcKey7 = new OrcKey();
+ orcKey7.key = record7;
+
+ Assert.assertTrue(comparator.compare(orcKey0, orcKey1) < 0);
+ Assert.assertTrue(comparator.compare(orcKey1, orcKey2) > 0);
+ Assert.assertTrue(comparator.compare(orcKey2, orcKey3) > 0);
+ Assert.assertTrue(comparator.compare(orcKey0, orcKey3) == 0);
+ Assert.assertTrue(comparator.compare(orcKey0, orcKey4) < 0);
+ Assert.assertTrue(comparator.compare(orcKey6, orcKey7) == 0);
+ }
+
+ // Test comparison for union containing complex types and nested record
inside.
+ // Schema: struct<a:int,
+ // b:uniontype<int,
+ // array<string>,
+ // struct<x:int,y:int>
+ // >
+ // >
+ @Test
+ public void testComplexRecordUnion() throws Exception {
+ OrcKeyComparator comparator = new OrcKeyComparator();
+ Configuration conf = new Configuration();
+
+ TypeDescription listSchema =
TypeDescription.createList(TypeDescription.createString());
+
+ TypeDescription nestedRecordSchema = TypeDescription.createStruct()
+ .addField("x", TypeDescription.createInt())
+ .addField("y", TypeDescription.createInt());
+
+ TypeDescription unionSchema = TypeDescription.createUnion()
+ .addUnionChild(TypeDescription.createInt())
+ .addUnionChild(listSchema)
+ .addUnionChild(nestedRecordSchema);
+
+ TypeDescription schema =
+ TypeDescription.createStruct()
+ .addField("a", TypeDescription.createInt())
+ .addField("b", unionSchema);
+
+ conf.set(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute(),
schema.toString());
+
Assert.assertEquals(conf.get(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute()),
schema.toString());
+ comparator.setConf(conf);
+
+ // base record
+ OrcStruct record0 = (OrcStruct) OrcStruct.createValue(schema);
+ record0.setFieldValue("a", new IntWritable(1));
+ OrcStruct nestedRecord0 = createSimpleOrcStruct(nestedRecordSchema, 1, 2);
+ OrcUnion orcUnion0 = createOrcUnion(unionSchema, nestedRecord0);
+ record0.setFieldValue("b", orcUnion0);
+
+ // same content as base record in diff objects.
+ OrcStruct record1 = (OrcStruct) OrcStruct.createValue(schema);
+ record1.setFieldValue("a", new IntWritable(1));
+ OrcStruct nestedRecord1 = createSimpleOrcStruct(nestedRecordSchema, 1, 2);
+ OrcUnion orcUnion1 = createOrcUnion(unionSchema, nestedRecord1);
+ record1.setFieldValue("b", orcUnion1);
+
+ // diff records inside union, record0 == record1 < 2
+ OrcStruct record2 = (OrcStruct) OrcStruct.createValue(schema);
+ record2.setFieldValue("a", new IntWritable(1));
+ OrcStruct nestedRecord2 = createSimpleOrcStruct(nestedRecordSchema, 2, 2);
+ OrcUnion orcUnion2 = createOrcUnion(unionSchema, nestedRecord2);
+ record2.setFieldValue("b", orcUnion2);
+
+
+ // differ in list inside union, record3 < record4 == record5
+ OrcStruct record3 = (OrcStruct) OrcStruct.createValue(schema);
+ record3.setFieldValue("a", new IntWritable(1));
+ OrcList orcList3 = createOrcList(5, listSchema, 2);
+ OrcUnion orcUnion3 = createOrcUnion(unionSchema, orcList3);
+ record3.setFieldValue("b", orcUnion3);
+
+ OrcStruct record4 = (OrcStruct) OrcStruct.createValue(schema);
+ record4.setFieldValue("a", new IntWritable(1));
+ OrcList orcList4 = createOrcList(6, listSchema, 2);
+ OrcUnion orcUnion4 = createOrcUnion(unionSchema, orcList4);
+ record4.setFieldValue("b", orcUnion4);
+
+ OrcStruct record5 = (OrcStruct) OrcStruct.createValue(schema);
+ record5.setFieldValue("a", new IntWritable(1));
+ OrcList orcList5 = createOrcList(6, listSchema, 2);
+ OrcUnion orcUnion5 = createOrcUnion(unionSchema, orcList5);
+ record5.setFieldValue("b", orcUnion5);
+
+
+ OrcKey orcKey0 = new OrcKey();
+ orcKey0.key = record0;
+ OrcKey orcKey1 = new OrcKey();
+ orcKey1.key = record1;
+ OrcKey orcKey2 = new OrcKey();
+ orcKey2.key = record2;
+ OrcKey orcKey3 = new OrcKey();
+ orcKey3.key = record3;
+ OrcKey orcKey4 = new OrcKey();
+ orcKey4.key = record4;
+ OrcKey orcKey5 = new OrcKey();
+ orcKey5.key = record5;
+
+ Assert.assertEquals(orcUnion0, orcUnion1);
+ // Int value in orcKey2 is larger
+ Assert.assertTrue(comparator.compare(orcKey0, orcKey2) < 0);
+ Assert.assertTrue(comparator.compare(orcKey3, orcKey4) < 0 );
+ Assert.assertTrue(comparator.compare(orcKey3, orcKey5) < 0);
+ Assert.assertTrue(comparator.compare(orcKey4, orcKey5) == 0);
+ }
+
+ private OrcMap createSimpleOrcMap(Text key, Text value, TypeDescription
schema) {
+ TreeMap map = new TreeMap<Text, Text>();
+ map.put(key, value);
+ OrcMap result = new OrcMap(schema);
+ result.putAll(map);
+ return result;
+ }
+
+ /**
+ * Create a {@link OrcList} repeating the given parameter inside the list
for multiple times.
+ */
+ private OrcList createOrcList(int element, TypeDescription schema, int num) {
+ OrcList result = new OrcList(schema);
+ for (int i = 0; i < num; i++) {
+ result.add(new IntWritable(element));
+ }
+ return result;
+ }
+
+ private OrcUnion createOrcUnion(TypeDescription schema, WritableComparable
value) {
+ OrcUnion result = new OrcUnion(schema);
+ result.set(0, value);
+ return result;
+ }
+
+ private OrcStruct createSimpleOrcStruct(TypeDescription structSchema, int
value1, int value2) {
+ OrcStruct result = new OrcStruct(structSchema);
+ result.setFieldValue(0, new IntWritable(value1));
+ result.setFieldValue(1, new IntWritable(value2));
+ return result;
+ }
+}
diff --git a/gobblin-modules/gobblin-orc-dep/build.gradle
b/gobblin-modules/gobblin-orc-dep/build.gradle
new file mode 100644
index 0000000..705eaaa
--- /dev/null
+++ b/gobblin-modules/gobblin-orc-dep/build.gradle
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * The motivation for having a separate dependency module for orc-mapreduce is that the
+ * standalone orc-core library depends on a much higher version of Hive, which could conflict
+ * with the much lower version of hive-exec-core used by Gobblin.
+ */
+
+buildscript {
+ repositories {
+ jcenter()
+ }
+ dependencies {
+ classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
+ }
+}
+
+apply plugin: 'com.github.johnrengelman.shadow'
+apply plugin: 'java'
+tasks.remove(tasks.uploadShadow)
+
+dependencies {
+ compile externalDependency.orcMapreduce
+}
+
+configurations {
+  // Single 'compile' configuration closure: the original declared 'compile' twice,
+  // which reads as two separate configurations; merge into one block that keeps
+  // transitive resolution while excluding groups that conflict with the versions
+  // Gobblin already ships.
+  compile {
+    transitive = true
+    exclude group: "commons-cli"
+    exclude group: "org.apache.avro"
+    exclude group: "org.apache.httpcomponents"
+  }
+}
+
+shadowJar {
+ zip64 true
+ relocate 'org.apache.hadoop.hive',
'shadow.gobblin.orc.org.apache.hadoop.hive'
+}
+
+ext.classification="library"
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/recordcount/CompactionRecordCountProvider.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/recordcount/CompactionRecordCountProvider.java
index 995a913..375ecc1 100644
---
a/gobblin-utility/src/main/java/org/apache/gobblin/util/recordcount/CompactionRecordCountProvider.java
+++
b/gobblin-utility/src/main/java/org/apache/gobblin/util/recordcount/CompactionRecordCountProvider.java
@@ -38,14 +38,16 @@ public class CompactionRecordCountProvider extends
RecordCountProvider {
public static final String M_OUTPUT_FILE_PREFIX = "part-m-";
private static final String SEPARATOR = ".";
- private static final String SUFFIX = ".avro";
+ private static final String DEFAULT_SUFFIX = ".avro";
private static final Random RANDOM = new Random();
/**
* Construct the file name as
{filenamePrefix}{recordCount}.{SystemCurrentTimeInMills}.{RandomInteger}{SUFFIX}.
+ * @deprecated discouraged since default behavior is not obvious from API
itself.
*/
+ @Deprecated
public static String constructFileName(String filenamePrefix, long
recordCount) {
- return constructFileName(filenamePrefix, SUFFIX, recordCount);
+ return constructFileName(filenamePrefix, DEFAULT_SUFFIX, recordCount);
}
/**
diff --git
a/gobblin-utility/src/test/java/org/apache/gobblin/util/recordcount/CompactionRecordCountProviderTest.java
b/gobblin-utility/src/test/java/org/apache/gobblin/util/recordcount/CompactionRecordCountProviderTest.java
index d78fc79..90c5701 100644
---
a/gobblin-utility/src/test/java/org/apache/gobblin/util/recordcount/CompactionRecordCountProviderTest.java
+++
b/gobblin-utility/src/test/java/org/apache/gobblin/util/recordcount/CompactionRecordCountProviderTest.java
@@ -34,7 +34,7 @@ public class CompactionRecordCountProviderTest {
CompactionRecordCountProvider filenameRecordCountProvider = new
CompactionRecordCountProvider();
Pattern pattern =
Pattern.compile("part\\-r\\-123\\.[\\d]*\\.[\\d]*\\.avro");
-
Assert.assertTrue(pattern.matcher(CompactionRecordCountProvider.constructFileName("part-r-",
123)).matches());
+
Assert.assertTrue(pattern.matcher(CompactionRecordCountProvider.constructFileName("part-r-",
".avro", 123)).matches());
Assert.assertEquals(filenameRecordCountProvider.getRecordCount(new
Path("part-r-123.1.2.avro")), 123);
}
}