This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new f589c73  [GOBBLIN-699] Orc compaction impl.
f589c73 is described below

commit f589c732e298735039e4f2be8058bfdbbe04526b
Author: Lei Sun <[email protected]>
AuthorDate: Thu Mar 14 16:07:35 2019 -0700

    [GOBBLIN-699] Orc compaction impl.
    
    Closes #2570 from autumnust/orcCompactionImpl
---
 .../gobblin/binary_creation/AvroTestTools.java     |   1 -
 .../gobblin/binary_creation/OrcTestTools.java      |   2 +-
 gobblin-compaction/build.gradle                    |   8 +-
 .../CompactionCompleteFileOperationAction.java     |   4 +-
 .../compaction/event/CompactionSlaEventHelper.java |  25 +-
 .../HiveMetadataForCompactionExtractor.java        |   3 +-
 .../HiveMetadataForCompactionExtractorFactory.java |   3 +-
 .../mapreduce/CompactionAvroJobConfigurator.java   |   6 +-
 ....java => CompactionCombineFileInputFormat.java} |  31 +-
 .../mapreduce/CompactionJobConfigurator.java       |  15 +-
 .../mapreduce/CompactionOrcJobConfigurator.java    |  80 +++++
 ...ommitter.java => CompactorOutputCommitter.java} |  33 +-
 .../mapreduce/RecordKeyDedupReducerBase.java       | 108 +++++++
 .../compaction/mapreduce/RecordKeyMapperBase.java  |  23 +-
 .../avro/AvroKeyCompactorOutputFormat.java         |   3 +-
 .../mapreduce/avro/AvroKeyDedupReducer.java        |  84 ++----
 .../compaction/mapreduce/avro/AvroKeyMapper.java   |  11 +-
 .../AvroKeyRecursiveCombineFileInputFormat.java    |  69 +----
 .../mapreduce/orc/OrcKeyCompactorOutputFormat.java |  70 +++++
 .../compaction/mapreduce/orc/OrcKeyComparator.java |  90 ++++++
 .../mapreduce/orc/OrcKeyDedupReducer.java          |  50 +++
 .../gobblin/compaction/mapreduce/orc/OrcUtils.java |  72 +++++
 .../orc/OrcValueCombineFileInputFormat.java        |  37 +++
 .../orc/OrcValueCombineFileRecordReader.java       |  64 ++++
 .../compaction/mapreduce/orc/OrcValueMapper.java   |  77 +++++
 ...onTaskTest.java => AvroCompactionTaskTest.java} |   8 +-
 ...upReducerTest.java => KeyDedupReducerTest.java} |  37 +--
 .../mapreduce/OrcCompactionTaskTest.java           | 160 ++++++++++
 .../mapreduce/orc/OrcKeyComparatorTest.java        | 334 +++++++++++++++++++++
 gobblin-modules/gobblin-orc-dep/build.gradle       |  57 ++++
 .../recordcount/CompactionRecordCountProvider.java |   6 +-
 .../CompactionRecordCountProviderTest.java         |   2 +-
 32 files changed, 1324 insertions(+), 249 deletions(-)

diff --git 
a/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/AvroTestTools.java
 
b/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/AvroTestTools.java
index 9a7bb1d..7943cc6 100644
--- 
a/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/AvroTestTools.java
+++ 
b/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/AvroTestTools.java
@@ -29,7 +29,6 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.Spliterators;
 import java.util.TreeMap;
-import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
diff --git 
a/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/OrcTestTools.java
 
b/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/OrcTestTools.java
index 465fb89..e0d02cd 100644
--- 
a/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/OrcTestTools.java
+++ 
b/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/OrcTestTools.java
@@ -387,7 +387,7 @@ public class OrcTestTools extends 
DataTestTools<OrcTestTools.OrcRowIterator, Typ
   }
 
   /**
-   * An iterator over {@link GenericRecord} which is also aware of schema.
+   * An iterator over {@link OrcStruct} which is also aware of its schema (represented in {@link TypeInfo}).
    */
   @AllArgsConstructor
   public static class OrcRowIterator implements Iterator<Writable> {
diff --git a/gobblin-compaction/build.gradle b/gobblin-compaction/build.gradle
index d6446d1..88bc161 100644
--- a/gobblin-compaction/build.gradle
+++ b/gobblin-compaction/build.gradle
@@ -27,13 +27,16 @@ dependencies {
   compile project(":gobblin-runtime")
   compile project(":gobblin-modules:gobblin-kafka-common")
 
+  // Given orc-mapreduce depends on hive version of hive-storage-api(2.4.0) 
and conflicted
+  // with hive-exec-core in older version (1.0.1), we need to shadow orc-mapreduce's transitive deps.
+  compile project(path: ":gobblin-modules:gobblin-orc-dep", 
configuration:"shadow")
+
   compile externalDependency.calciteCore
   compile externalDependency.calciteAvatica
   compile externalDependency.jhyde
   compile externalDependency.avro
   compile externalDependency.commonsLang
   compile externalDependency.commonsMath
-  compile externalDependency.hiveExec
   compile externalDependency.mockito
   compile externalDependency.testng
 
@@ -58,7 +61,8 @@ dependencies {
 
 
 configurations {
-    compile { transitive = true }
+  compile { transitive = true }
+  all*.exclude group: "org.apache.hadoop", module: 'hive-exec'
 }
 
 ext.classification="library"
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
index 8dc6324..eef8474 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
@@ -28,7 +28,7 @@ import 
org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
 import org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner;
-import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyMapper;
+import org.apache.gobblin.compaction.mapreduce.RecordKeyMapperBase;
 import org.apache.gobblin.compaction.parser.CompactionPathParser;
 import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
 import org.apache.gobblin.configuration.State;
@@ -130,7 +130,7 @@ public class CompactionCompleteFileOperationAction 
implements CompactionComplete
         // We don't get record count from file name because tracking which 
files are actually involved in the MR execution can
         // be hard. This is due to new minutely data is rolled up to hourly 
folder but from daily compaction perspective we are not
         // able to tell which file are newly added (because we simply pass all 
hourly folders to MR job instead of individual files).
-        Counter counter = 
job.getCounters().findCounter(AvroKeyMapper.EVENT_COUNTER.RECORD_COUNT);
+        Counter counter = 
job.getCounters().findCounter(RecordKeyMapperBase.EVENT_COUNTER.RECORD_COUNT);
         newTotalRecords = counter.getValue();
       }
 
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java
index 042c5a4..763f375 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java
@@ -17,8 +17,16 @@
 
 package org.apache.gobblin.compaction.event;
 
+import com.google.common.base.Optional;
 import java.io.IOException;
-
+import org.apache.gobblin.compaction.dataset.Dataset;
+import org.apache.gobblin.compaction.mapreduce.MRCompactor;
+import org.apache.gobblin.compaction.mapreduce.RecordKeyDedupReducerBase;
+import org.apache.gobblin.compaction.mapreduce.RecordKeyMapperBase;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.event.sla.SlaEventKeys;
+import org.apache.gobblin.metrics.event.sla.SlaEventSubmitter;
+import 
org.apache.gobblin.metrics.event.sla.SlaEventSubmitter.SlaEventSubmitterBuilder;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Counter;
@@ -27,17 +35,6 @@ import org.apache.hadoop.mapreduce.Job;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Optional;
-
-import org.apache.gobblin.compaction.dataset.Dataset;
-import org.apache.gobblin.compaction.mapreduce.MRCompactor;
-import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyDedupReducer;
-import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyMapper;
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.metrics.event.sla.SlaEventKeys;
-import org.apache.gobblin.metrics.event.sla.SlaEventSubmitter;
-import 
org.apache.gobblin.metrics.event.sla.SlaEventSubmitter.SlaEventSubmitterBuilder;
-
 
 /**
  * Helper class to build compaction sla event metadata.
@@ -142,13 +139,13 @@ public class CompactionSlaEventHelper {
       return -1l;
     }
 
-    Counter recordCounter = 
counters.findCounter(AvroKeyDedupReducer.EVENT_COUNTER.RECORD_COUNT);
+    Counter recordCounter = 
counters.findCounter(RecordKeyDedupReducerBase.EVENT_COUNTER.RECORD_COUNT);
 
     if (recordCounter != null && recordCounter.getValue() != 0) {
       return recordCounter.getValue();
     }
 
-    recordCounter = 
counters.findCounter(AvroKeyMapper.EVENT_COUNTER.RECORD_COUNT);
+    recordCounter = 
counters.findCounter(RecordKeyMapperBase.EVENT_COUNTER.RECORD_COUNT);
 
     if (recordCounter != null && recordCounter.getValue() != 0) {
       return recordCounter.getValue();
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractor.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractor.java
index 020da2d..a125479 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractor.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractor.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.thrift.TException;
 
 import com.google.common.base.Splitter;
@@ -52,7 +51,7 @@ public class HiveMetadataForCompactionExtractor extends 
HiveBaseExtractor<Void,
   private MRCompactionEntity compactionEntity;
   private boolean extracted = false;
 
-  public HiveMetadataForCompactionExtractor(WorkUnitState state, FileSystem 
fs) throws IOException, TException, HiveException {
+  public HiveMetadataForCompactionExtractor(WorkUnitState state, FileSystem 
fs) throws IOException, TException {
     super(state);
 
     if 
(Boolean.valueOf(state.getPropAsBoolean(PartitionLevelWatermarker.IS_WATERMARK_WORKUNIT_KEY)))
 {
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractorFactory.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractorFactory.java
index ab89df5..9f71b8b 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractorFactory.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractorFactory.java
@@ -20,7 +20,6 @@ package org.apache.gobblin.compaction.hivebasedconstructs;
 import java.io.IOException;
 
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.thrift.TException;
 
 import org.apache.gobblin.configuration.WorkUnitState;
@@ -33,7 +32,7 @@ import 
org.apache.gobblin.data.management.conversion.hive.extractor.HiveBaseExtr
  */
 public class HiveMetadataForCompactionExtractorFactory implements 
HiveBaseExtractorFactory {
   public HiveBaseExtractor createExtractor(WorkUnitState state, FileSystem 
sourceFs)
-      throws IOException, TException, HiveException {
+      throws IOException, TException {
     return new HiveMetadataForCompactionExtractor(state, sourceFs);
   }
 }
\ No newline at end of file
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
index 14d23d5..65fdce5 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
@@ -30,9 +30,7 @@ import 
org.apache.gobblin.compaction.mapreduce.avro.AvroKeyDedupReducer;
 import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyMapper;
 import 
org.apache.gobblin.compaction.mapreduce.avro.AvroKeyRecursiveCombineFileInputFormat;
 import 
org.apache.gobblin.compaction.mapreduce.avro.MRCompactorAvroKeyDedupJobRunner;
-import org.apache.gobblin.compaction.suite.CompactionSuiteBase;
 import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.dataset.Dataset;
 import org.apache.gobblin.dataset.FileSystemDataset;
 import org.apache.gobblin.hive.policy.HiveRegistrationPolicy;
 import org.apache.gobblin.util.AvroUtils;
@@ -85,7 +83,7 @@ public class CompactionAvroJobConfigurator extends 
CompactionJobConfigurator {
   /**
    * Refer to MRCompactorAvroKeyDedupJobRunner#getKeySchema(Job, Schema)
    */
-  private Schema getKeySchema(Job job, Schema topicSchema) throws IOException {
+  private Schema getDedupKeySchema(Schema topicSchema) {
 
     boolean keySchemaFileSpecified =
         
this.state.contains(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_AVRO_KEY_SCHEMA_LOC);
@@ -133,7 +131,7 @@ public class CompactionAvroJobConfigurator extends 
CompactionJobConfigurator {
       if 
(this.state.getPropAsBoolean(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_AVRO_SINGLE_INPUT_SCHEMA,
 true)) {
         AvroJob.setInputKeySchema(job, newestSchema);
       }
-      AvroJob.setMapOutputKeySchema(job, this.shouldDeduplicate ? 
getKeySchema(job, newestSchema) : newestSchema);
+      AvroJob.setMapOutputKeySchema(job, this.shouldDeduplicate ? 
getDedupKeySchema(newestSchema) : newestSchema);
       AvroJob.setMapOutputValueSchema(job, newestSchema);
       AvroJob.setOutputKeySchema(job, newestSchema);
     }
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyRecursiveCombineFileInputFormat.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionCombineFileInputFormat.java
similarity index 76%
copy from 
gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyRecursiveCombineFileInputFormat.java
copy to 
gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionCombineFileInputFormat.java
index 5f864cb..6b9bfc4 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyRecursiveCombineFileInputFormat.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionCombineFileInputFormat.java
@@ -15,40 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.compaction.mapreduce.avro;
+package org.apache.gobblin.compaction.mapreduce;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
 import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.util.VersionInfo;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-
-/**
- * A subclass of {@link 
org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat} for Avro 
inputfiles.
- * This class is able to handle the case where the input path has subdirs 
which contain data files, which
- * is not the case with {@link 
org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat}.
- *
- * @author Ziyang Liu
- */
-public class AvroKeyRecursiveCombineFileInputFormat
-    extends CombineFileInputFormat<AvroKey<GenericRecord>, NullWritable> {
 
+public abstract class CompactionCombineFileInputFormat<KI, KO> extends 
CombineFileInputFormat<KI, KO> {
   private static final String COMPACTION_JOB_PREFIX = "compaction.job.";
 
   /**
@@ -103,10 +86,4 @@ public class AvroKeyRecursiveCombineFileInputFormat
     }
     return cleanedSplits;
   }
-
-  @Override
-  public RecordReader<AvroKey<GenericRecord>, NullWritable> 
createRecordReader(InputSplit split, TaskAttemptContext cx)
-      throws IOException {
-    return new CombineFileRecordReader<>((CombineFileSplit) split, cx, 
AvroKeyCombineFileRecordReader.class);
-  }
 }
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java
index d88ad95..a96a99b 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java
@@ -17,8 +17,6 @@
 
 package org.apache.gobblin.compaction.mapreduce;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Ints;
 import java.io.IOException;
@@ -67,7 +65,6 @@ public abstract class CompactionJobConfigurator {
   public static final String DEFAULT_COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS 
=
       
"org.apache.gobblin.compaction.mapreduce.CompactionAvroJobConfigurator$Factory";
 
-
   @Getter
   @AllArgsConstructor
   protected enum EXTENSION {
@@ -119,7 +116,6 @@ public abstract class CompactionJobConfigurator {
   }
 
   public abstract String getFileExtension();
-
   /**
    * Customized MR job creation for Avro.
    *
@@ -186,6 +182,7 @@ public abstract class CompactionJobConfigurator {
 
   /**
    * Refer to {@link MRCompactorAvroKeyDedupJobRunner#setNumberOfReducers(Job)}
+   * Note that this method is not format specific.
    */
   protected void setNumberOfReducers(Job job) throws IOException {
 
@@ -337,13 +334,15 @@ public abstract class CompactionJobConfigurator {
   }
 
   private static List<TaskCompletionEvent> 
getUnsuccessfulTaskCompletionEvent(Job completedJob) {
-    return 
getAllTaskCompletionEvent(completedJob).stream().filter(te->te.getStatus() != 
TaskCompletionEvent.Status.SUCCEEDED).collect(
-        Collectors.toList());
+    return getAllTaskCompletionEvent(completedJob).stream()
+        .filter(te -> te.getStatus() != TaskCompletionEvent.Status.SUCCEEDED)
+        .collect(Collectors.toList());
   }
 
   private static boolean isFailedPath(Path path, List<TaskCompletionEvent> 
failedEvents) {
     return path.toString().contains("_temporary") || failedEvents.stream()
-        .anyMatch(event -> path.toString().contains(Path.SEPARATOR + 
event.getTaskAttemptId().toString() + Path.SEPARATOR));
+        .anyMatch(
+            event -> path.toString().contains(Path.SEPARATOR + 
event.getTaskAttemptId().toString() + Path.SEPARATOR));
   }
 
   /**
@@ -364,7 +363,7 @@ public abstract class CompactionJobConfigurator {
 
     List<Path> allFilePaths = DatasetHelper.getApplicableFilePaths(fs, 
tmpPath, acceptableExtension);
     List<Path> goodPaths = new ArrayList<>();
-    for (Path filePath: allFilePaths) {
+    for (Path filePath : allFilePaths) {
       if (isFailedPath(filePath, failedEvents)) {
         fs.delete(filePath, false);
         log.error("{} is a bad path so it was deleted", filePath);
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionOrcJobConfigurator.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionOrcJobConfigurator.java
new file mode 100644
index 0000000..ba56047
--- /dev/null
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionOrcJobConfigurator.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.mapreduce;
+
+import java.io.IOException;
+import org.apache.gobblin.compaction.mapreduce.orc.OrcKeyCompactorOutputFormat;
+import org.apache.gobblin.compaction.mapreduce.orc.OrcKeyComparator;
+import org.apache.gobblin.compaction.mapreduce.orc.OrcKeyDedupReducer;
+import org.apache.gobblin.compaction.mapreduce.orc.OrcUtils;
+import org.apache.gobblin.compaction.mapreduce.orc.OrcValueMapper;
+import 
org.apache.gobblin.compaction.mapreduce.orc.OrcValueCombineFileInputFormat;
+import org.apache.gobblin.configuration.State;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.orc.OrcConf;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.mapred.OrcKey;
+import org.apache.orc.mapred.OrcValue;
+
+import static 
org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.*;
+
+
+public class CompactionOrcJobConfigurator extends CompactionJobConfigurator {
+  public static class Factory implements 
CompactionJobConfigurator.ConfiguratorFactory {
+    @Override
+    public CompactionJobConfigurator createConfigurator(State state) throws 
IOException {
+      return new CompactionOrcJobConfigurator(state);
+    }
+  }
+
+  public CompactionOrcJobConfigurator(State state) throws IOException {
+    super(state);
+  }
+
+  @Override
+  public String getFileExtension() {
+    return this.state.getProp(COMPACTION_OUTPUT_EXTENSION, 
EXTENSION.ORC.getExtensionString());
+  }
+
+  protected void configureSchema(Job job) throws IOException {
+    TypeDescription schema = OrcUtils.getNewestSchemaFromSource(job, this.fs);
+
+    job.getConfiguration().set(OrcConf.MAPRED_INPUT_SCHEMA.getAttribute(), 
schema.toString());
+    
job.getConfiguration().set(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute(), 
schema.toString());
+    
job.getConfiguration().set(OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.getAttribute(), 
schema.toString());
+    job.getConfiguration().set(OrcConf.MAPRED_OUTPUT_SCHEMA.getAttribute(), 
schema.toString());
+  }
+
+  protected void configureMapper(Job job) {
+    job.setInputFormatClass(OrcValueCombineFileInputFormat.class);
+    job.setMapperClass(OrcValueMapper.class);
+    job.setMapOutputKeyClass(OrcKey.class);
+    job.setMapOutputValueClass(OrcValue.class);
+    job.setGroupingComparatorClass(OrcKeyComparator.class);
+    job.setSortComparatorClass(OrcKeyComparator.class);
+  }
+
+  protected void configureReducer(Job job) throws IOException {
+    job.setReducerClass(OrcKeyDedupReducer.class);
+    job.setOutputFormatClass(OrcKeyCompactorOutputFormat.class);
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(OrcValue.class);
+    setNumberOfReducers(job);
+  }
+}
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyCompactorOutputCommitter.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactorOutputCommitter.java
similarity index 73%
rename from 
gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyCompactorOutputCommitter.java
rename to 
gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactorOutputCommitter.java
index cca9d4f..5968095 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyCompactorOutputCommitter.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactorOutputCommitter.java
@@ -15,12 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.compaction.mapreduce.avro;
+package org.apache.gobblin.compaction.mapreduce;
 
 import java.io.IOException;
 import java.lang.reflect.Method;
-
 import org.apache.commons.io.FilenameUtils;
+import 
org.apache.gobblin.compaction.mapreduce.avro.MRCompactorAvroKeyDedupJobRunner;
+import org.apache.gobblin.util.recordcount.CompactionRecordCountProvider;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -31,22 +32,29 @@ import 
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.gobblin.util.recordcount.CompactionRecordCountProvider;
-
 
 /**
  * Class used with {@link MRCompactorAvroKeyDedupJobRunner} to rename files as 
they
  * are being committed. In addition to moving files from their working 
directory to
  * the commit output directory, the files are named to include a timestamp and 
a
  * count of how many records the file contains, in the format
- * {recordCount}.{timestamp}.avro.
+ * {recordCount}.{timestamp}.<extensionName>(avro, orc, etc.).
  */
-public class AvroKeyCompactorOutputCommitter extends FileOutputCommitter {
+public class CompactorOutputCommitter extends FileOutputCommitter {
+  /**
+   * Note that the value of this key does not contain a leading dot (e.g. "avro", not ".avro").
+   */
+  public static final String COMPACTION_OUTPUT_EXTENSION = 
"compaction.output.extension";
+  public static final String DEFAULT_COMPACTION_OUTPUT_EXTENSION = "avro";
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CompactorOutputCommitter.class);
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(AvroKeyCompactorOutputCommitter.class);
+  private final String compactionFileExtension;
 
-  public AvroKeyCompactorOutputCommitter(Path output, TaskAttemptContext 
context) throws IOException {
+  public CompactorOutputCommitter(Path output, TaskAttemptContext context) 
throws IOException {
     super(output, context);
+    compactionFileExtension = 
context.getConfiguration().get(COMPACTION_OUTPUT_EXTENSION,
+        DEFAULT_COMPACTION_OUTPUT_EXTENSION);
   }
 
   /**
@@ -62,23 +70,24 @@ public class AvroKeyCompactorOutputCommitter extends 
FileOutputCommitter {
     FileSystem fs = workPath.getFileSystem(context.getConfiguration());
 
     if (fs.exists(workPath)) {
-      long recordCount = getRecordCountFromCounter(context, 
AvroKeyDedupReducer.EVENT_COUNTER.RECORD_COUNT);
+      long recordCount = getRecordCountFromCounter(context, 
RecordKeyDedupReducerBase.EVENT_COUNTER.RECORD_COUNT);
       String fileNamePrefix;
       if (recordCount == 0) {
 
         // recordCount == 0 indicates that it is a map-only, non-dedup job, 
and thus record count should
         // be obtained from mapper counter.
         fileNamePrefix = CompactionRecordCountProvider.M_OUTPUT_FILE_PREFIX;
-        recordCount = getRecordCountFromCounter(context, 
AvroKeyMapper.EVENT_COUNTER.RECORD_COUNT);
+        recordCount = getRecordCountFromCounter(context, 
RecordKeyMapperBase.EVENT_COUNTER.RECORD_COUNT);
       } else {
         fileNamePrefix = CompactionRecordCountProvider.MR_OUTPUT_FILE_PREFIX;
       }
-      String fileName = 
CompactionRecordCountProvider.constructFileName(fileNamePrefix, recordCount);
+      String fileName = 
CompactionRecordCountProvider.constructFileName(fileNamePrefix,
+          "." + compactionFileExtension, recordCount);
 
       for (FileStatus status : fs.listStatus(workPath, new PathFilter() {
         @Override
         public boolean accept(Path path) {
-          return FilenameUtils.isExtension(path.getName(), "avro");
+          return FilenameUtils.isExtension(path.getName(), 
compactionFileExtension);
         }
       })) {
         Path newPath = new Path(status.getPath().getParent(), fileName);
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/RecordKeyDedupReducerBase.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/RecordKeyDedupReducerBase.java
new file mode 100644
index 0000000..adcb603
--- /dev/null
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/RecordKeyDedupReducerBase.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.mapreduce;
+
+import com.google.common.base.Optional;
+import java.io.IOException;
+import java.util.Comparator;
+import lombok.Getter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+
+
+/**
+ * A base implementation of deduplication reducer that is format-unaware.
+ */
+public abstract class RecordKeyDedupReducerBase<KI, VI, KO, VO> extends 
Reducer<KI, VI, KO, VO> {
+  public enum EVENT_COUNTER {
+    MORE_THAN_1,
+    DEDUPED,
+    RECORD_COUNT
+  }
+
+  /**
+   * In most cases, one of the following will be {@link NullWritable}
+   */
+  @Getter
+  protected KO outKey;
+
+  @Getter
+  protected VO outValue;
+
+
+  protected Optional<Comparator<VI>> deltaComparatorOptional;
+
+
+  protected abstract void initReusableObject();
+
+  /**
+   * Assign the retained value to the reusable output key.
+   * @param valueToRetain the value determined to be retained after the dedup process.
+   */
+  protected abstract void setOutKey(VI valueToRetain);
+
+  /**
+   * Provided so implementations retain the flexibility to emit the retained record as either key or value.
+   * For a compaction job, usually either {@link #setOutKey} or this method performs the actual assignment.
+   */
+  protected abstract void setOutValue(VI valueToRetain);
+
+  protected abstract void initDeltaComparator(Configuration conf);
+
+
+  @Override
+  protected void setup(Context context) {
+    initReusableObject();
+    initDeltaComparator(context.getConfiguration());
+  }
+
+  @Override
+  protected void reduce(KI key, Iterable<VI> values, Context context)
+      throws IOException, InterruptedException {
+    int numVals = 0;
+
+    VI valueToRetain = null;
+
+    for (VI value : values) {
+      if (valueToRetain == null) {
+        valueToRetain = value;
+      } else if (deltaComparatorOptional.isPresent()) {
+        valueToRetain = deltaComparatorOptional.get().compare(valueToRetain, 
value) >= 0 ? valueToRetain : value;
+      }
+      numVals++;
+    }
+
+    setOutKey(valueToRetain);
+    setOutValue(valueToRetain);
+
+    if (numVals > 1) {
+      context.getCounter(EVENT_COUNTER.MORE_THAN_1).increment(1);
+      context.getCounter(EVENT_COUNTER.DEDUPED).increment(numVals - 1);
+    }
+
+    context.getCounter(EVENT_COUNTER.RECORD_COUNT).increment(1);
+
+    // Safety check
+    if (outKey == null || outValue == null) {
+      throw new IllegalStateException("Either outKey or outValue is not being 
properly initialized");
+    }
+
+    context.write(this.outKey, this.outValue);
+  }
+}
diff --git 
a/gobblin-utility/src/test/java/org/apache/gobblin/util/recordcount/CompactionRecordCountProviderTest.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/RecordKeyMapperBase.java
similarity index 50%
copy from 
gobblin-utility/src/test/java/org/apache/gobblin/util/recordcount/CompactionRecordCountProviderTest.java
copy to 
gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/RecordKeyMapperBase.java
index d78fc79..1f71942 100644
--- 
a/gobblin-utility/src/test/java/org/apache/gobblin/util/recordcount/CompactionRecordCountProviderTest.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/RecordKeyMapperBase.java
@@ -15,26 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.util.recordcount;
+package org.apache.gobblin.compaction.mapreduce;
 
-import java.util.regex.Pattern;
+import org.apache.hadoop.mapreduce.Mapper;
 
-import org.apache.hadoop.fs.Path;
-import org.testng.Assert;
-import org.testng.annotations.Test;
 
-
-/**
- * Tests for {@link CompactionRecordCountProvider}.
- */
-@Test(groups = { "gobblin.util.recordcount" })
-public class CompactionRecordCountProviderTest {
-  @Test
-  public void testFileNameRecordCountProvider() {
-    CompactionRecordCountProvider filenameRecordCountProvider = new 
CompactionRecordCountProvider();
-
-    Pattern pattern = 
Pattern.compile("part\\-r\\-123\\.[\\d]*\\.[\\d]*\\.avro");
-    
Assert.assertTrue(pattern.matcher(CompactionRecordCountProvider.constructFileName("part-r-",
 123)).matches());
-    Assert.assertEquals(filenameRecordCountProvider.getRecordCount(new 
Path("part-r-123.1.2.avro")), 123);
+public abstract class RecordKeyMapperBase<KI, VI, KO, VO> extends Mapper<KI, 
VI, KO, VO> {
+  public enum EVENT_COUNTER {
+    RECORD_COUNT
   }
 }
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyCompactorOutputFormat.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyCompactorOutputFormat.java
index f2d5c00..8a03aa5 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyCompactorOutputFormat.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyCompactorOutputFormat.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.compaction.mapreduce.avro;
 import java.io.IOException;
 
 import org.apache.avro.mapreduce.AvroKeyOutputFormat;
+import org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
@@ -38,7 +39,7 @@ public class AvroKeyCompactorOutputFormat<T> extends 
AvroKeyOutputFormat<T> {
   @Override
   public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext 
context) throws IOException {
     if (this.committer == null) {
-      this.committer = new 
AvroKeyCompactorOutputCommitter(FileOutputFormat.getOutputPath(context), 
context);
+      this.committer = new 
CompactorOutputCommitter(FileOutputFormat.getOutputPath(context), context);
     }
     return this.committer;
   }
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyDedupReducer.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyDedupReducer.java
index 130e544..81f2ca5 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyDedupReducer.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyDedupReducer.java
@@ -17,20 +17,16 @@
 
 package org.apache.gobblin.compaction.mapreduce.avro;
 
-import java.io.IOException;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import java.util.Comparator;
-
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapred.AvroValue;
+import org.apache.gobblin.compaction.mapreduce.RecordKeyDedupReducerBase;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Reducer;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-
-import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
 
 /**
@@ -40,61 +36,40 @@ import 
org.apache.gobblin.util.reflection.GobblinConstructorUtils;
  *
  * @author Ziyang Liu
  */
-public class AvroKeyDedupReducer extends Reducer<AvroKey<GenericRecord>, 
AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {
-
-  public enum EVENT_COUNTER {
-    MORE_THAN_1,
-    DEDUPED,
-    RECORD_COUNT
-  }
+public class AvroKeyDedupReducer extends 
RecordKeyDedupReducerBase<AvroKey<GenericRecord>, AvroValue<GenericRecord>,
+    AvroKey<GenericRecord>, NullWritable> {
 
   public static final String DELTA_SCHEMA_PROVIDER =
       "org.apache.gobblin.compaction." + 
AvroKeyDedupReducer.class.getSimpleName() + ".deltaFieldsProvider";
-  private AvroKey<GenericRecord> outKey;
-  private Optional<AvroValueDeltaSchemaComparator> deltaComparatorOptional;
-  private AvroDeltaFieldNameProvider deltaFieldNamesProvider;
 
   @Override
-  protected void setup(Context context)
-      throws IOException, InterruptedException {
-    this.outKey = new AvroKey<>();
-    this.deltaComparatorOptional = Optional.absent();
-    Configuration conf = context.getConfiguration();
-    String deltaSchemaProviderClassName = conf.get(DELTA_SCHEMA_PROVIDER);
-    if (deltaSchemaProviderClassName != null) {
-      this.deltaFieldNamesProvider =
-          
GobblinConstructorUtils.invokeConstructor(AvroDeltaFieldNameProvider.class, 
deltaSchemaProviderClassName, conf);
-      this.deltaComparatorOptional = Optional.of(new 
AvroValueDeltaSchemaComparator(deltaFieldNamesProvider));
-    }
+  protected void initReusableObject() {
+    outKey = new AvroKey<>();
+    outValue = NullWritable.get();
   }
 
   @Override
-  protected void reduce(AvroKey<GenericRecord> key, 
Iterable<AvroValue<GenericRecord>> values, Context context)
-      throws IOException, InterruptedException {
-    int numVals = 0;
-
-    AvroValue<GenericRecord> valueToRetain = null;
+  protected void setOutKey(AvroValue<GenericRecord> valueToRetain) {
+    outKey.datum(valueToRetain.datum());
+  }
 
-    for (AvroValue<GenericRecord> value : values) {
-      if (valueToRetain == null) {
-        valueToRetain = value;
-      } else if (this.deltaComparatorOptional.isPresent()) {
-        valueToRetain = 
this.deltaComparatorOptional.get().compare(valueToRetain, value) >= 0 ? 
valueToRetain : value;
-      }
-      numVals++;
-    }
-    this.outKey.datum(valueToRetain.datum());
+  @Override
+  protected void setOutValue(AvroValue<GenericRecord> valueToRetain) {
+    // do nothing since initReusableObject has assigned value for outValue.
+  }
 
-    if (numVals > 1) {
-      context.getCounter(EVENT_COUNTER.MORE_THAN_1).increment(1);
-      context.getCounter(EVENT_COUNTER.DEDUPED).increment(numVals - 1);
+  @Override
+  protected void initDeltaComparator(Configuration conf) {
+    deltaComparatorOptional = Optional.absent();
+    String deltaSchemaProviderClassName = conf.get(DELTA_SCHEMA_PROVIDER);
+    if (deltaSchemaProviderClassName != null) {
+      deltaComparatorOptional = Optional.of(new AvroValueDeltaSchemaComparator(
+          
GobblinConstructorUtils.invokeConstructor(AvroDeltaFieldNameProvider.class, 
deltaSchemaProviderClassName,
+              conf)));
     }
-
-    context.getCounter(EVENT_COUNTER.RECORD_COUNT).increment(1);
-
-    context.write(this.outKey, NullWritable.get());
   }
 
+
   @VisibleForTesting
   protected static class AvroValueDeltaSchemaComparator implements 
Comparator<AvroValue<GenericRecord>> {
     private final AvroDeltaFieldNameProvider deltaSchemaProvider;
@@ -105,21 +80,16 @@ public class AvroKeyDedupReducer extends 
Reducer<AvroKey<GenericRecord>, AvroVal
 
     @Override
     public int compare(AvroValue<GenericRecord> o1, AvroValue<GenericRecord> 
o2) {
-      GenericRecord record1= o1.datum();
+      GenericRecord record1 = o1.datum();
       GenericRecord record2 = o2.datum();
       for (String deltaFieldName : 
this.deltaSchemaProvider.getDeltaFieldNames(record1)) {
         if (record1.get(deltaFieldName).equals(record2.get(deltaFieldName))) {
           continue;
         }
-        return 
((Comparable)record1.get(deltaFieldName)).compareTo(record2.get(deltaFieldName));
+        return ((Comparable) 
record1.get(deltaFieldName)).compareTo(record2.get(deltaFieldName));
       }
 
       return 0;
     }
   }
-
-  @VisibleForTesting
-  protected AvroKey<GenericRecord> getOutKey() {
-    return this.outKey;
-  }
 }
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyMapper.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyMapper.java
index 6f16d33..e4edc82 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyMapper.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyMapper.java
@@ -28,6 +28,7 @@ import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapred.AvroValue;
 import org.apache.avro.mapreduce.AvroJob;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.compaction.mapreduce.RecordKeyMapperBase;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -44,12 +45,8 @@ import 
org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
  *
  * @author Ziyang Liu
  */
-public class AvroKeyMapper extends Mapper<AvroKey<GenericRecord>, 
NullWritable, AvroKey<GenericRecord>, Object> {
-
-  public enum EVENT_COUNTER {
-    RECORD_COUNT
-  }
-
+public class AvroKeyMapper extends
+                           RecordKeyMapperBase<AvroKey<GenericRecord>, 
NullWritable, AvroKey<GenericRecord>, Object> {
   private AvroKey<GenericRecord> outKey;
   private AvroValue<GenericRecord> outValue;
   private Schema keySchema;
@@ -86,7 +83,7 @@ public class AvroKeyMapper extends 
Mapper<AvroKey<GenericRecord>, NullWritable,
    * Target record's schema cannot have MAP, ARRAY or ENUM fields, or UNION 
fields that
    * contain these fields.
    */
-  private static void populateComparableKeyRecord(GenericRecord source, 
GenericRecord target) {
+  private void populateComparableKeyRecord(GenericRecord source, GenericRecord 
target) {
     for (Field field : target.getSchema().getFields()) {
       if (field.schema().getType() == Schema.Type.UNION) {
 
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyRecursiveCombineFileInputFormat.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyRecursiveCombineFileInputFormat.java
index 5f864cb..30de419 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyRecursiveCombineFileInputFormat.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyRecursiveCombineFileInputFormat.java
@@ -18,25 +18,15 @@
 package org.apache.gobblin.compaction.mapreduce.avro;
 
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.AvroKey;
+import 
org.apache.gobblin.compaction.mapreduce.CompactionCombineFileInputFormat;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
 import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.util.VersionInfo;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 
 
 /**
@@ -47,62 +37,7 @@ import com.google.common.collect.Lists;
  * @author Ziyang Liu
  */
 public class AvroKeyRecursiveCombineFileInputFormat
-    extends CombineFileInputFormat<AvroKey<GenericRecord>, NullWritable> {
-
-  private static final String COMPACTION_JOB_PREFIX = "compaction.job.";
-
-  /**
-   * Properties related to the input format of the compaction job of a dataset.
-   */
-  private static final String COMPACTION_JOB_MAPRED_MAX_SPLIT_SIZE = 
COMPACTION_JOB_PREFIX + "mapred.max.split.size";
-  private static final long DEFAULT_COMPACTION_JOB_MAPRED_MAX_SPLIT_SIZE = 
268435456;
-  private static final String COMPACTION_JOB_MAPRED_MIN_SPLIT_SIZE = 
COMPACTION_JOB_PREFIX + "mapred.min.split.size";
-  private static final long DEFAULT_COMPACTION_JOB_MAPRED_MIN_SPLIT_SIZE = 
268435456;
-
-  private static final int SPLIT_MAX_NUM_LOCATIONS = 10;
-
-  @Override
-  public List<InputSplit> getSplits(JobContext cx) throws IOException {
-    Job modifiedJob = Job.getInstance(cx.getConfiguration());
-    setSplitSize(modifiedJob);
-    FileInputFormat.setInputDirRecursive(modifiedJob, true);
-    return cleanSplits(super.getSplits(modifiedJob));
-  }
-
-  private void setSplitSize(JobContext cx) {
-    
super.setMaxSplitSize(cx.getConfiguration().getLong(COMPACTION_JOB_MAPRED_MAX_SPLIT_SIZE,
-        DEFAULT_COMPACTION_JOB_MAPRED_MAX_SPLIT_SIZE));
-    
super.setMinSplitSizeNode(cx.getConfiguration().getLong(COMPACTION_JOB_MAPRED_MIN_SPLIT_SIZE,
-        DEFAULT_COMPACTION_JOB_MAPRED_MIN_SPLIT_SIZE));
-  }
-
-  /**
-   * Set the number of locations in the split to SPLIT_MAX_NUM_LOCATIONS if it 
is larger than
-   * SPLIT_MAX_NUM_LOCATIONS (MAPREDUCE-5186).
-   */
-  private static List<InputSplit> cleanSplits(List<InputSplit> splits) throws 
IOException {
-    if (VersionInfo.getVersion().compareTo("2.3.0") >= 0) {
-      // This issue was fixed in 2.3.0, if newer version, no need to clean up 
splits
-      return splits;
-    }
-
-    List<InputSplit> cleanedSplits = Lists.newArrayList();
-
-    for (int i = 0; i < splits.size(); i++) {
-      CombineFileSplit oldSplit = (CombineFileSplit) splits.get(i);
-      String[] locations = oldSplit.getLocations();
-
-      Preconditions.checkNotNull(locations, "CombineFileSplit.getLocations() 
returned null");
-
-      if (locations.length > SPLIT_MAX_NUM_LOCATIONS) {
-        locations = Arrays.copyOf(locations, SPLIT_MAX_NUM_LOCATIONS);
-      }
-
-      cleanedSplits.add(new CombineFileSplit(oldSplit.getPaths(), 
oldSplit.getStartOffsets(), oldSplit.getLengths(),
-          locations));
-    }
-    return cleanedSplits;
-  }
+    extends CompactionCombineFileInputFormat<AvroKey<GenericRecord>, 
NullWritable> {
 
   @Override
   public RecordReader<AvroKey<GenericRecord>, NullWritable> 
createRecordReader(InputSplit split, TaskAttemptContext cx)
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyCompactorOutputFormat.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyCompactorOutputFormat.java
new file mode 100644
index 0000000..9c94224
--- /dev/null
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyCompactorOutputFormat.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.mapreduce.orc;
+
+import java.io.IOException;
+import org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator;
+import org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Writer;
+import org.apache.orc.mapreduce.OrcMapreduceRecordWriter;
+import org.apache.orc.mapreduce.OrcOutputFormat;
+
+import static 
org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.*;
+
+
+/**
+ * Extension of {@link OrcOutputFormat} for customized {@link 
CompactorOutputCommitter}
+ */
+public class OrcKeyCompactorOutputFormat extends OrcOutputFormat {
+
+  private FileOutputCommitter committer = null;
+
+  @Override
+  public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext 
context) throws IOException {
+    if (this.committer == null) {
+      this.committer = new 
CompactorOutputCommitter(FileOutputFormat.getOutputPath(context), context);
+    }
+    return this.committer;
+  }
+
+  /**
+   * Required to override since the super method hard-codes the file extension as 
".orc". To keep the
+   * extension name flexible, we made it configuration-driven.
+   * @param taskAttemptContext The source of configuration that determines the 
file extension
+   * @return The {@link RecordWriter} that write out Orc object.
+   * @throws IOException
+   */
+  @Override
+  public RecordWriter getRecordWriter(TaskAttemptContext taskAttemptContext) 
throws IOException {
+    Configuration conf = taskAttemptContext.getConfiguration();
+    String extension = "." + conf.get(COMPACTION_OUTPUT_EXTENSION, "orc" );
+
+    Path filename = getDefaultWorkFile(taskAttemptContext, extension);
+    Writer writer = OrcFile.createWriter(filename,
+        org.apache.orc.mapred.OrcOutputFormat.buildOptions(conf));
+    return new OrcMapreduceRecordWriter(writer);
+  }
+}
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyComparator.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyComparator.java
new file mode 100644
index 0000000..efcd1f5
--- /dev/null
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyComparator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.mapreduce.orc;
+
+import java.io.DataInput;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.orc.OrcConf;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.mapred.OrcKey;
+import org.apache.orc.mapred.OrcStruct;
+
+
+/**
+ * Compare {@link OrcKey} in shuffle of MapReduce.
+ * Delegate byte decoding to underlying {@link 
OrcStruct#readFields(DataInput)} method to simplify comparison.
+ */
+public class OrcKeyComparator extends Configured implements 
RawComparator<OrcKey> {
+  private TypeDescription schema;
+  private OrcKey key1;
+  private OrcKey key2;
+  private DataInputBuffer buffer;
+
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    if (null != conf) {
+      // The MapReduce framework will be using this comparator to sort OrcKey 
objects
+      // output from the map phase, so use the schema defined for the map 
output key
+      // and the data model non-raw compare() implementation.
+      schema = 
TypeDescription.fromString(conf.get(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute()));
+      OrcStruct orcRecordModel = (OrcStruct) OrcStruct.createValue(schema);
+
+      if (key1 == null) {
+        key1 = new OrcKey();
+      }
+      if (key2 == null) {
+        key2 = new OrcKey();
+      }
+      if (buffer == null) {
+        buffer = new DataInputBuffer();
+      }
+
+      key1.key = orcRecordModel;
+      key2.key = orcRecordModel;
+    }
+  }
+
+  @Override
+  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+    try {
+      buffer.reset(b1, s1, l1);                   // parse key1
+      key1.readFields(buffer);
+
+      buffer.reset(b2, s2, l2);                   // parse key2
+      key2.readFields(buffer);
+
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    return compare(key1, key2);                   // compare them
+  }
+
+  @Override
+  public int compare(OrcKey o1, OrcKey o2) {
+    if (!(o1.key instanceof OrcStruct) || !(o2.key instanceof OrcStruct)) {
+      throw new IllegalStateException("OrcKey should have its key value be 
instance of OrcStruct");
+    }
+    return ((OrcStruct) o1.key).compareTo((OrcStruct) o2.key);
+  }
+}
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyDedupReducer.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyDedupReducer.java
new file mode 100644
index 0000000..ad5c4dd
--- /dev/null
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyDedupReducer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.mapreduce.orc;
+
+import com.google.common.base.Optional;
+import org.apache.gobblin.compaction.mapreduce.RecordKeyDedupReducerBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.orc.mapred.OrcKey;
+import org.apache.orc.mapred.OrcValue;
+
+
+public class OrcKeyDedupReducer extends RecordKeyDedupReducerBase<OrcKey, 
OrcValue, NullWritable, OrcValue> {
+  @Override
+  protected void setOutValue(OrcValue valueToRetain) {
+    // Better to copy instead of reassigning the reference.
+    outValue.value = valueToRetain.value;
+  }
+
+  @Override
+  protected void setOutKey(OrcValue valueToRetain) {
+    // do nothing since initReusableObject has assigned value for outKey.
+  }
+
+  @Override
+  protected void initDeltaComparator(Configuration conf) {
+    deltaComparatorOptional = Optional.absent();
+  }
+
+  @Override
+  protected void initReusableObject() {
+    outKey = NullWritable.get();
+    outValue = new OrcValue();
+  }
+}
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
new file mode 100644
index 0000000..7f82690
--- /dev/null
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.mapreduce.orc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import 
org.apache.gobblin.compaction.mapreduce.avro.MRCompactorAvroKeyDedupJobRunner;
+import org.apache.gobblin.util.FileListUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.TypeDescription;
+
+
+public class OrcUtils {
+  // Private constructor to prevent instantiation of this utility class.
+  private OrcUtils() {
+
+  }
+
+  public static TypeDescription getTypeDescriptionFromFile(Configuration conf, 
Path orcFilePath) throws IOException {
+    return getRecordReaderFromFile(conf, orcFilePath).getSchema();
+  }
+
+  public static Reader getRecordReaderFromFile(Configuration conf, Path 
orcFilePath) throws IOException {
+    return OrcFile.createReader(orcFilePath, new OrcFile.ReaderOptions(conf));
+  }
+
+  public static TypeDescription getNewestSchemaFromSource(Job job, FileSystem 
fs) throws IOException {
+    Path[] sourceDirs = FileInputFormat.getInputPaths(job);
+
+    List<FileStatus> files = new ArrayList<FileStatus>();
+
+    for (Path sourceDir : sourceDirs) {
+      files.addAll(FileListUtils.listFilesRecursively(fs, sourceDir));
+    }
+    Collections.sort(files, new 
MRCompactorAvroKeyDedupJobRunner.LastModifiedDescComparator());
+
+    TypeDescription resultSchema;
+    for (FileStatus status : files) {
+      resultSchema = getTypeDescriptionFromFile(job.getConfiguration(), 
status.getPath());
+      if (resultSchema != null) {
+        return resultSchema;
+      }
+    }
+
+    throw new IllegalStateException(
+        String.format("There's no file carrying orc file schema in %s list", 
sourceDirs));
+  }
+}
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueCombineFileInputFormat.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueCombineFileInputFormat.java
new file mode 100644
index 0000000..0a55567
--- /dev/null
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueCombineFileInputFormat.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.mapreduce.orc;
+
+import java.io.IOException;
+import 
org.apache.gobblin.compaction.mapreduce.CompactionCombineFileInputFormat;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.orc.mapred.OrcValue;
+
/**
 * A {@link CompactionCombineFileInputFormat} specialization for ORC input: packs many small
 * ORC files into combined splits and presents records as (NullWritable, OrcValue) pairs.
 */
public class OrcValueCombineFileInputFormat extends CompactionCombineFileInputFormat<NullWritable, OrcValue> {

  @Override
  public RecordReader<NullWritable, OrcValue> createRecordReader(InputSplit split, TaskAttemptContext context)
      throws IOException {
    // CombineFileRecordReader lazily instantiates one OrcValueCombineFileRecordReader per
    // file chunk in the combined split. The raw (ungenerified) construction is kept because
    // OrcValueCombineFileRecordReader extends the raw OrcMapreduceRecordReader, which defeats
    // generic inference for the class-literal argument here.
    return new CombineFileRecordReader((CombineFileSplit) split, context, OrcValueCombineFileRecordReader.class);
  }
}
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueCombineFileRecordReader.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueCombineFileRecordReader.java
new file mode 100644
index 0000000..3976e53
--- /dev/null
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueCombineFileRecordReader.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.mapreduce.orc;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.mapreduce.OrcMapreduceRecordReader;
+
+
/**
 * Adapts {@link OrcMapreduceRecordReader} to work inside a {@link CombineFileSplit}: each
 * instance reads the single underlying ORC file identified by {@code splitIdx} within the
 * combined split.
 *
 * Instantiated reflectively by {@code CombineFileRecordReader} through the
 * (CombineFileSplit, TaskAttemptContext, Integer) constructor.
 */
public class OrcValueCombineFileRecordReader extends OrcMapreduceRecordReader {
  // The combined split this reader belongs to.
  private final CombineFileSplit split;
  // Index of the single file chunk within {@code split} that this reader consumes.
  private final Integer splitIdx;

  public OrcValueCombineFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer idx)
      throws IOException {
    // Open the file's row reader and read its schema eagerly so the superclass can be constructed.
    this(getRecordReaderFromFile(split, context, idx), getSchema(split, context, idx), split, idx);
  }

  public OrcValueCombineFileRecordReader(RecordReader reader, TypeDescription schema, CombineFileSplit split,
      Integer splitIdx) throws IOException {
    super(reader, schema);
    this.split = split;
    this.splitIdx = splitIdx;
  }

  @Override
  public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) {
    // Re-map the combined split to a plain FileSplit covering only this reader's file chunk.
    // The InputSplit passed in by the framework is deliberately ignored.
    super.initialize(new FileSplit(this.split.getPath(this.splitIdx), this.split.getOffset(this.splitIdx),
        this.split.getLength(this.splitIdx), null), taskAttemptContext);
  }

  /** Reads the ORC schema of the file at index {@code idx} of the combined split. */
  private static TypeDescription getSchema(CombineFileSplit split, TaskAttemptContext context, Integer idx)
      throws IOException {
    Path path = split.getPath(idx);
    return OrcUtils.getTypeDescriptionFromFile(context.getConfiguration(), path);
  }

  /** Opens a row-level {@link RecordReader} for the file at index {@code idx} of the combined split. */
  private static RecordReader getRecordReaderFromFile(CombineFileSplit split, TaskAttemptContext context, Integer idx)
      throws IOException {
    Path path = split.getPath(idx);
    return OrcUtils.getRecordReaderFromFile(context.getConfiguration(), path).rows();
  }
}
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
new file mode 100644
index 0000000..0dd3b5c
--- /dev/null
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.mapreduce.orc;
+
+import java.io.IOException;
+import org.apache.gobblin.compaction.mapreduce.RecordKeyMapperBase;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.orc.mapred.OrcKey;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
+import org.apache.orc.mapreduce.OrcMapreduceRecordReader;
+
+
/**
 * To keep consistent with {@link OrcMapreduceRecordReader}'s decision on implementing
 * {@link RecordReader} with {@link NullWritable} as the key and generic type of value, the ORC Mapper will
 * read in the record as the input value.
 *
 * Output shape depends on whether the job has reducers:
 * - map-only (no reducers): emits (OrcKey wrapping the whole record, NullWritable);
 * - with reducers (dedup path): emits (dedup key, OrcValue wrapping the whole record).
 */
public class OrcValueMapper extends RecordKeyMapperBase<NullWritable, OrcStruct, OrcKey, Object> {

  // Reusable output key/value holders, created once in setup() to avoid per-record allocation.
  private OrcKey outKey;
  private OrcValue outValue;

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);
    this.outKey = new OrcKey();
    this.outValue = new OrcValue();
  }

  @Override
  protected void map(NullWritable key, OrcStruct orcStruct, Context context) throws IOException, InterruptedException {
    if (context.getNumReduceTasks() == 0) {
      // Map-only job: no deduplication, the record itself becomes the output key.
      this.outKey.key = orcStruct;
      context.write(this.outKey, NullWritable.get());
    } else {
      // Dedup path: the record is the value; records sharing the same dedup key are
      // collapsed by the reducer.
      this.outValue.value = orcStruct;
      context.write(getDedupKey(orcStruct), this.outValue);
    }

    // Every input record is counted, regardless of which output path was taken.
    context.getCounter(EVENT_COUNTER.RECORD_COUNT).increment(1);
  }

  /**
   * By default, dedup key contains the whole ORC record, except MAP since {@link org.apache.orc.mapred.OrcMap} is
   * an implementation of {@link java.util.TreeMap} which doesn't accept difference of records within the map in comparison.
   * Subclasses may override to dedup on a projection of the record instead.
   */
  protected OrcKey getDedupKey(OrcStruct originalRecord) {
    return convertOrcStructToOrcKey(originalRecord);
  }

  /**
   * The output key of mapper needs to be comparable. In the scenarios that we need the orc record itself
   * to be the output key, this conversion will be necessary.
   */
  protected OrcKey convertOrcStructToOrcKey(OrcStruct struct) {
    OrcKey orcKey = new OrcKey();
    orcKey.key = struct;
    return orcKey;
  }
}
diff --git 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskTest.java
 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
similarity index 99%
rename from 
gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskTest.java
rename to 
gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
index 51fe866..909d97c 100644
--- 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskTest.java
+++ 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
@@ -52,7 +52,7 @@ import org.apache.gobblin.runtime.embedded.EmbeddedGobblin;
 
 
 @Slf4j
-public class MRCompactionTaskTest {
+public class AvroCompactionTaskTest {
 
   protected FileSystem getFileSystem()
       throws IOException {
@@ -163,7 +163,7 @@ public class MRCompactionTaskTest {
       writer.close();
   }
 
-  private EmbeddedGobblin createEmbeddedGobblin (String name, String basePath) 
{
+  static EmbeddedGobblin createEmbeddedGobblin (String name, String basePath) {
     String pattern = new Path(basePath, "*/*/minutely/*/*/*/*").toString();
 
     return new EmbeddedGobblin(name)
@@ -201,7 +201,7 @@ public class MRCompactionTaskTest {
         .setConfiguration(SimpleDatasetHierarchicalPrioritizer.TIER_KEY + 
".2", "BizProfile");
   }
 
-   @Test
+  @Test
    public void testWorkUnitStream () throws Exception {
      File basePath = Files.createTempDir();
      basePath.deleteOnExit();
@@ -221,7 +221,7 @@ public class MRCompactionTaskTest {
      Assert.assertTrue(result.isSuccessful());
    }
 
-  @Test
+   @Test
   public void testWorkUnitStreamForAllFailures () throws Exception {
     File basePath = Files.createTempDir();
     basePath.deleteOnExit();
diff --git 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyDedupReducerTest.java
 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/KeyDedupReducerTest.java
similarity index 84%
rename from 
gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyDedupReducerTest.java
rename to 
gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/KeyDedupReducerTest.java
index 2de03d1..6183f35 100644
--- 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyDedupReducerTest.java
+++ 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/KeyDedupReducerTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.compaction.mapreduce.avro;
+package org.apache.gobblin.compaction.mapreduce;
 
 import java.io.IOException;
 import java.util.List;
@@ -25,6 +25,8 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapred.AvroValue;
+import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyDedupReducer;
+import 
org.apache.gobblin.compaction.mapreduce.avro.FieldAttributeBasedDeltaFieldsProvider;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Counter;
@@ -42,17 +44,18 @@ import static org.mockito.Mockito.when;
 
 
 /**
- * Test class for {@link AvroKeyDedupReducer}.
+ * Test class for {@link 
org.apache.gobblin.compaction.mapreduce.RecordKeyDedupReducerBase}.
+ * Will have test separately in both avro and orc.
  */
-public class AvroKeyDedupReducerTest {
-  private static final String KEY_SCHEMA =
+public class KeyDedupReducerTest {
+  private static final String AVRO_KEY_SCHEMA =
       "{ \"type\" : \"record\",  \"name\" : \"etl\",\"namespace\" : 
\"reducerTest\",  \"fields\" : [ { \"name\" : "
           + "\"key\", \"type\" : {\"type\" : \"record\", \"name\" : 
\"key_name\", \"namespace\" : \"key_namespace\",  "
           + "\"fields\" : [ {\"name\" : \"partitionKey\", \"type\" : \"long\", 
\"doc\" : \"\"}, { \"name\" : \"environment"
           + "\", \"type\" : \"string\",\"doc\" : \"\"}, {\"name\" : 
\"subKey\",\"type\" : \"string\", \"doc\" : \"\"} ]}, "
           + "\"doc\" : \"\", \"attributes_json\" : 
\"{\\\"delta\\\":false,\\\"pk\\\":true}\" }]}";
 
-  private static final String FULL_SCHEMA =
+  private static final String AVRO_FULL_SCHEMA =
       "{ \"type\" : \"record\",  \"name\" : \"etl\",\"namespace\" : 
\"reducerTest\",  \"fields\" : [ { \"name\" : "
           + "\"key\", \"type\" : {\"type\" : \"record\", \"name\" : 
\"key_name\", \"namespace\" : \"key_namespace\",  "
           + "\"fields\" : [ {\"name\" : \"partitionKey\", \"type\" : \"long\", 
\"doc\" : \"\"}, { \"name\" : \"environment"
@@ -63,7 +66,7 @@ public class AvroKeyDedupReducerTest {
           + " , {\"name\" : \"scn\", \"type\": \"long\", \"doc\" : \"\", 
\"attributes_json\" : \"{\\\"nullable\\\":false,\\\"delta"
           + "\\\":true,\\\"pk\\\":false,\\\"type\\\":\\\"NUMBER\\\"}\"}]}";
 
-  private static final String FULL_SCHEMA_WITH_TWO_DELTA_FIELDS =
+  private static final String AVRO_FULL_SCHEMA_WITH_TWO_DELTA_FIELDS =
       "{ \"type\" : \"record\",  \"name\" : \"etl\",\"namespace\" : 
\"reducerTest\",  \"fields\" : [ { \"name\" : "
           + "\"key\", \"type\" : {\"type\" : \"record\", \"name\" : 
\"key_name\", \"namespace\" : \"key_namespace\",  "
           + "\"fields\" : [ {\"name\" : \"partitionKey\", \"type\" : \"long\", 
\"doc\" : \"\"}, { \"name\" : \"environment"
@@ -75,9 +78,9 @@ public class AvroKeyDedupReducerTest {
           + "\\\":true,\\\"pk\\\":false,\\\"type\\\":\\\"NUMBER\\\"}\"}]}";
 
   @Test
-  public void testReduce()
+  public void testAvroReduce()
       throws IOException, InterruptedException {
-    Schema keySchema = new Schema.Parser().parse(KEY_SCHEMA);
+    Schema keySchema = new Schema.Parser().parse(AVRO_KEY_SCHEMA);
     GenericRecordBuilder keyRecordBuilder = new 
GenericRecordBuilder(keySchema.getField("key").schema());
     keyRecordBuilder.set("partitionKey", 1);
     keyRecordBuilder.set("environment", "test");
@@ -88,7 +91,7 @@ public class AvroKeyDedupReducerTest {
     GenericRecord keyRecord = keyRecordBuilder.build();
 
     // Test reducer with delta field "scn"
-    Schema fullSchema = new Schema.Parser().parse(FULL_SCHEMA);
+    Schema fullSchema = new Schema.Parser().parse(AVRO_FULL_SCHEMA);
     AvroValue<GenericRecord> fullRecord1 = new AvroValue<>();
     AvroValue<GenericRecord> fullRecord2 = new AvroValue<>();
     AvroValue<GenericRecord> fullRecord3 = new AvroValue<>();
@@ -117,18 +120,19 @@ public class AvroKeyDedupReducerTest {
     when(conf.get(FieldAttributeBasedDeltaFieldsProvider.DELTA_PROP_NAME,
         FieldAttributeBasedDeltaFieldsProvider.DEFAULT_DELTA_PROP_NAME))
         
.thenReturn(FieldAttributeBasedDeltaFieldsProvider.DEFAULT_DELTA_PROP_NAME);
-    AvroKeyDedupReducer reducer = new AvroKeyDedupReducer();
+    RecordKeyDedupReducerBase<AvroKey<GenericRecord>, AvroValue<GenericRecord>,
+        AvroKey<GenericRecord>, NullWritable> reducer = new 
AvroKeyDedupReducer();
 
     WrappedReducer.Context reducerContext = mock(WrappedReducer.Context.class);
     when(reducerContext.getConfiguration()).thenReturn(conf);
     Counter moreThan1Counter = new GenericCounter();
-    
when(reducerContext.getCounter(AvroKeyDedupReducer.EVENT_COUNTER.MORE_THAN_1)).thenReturn(moreThan1Counter);
+    
when(reducerContext.getCounter(RecordKeyDedupReducerBase.EVENT_COUNTER.MORE_THAN_1)).thenReturn(moreThan1Counter);
 
     Counter dedupedCounter = new GenericCounter();
-    
when(reducerContext.getCounter(AvroKeyDedupReducer.EVENT_COUNTER.DEDUPED)).thenReturn(dedupedCounter);
+    
when(reducerContext.getCounter(RecordKeyDedupReducerBase.EVENT_COUNTER.DEDUPED)).thenReturn(dedupedCounter);
 
     Counter recordCounter = new GenericCounter();
-    
when(reducerContext.getCounter(AvroKeyDedupReducer.EVENT_COUNTER.RECORD_COUNT)).thenReturn(recordCounter);
+    
when(reducerContext.getCounter(RecordKeyDedupReducerBase.EVENT_COUNTER.RECORD_COUNT)).thenReturn(recordCounter);
     reducer.setup(reducerContext);
 
     doNothing().when(reducerContext).write(any(AvroKey.class), 
any(NullWritable.class));
@@ -144,13 +148,14 @@ public class AvroKeyDedupReducerTest {
     Configuration conf2 = mock(Configuration.class);
     
when(conf2.get(AvroKeyDedupReducer.DELTA_SCHEMA_PROVIDER)).thenReturn(null);
     when(reducerContext.getConfiguration()).thenReturn(conf2);
-    AvroKeyDedupReducer reducer2 = new AvroKeyDedupReducer();
+    RecordKeyDedupReducerBase<AvroKey<GenericRecord>, AvroValue<GenericRecord>,
+        AvroKey<GenericRecord>, NullWritable> reducer2 = new 
AvroKeyDedupReducer();
     reducer2.setup(reducerContext);
     reducer2.reduce(key, valueIterable, reducerContext);
     Assert.assertEquals(reducer2.getOutKey().datum(), fullRecord1.datum());
 
     // Test reducer with compound delta key.
-    Schema fullSchema2 = new 
Schema.Parser().parse(FULL_SCHEMA_WITH_TWO_DELTA_FIELDS);
+    Schema fullSchema2 = new 
Schema.Parser().parse(AVRO_FULL_SCHEMA_WITH_TWO_DELTA_FIELDS);
     GenericRecordBuilder fullRecordBuilder2 = new 
GenericRecordBuilder(fullSchema2);
     fullRecordBuilder2.set("key", record);
     fullRecordBuilder2.set("scn", 123);
@@ -170,7 +175,5 @@ public class AvroKeyDedupReducerTest {
     reducer.reduce(key, valueIterable2, reducerContext);
     Assert.assertEquals(reducer.getOutKey().datum(), fullRecord3.datum());
 
-
-
   }
 }
diff --git 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
new file mode 100644
index 0000000..5e86dd3
--- /dev/null
+++ 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.mapreduce;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.runtime.api.JobExecutionResult;
+import org.apache.gobblin.runtime.embedded.EmbeddedGobblin;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.apache.orc.impl.ReaderImpl;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapreduce.OrcMapreduceRecordReader;
+import org.apache.orc.mapreduce.OrcMapreduceRecordWriter;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.apache.gobblin.compaction.mapreduce.AvroCompactionTaskTest.*;
+import static 
org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.*;
+
+
+public class OrcCompactionTaskTest {
+
+  @Test
+  public void basicTest() throws Exception {
+    File basePath = Files.createTempDir();
+    basePath.deleteOnExit();
+
+    String minutelyPath = 
"Identity/MemberAccount/minutely/2017/04/03/10/20_30/run_2017-04-03-10-20";
+    String hourlyPath = "Identity/MemberAccount/hourly/2017/04/03/10/";
+    File jobDir = new File(basePath, minutelyPath);
+    Assert.assertTrue(jobDir.mkdirs());
+
+    // Write some ORC file for compaction here.
+    TypeDescription schema = TypeDescription.fromString("struct<i:int,j:int>");
+    OrcStruct orcStruct_0 = (OrcStruct) OrcStruct.createValue(schema);
+    orcStruct_0.setFieldValue("i", new IntWritable(1));
+    orcStruct_0.setFieldValue("j", new IntWritable(2));
+
+    OrcStruct orcStruct_1 = (OrcStruct) OrcStruct.createValue(schema);
+    orcStruct_1.setFieldValue("i", new IntWritable(1));
+    orcStruct_1.setFieldValue("j", new IntWritable(2));
+
+    File file_0 = new File(jobDir, "file_0");
+    File file_1 = new File(jobDir, "file_1");
+    writeOrcRecordsInFile(new Path(file_0.getAbsolutePath()), schema, 
ImmutableList.of(orcStruct_0));
+    writeOrcRecordsInFile(new Path(file_1.getAbsolutePath()), schema, 
ImmutableList.of(orcStruct_1));
+
+    // Verify execution
+
+    // Overwrite the job configurator factory key.
+    String extensionFileName = "orcavro";
+    EmbeddedGobblin embeddedGobblin = createEmbeddedGobblin("basic", 
basePath.getAbsolutePath().toString())
+        
.setConfiguration(CompactionJobConfigurator.COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY,
+        TestCompactionOrcJobConfigurator.Factory.class.getName())
+        .setConfiguration(COMPACTION_OUTPUT_EXTENSION, extensionFileName);
+    JobExecutionResult execution = embeddedGobblin.run();
+    Assert.assertTrue(execution.isSuccessful());
+
+    // Result verification
+    File outputDir = new File(basePath, hourlyPath);
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    List<FileStatus> statuses = new ArrayList<>();
+    for (FileStatus status : fs.listStatus(new 
Path(outputDir.getAbsolutePath()), new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return FilenameUtils.isExtension(path.getName(), extensionFileName);
+      }
+    })) {
+      statuses.add(status);
+    }
+
+    Assert.assertTrue(statuses.size() == 1);
+    List<OrcStruct> result = readOrcFile(statuses.get(0).getPath());
+    Assert.assertEquals(result.size(), 1);
+    Assert.assertEquals(result.get(0).getFieldValue("i"), new IntWritable(1));
+    Assert.assertEquals(result.get(0).getFieldValue("j"), new IntWritable(2));
+  }
+
+  /**
+   * Read a output ORC compacted file into memory.
+   */
+  public List<OrcStruct> readOrcFile(Path orcFilePath)
+      throws IOException, InterruptedException {
+    ReaderImpl orcReader = new ReaderImpl(orcFilePath, new 
OrcFile.ReaderOptions(new Configuration()));
+
+    Reader.Options options = new 
Reader.Options().schema(orcReader.getSchema());
+    OrcMapreduceRecordReader recordReader = new 
OrcMapreduceRecordReader(orcReader, options);
+    List<OrcStruct> result = new ArrayList<>();
+
+    while (recordReader.nextKeyValue()) {
+      result.add((OrcStruct) recordReader.getCurrentValue());
+    }
+
+    return result;
+  }
+
+  public void writeOrcRecordsInFile(Path path, TypeDescription schema, 
List<OrcStruct> orcStructs) throws Exception {
+    Configuration configuration = new Configuration();
+    OrcFile.WriterOptions options = 
OrcFile.writerOptions(configuration).setSchema(schema);
+
+    Writer writer = OrcFile.createWriter(path, options);
+    OrcMapreduceRecordWriter recordWriter = new 
OrcMapreduceRecordWriter(writer);
+    for (OrcStruct orcRecord : orcStructs) {
+      recordWriter.write(NullWritable.get(), orcRecord);
+    }
+    recordWriter.close(new TaskAttemptContextImpl(configuration, new 
TaskAttemptID()));
+  }
+
+  private static class TestCompactionOrcJobConfigurator extends 
CompactionOrcJobConfigurator {
+    public static class Factory implements 
CompactionJobConfigurator.ConfiguratorFactory {
+      @Override
+      public TestCompactionOrcJobConfigurator createConfigurator(State state) 
throws IOException {
+        return new TestCompactionOrcJobConfigurator(state);
+      }
+    }
+
+    @Override
+    protected void setNumberOfReducers(Job job) throws IOException {
+      job.setNumReduceTasks(1);
+    }
+
+    public TestCompactionOrcJobConfigurator(State state) throws IOException {
+      super(state);
+    }
+  }
+}
diff --git 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyComparatorTest.java
 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyComparatorTest.java
new file mode 100644
index 0000000..d26dc6f
--- /dev/null
+++ 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyComparatorTest.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.mapreduce.orc;
+
+import java.util.TreeMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.orc.OrcConf;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.mapred.OrcKey;
+import org.apache.orc.mapred.OrcList;
+import org.apache.orc.mapred.OrcMap;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcUnion;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Mainly to test that {@link OrcKeyComparator} behaves as expected when 
comparing two {@link OrcStruct} instances.
+ * It covers {@link OrcStruct}s of basic (primitive) types as well as those 
containing complex types (MAP, LIST, UNION, STRUCT).
+ *
+ * Reference: https://orc.apache.org/docs/types.html
+ */
+public class OrcKeyComparatorTest {
+  @Test
+  public void testSimpleComparator() throws Exception {
+    OrcKeyComparator comparator = new OrcKeyComparator();
+    Configuration conf = new Configuration();
+    String orcSchema = "struct<i:int,j:int>";
+    TypeDescription schema = TypeDescription.fromString(orcSchema);
+    conf.set(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute(), orcSchema);
+    
Assert.assertEquals(conf.get(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute()), 
orcSchema);
+    comparator.setConf(conf);
+
+    OrcStruct record0 = createSimpleOrcStruct(schema, 1, 2);
+    OrcStruct record1 = createSimpleOrcStruct(schema, 3, 4);
+    OrcStruct record2 = createSimpleOrcStruct(schema, 3, 4);
+
+    OrcKey orcKey0 = new OrcKey();
+    orcKey0.key = record0;
+    OrcKey orcKey1 = new OrcKey();
+    orcKey1.key = record1;
+    OrcKey orcKey2 = new OrcKey();
+    orcKey2.key = record2;
+
+    Assert.assertTrue(comparator.compare(orcKey0, orcKey1) < 0);
+    Assert.assertTrue(comparator.compare(orcKey1, orcKey2) == 0);
+    Assert.assertTrue(comparator.compare(orcKey1, orcKey0) > 0);
+  }
+
+  @Test
+  public void testComplexRecordArray() throws Exception {
+    OrcKeyComparator comparator = new OrcKeyComparator();
+    Configuration conf = new Configuration();
+
+    TypeDescription listSchema = 
TypeDescription.createList(TypeDescription.createString());
+    TypeDescription schema =
+        TypeDescription.createStruct().addField("a", 
TypeDescription.createInt()).addField("b", listSchema);
+
+    conf.set(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute(), 
schema.toString());
+    
Assert.assertEquals(conf.get(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute()), 
schema.toString());
+    comparator.setConf(conf);
+
+    // base record
+    OrcStruct record0 = (OrcStruct) OrcStruct.createValue(schema);
+    record0.setFieldValue("a", new IntWritable(1));
+    OrcList orcList0 = createOrcList(3, listSchema, 3);
+    record0.setFieldValue("b", orcList0);
+
+    // the same as base but different object, expecting equal to each other.
+    OrcStruct record1 = (OrcStruct) OrcStruct.createValue(schema);
+    record1.setFieldValue("a", new IntWritable(1));
+    OrcList orcList1 = createOrcList(3, listSchema, 3);
+    record1.setFieldValue("b", orcList1);
+
+    // Diff in int field
+    OrcStruct record2 = (OrcStruct) OrcStruct.createValue(schema);
+    record2.setFieldValue("a", new IntWritable(2));
+    OrcList orcList2 = createOrcList(3, listSchema, 3);
+    record2.setFieldValue("b", orcList2);
+
+    // Diff in array field: 1
+    OrcStruct record3 = (OrcStruct) OrcStruct.createValue(schema);
+    record3.setFieldValue("a", new IntWritable(1));
+    OrcList orcList3 = createOrcList(3, listSchema, 5);
+    record3.setFieldValue("b", orcList3);
+
+    // Diff in array field: 2
+    OrcStruct record4 = (OrcStruct) OrcStruct.createValue(schema);
+    record4.setFieldValue("a", new IntWritable(1));
+    OrcList orcList4 = createOrcList(4, listSchema, 3);
+    record4.setFieldValue("b", orcList4);
+
+    OrcKey orcKey0 = new OrcKey();
+    orcKey0.key = record0;
+    OrcKey orcKey1 = new OrcKey();
+    orcKey1.key = record1;
+    OrcKey orcKey2 = new OrcKey();
+    orcKey2.key = record2;
+    OrcKey orcKey3 = new OrcKey();
+    orcKey3.key = record3;
+    OrcKey orcKey4 = new OrcKey();
+    orcKey4.key = record4;
+
+    Assert.assertTrue(comparator.compare(orcKey0, orcKey1) == 0);
+    Assert.assertTrue(comparator.compare(orcKey1, orcKey2) < 0);
+    Assert.assertTrue(comparator.compare(orcKey1, orcKey3) < 0);
+    Assert.assertTrue(comparator.compare(orcKey1, orcKey4) < 0);
+  }
+
+  @Test
+  public void testComplexRecordMap() throws Exception {
+    OrcKeyComparator comparator = new OrcKeyComparator();
+    Configuration conf = new Configuration();
+    TypeDescription mapFieldSchema =
+        TypeDescription.createMap(TypeDescription.createString(), 
TypeDescription.createString());
+    TypeDescription schema =
+        TypeDescription.createStruct().addField("a", 
TypeDescription.createInt()).addField("b", mapFieldSchema);
+
+    conf.set(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute(), 
schema.toString());
+    
Assert.assertEquals(conf.get(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute()), 
schema.toString());
+    comparator.setConf(conf);
+
+    // base record
+    OrcStruct record0 = (OrcStruct) OrcStruct.createValue(schema);
+    record0.setFieldValue("a", new IntWritable(1));
+    OrcMap orcMap = createSimpleOrcMap(new Text("key"), new Text("value"), 
mapFieldSchema);
+    record0.setFieldValue("b", orcMap);
+
+    // key value both differ
+    OrcStruct record1 = (OrcStruct) OrcStruct.createValue(schema);
+    record1.setFieldValue("a", new IntWritable(1));
+    OrcMap orcMap1 = createSimpleOrcMap(new Text("key_key"), new 
Text("value_value"), mapFieldSchema);
+    record1.setFieldValue("b", orcMap1);
+
+    // Key same, value differ
+    OrcStruct record2 = (OrcStruct) OrcStruct.createValue(schema);
+    record2.setFieldValue("a", new IntWritable(1));
+    OrcMap orcMap2 = createSimpleOrcMap(new Text("key"), new 
Text("value_value"), mapFieldSchema);
+    record2.setFieldValue("b", orcMap2);
+
+    // Same as base
+    OrcStruct record3 = (OrcStruct) OrcStruct.createValue(schema);
+    record3.setFieldValue("a", new IntWritable(1));
+    OrcMap orcMap3 = createSimpleOrcMap(new Text("key"), new Text("value"), 
mapFieldSchema);
+    record3.setFieldValue("b", orcMap3);
+
+    // Differ in other field.
+    OrcStruct record4 = (OrcStruct) OrcStruct.createValue(schema);
+    record4.setFieldValue("a", new IntWritable(2));
+    record4.setFieldValue("b", orcMap);
+
+    // Record with map containing multiple entries but inserted in different 
order.
+    OrcStruct record6 = (OrcStruct) OrcStruct.createValue(schema);
+    record6.setFieldValue("a", new IntWritable(1));
+    OrcMap orcMap6 = createSimpleOrcMap(new Text("key"), new Text("value"), 
mapFieldSchema);
+    orcMap6.put(new Text("keyLater"), new Text("valueLater"));
+    record6.setFieldValue("b", orcMap6);
+
+    OrcStruct record7 = (OrcStruct) OrcStruct.createValue(schema);
+    record7.setFieldValue("a", new IntWritable(1));
+    OrcMap orcMap7 = createSimpleOrcMap(new Text("keyLater"), new 
Text("valueLater"), mapFieldSchema);
+    orcMap7.put(new Text("key"), new Text("value"));
+    record7.setFieldValue("b", orcMap7);
+
+    OrcKey orcKey0 = new OrcKey();
+    orcKey0.key = record0;
+    OrcKey orcKey1 = new OrcKey();
+    orcKey1.key = record1;
+    OrcKey orcKey2 = new OrcKey();
+    orcKey2.key = record2;
+    OrcKey orcKey3 = new OrcKey();
+    orcKey3.key = record3;
+    OrcKey orcKey4 = new OrcKey();
+    orcKey4.key = record4;
+
+    OrcKey orcKey6 = new OrcKey();
+    orcKey6.key = record6;
+    OrcKey orcKey7 = new OrcKey();
+    orcKey7.key = record7;
+
+    Assert.assertTrue(comparator.compare(orcKey0, orcKey1) < 0);
+    Assert.assertTrue(comparator.compare(orcKey1, orcKey2) > 0);
+    Assert.assertTrue(comparator.compare(orcKey2, orcKey3) > 0);
+    Assert.assertTrue(comparator.compare(orcKey0, orcKey3) == 0);
+    Assert.assertTrue(comparator.compare(orcKey0, orcKey4) < 0);
+    Assert.assertTrue(comparator.compare(orcKey6, orcKey7) == 0);
+  }
+
+  // Test comparison for union containing complex types and nested record 
inside.
+  // Schema: struct<a:int,
+  //                b:uniontype<int,
+  //                            array<string>,
+  //                            struct<x:int,y:int>
+  //                            >
+  //                >
+  @Test
+  public void testComplexRecordUnion() throws Exception {
+    OrcKeyComparator comparator = new OrcKeyComparator();
+    Configuration conf = new Configuration();
+
+    TypeDescription listSchema = 
TypeDescription.createList(TypeDescription.createString());
+
+    TypeDescription nestedRecordSchema = TypeDescription.createStruct()
+        .addField("x", TypeDescription.createInt())
+        .addField("y", TypeDescription.createInt());
+
+    TypeDescription unionSchema = TypeDescription.createUnion()
+        .addUnionChild(TypeDescription.createInt())
+        .addUnionChild(listSchema)
+        .addUnionChild(nestedRecordSchema);
+
+    TypeDescription schema =
+        TypeDescription.createStruct()
+            .addField("a", TypeDescription.createInt())
+            .addField("b", unionSchema);
+
+    conf.set(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute(), 
schema.toString());
+    
Assert.assertEquals(conf.get(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute()), 
schema.toString());
+    comparator.setConf(conf);
+
+    // base record
+    OrcStruct record0 = (OrcStruct) OrcStruct.createValue(schema);
+    record0.setFieldValue("a", new IntWritable(1));
+    OrcStruct nestedRecord0 = createSimpleOrcStruct(nestedRecordSchema, 1, 2);
+    OrcUnion orcUnion0 = createOrcUnion(unionSchema, nestedRecord0);
+    record0.setFieldValue("b", orcUnion0);
+
+    // same content as base record in diff objects.
+    OrcStruct record1 = (OrcStruct) OrcStruct.createValue(schema);
+    record1.setFieldValue("a", new IntWritable(1));
+    OrcStruct nestedRecord1 = createSimpleOrcStruct(nestedRecordSchema, 1, 2);
+    OrcUnion orcUnion1 = createOrcUnion(unionSchema, nestedRecord1);
+    record1.setFieldValue("b", orcUnion1);
+
+    // diff records inside union, record0 == record1 < 2
+    OrcStruct record2 = (OrcStruct) OrcStruct.createValue(schema);
+    record2.setFieldValue("a", new IntWritable(1));
+    OrcStruct nestedRecord2 = createSimpleOrcStruct(nestedRecordSchema, 2, 2);
+    OrcUnion orcUnion2 = createOrcUnion(unionSchema, nestedRecord2);
+    record2.setFieldValue("b", orcUnion2);
+
+
+    // differ in list inside union, record3 < record4 == record5
+    OrcStruct record3 = (OrcStruct) OrcStruct.createValue(schema);
+    record3.setFieldValue("a", new IntWritable(1));
+    OrcList orcList3 = createOrcList(5, listSchema, 2);
+    OrcUnion orcUnion3 = createOrcUnion(unionSchema, orcList3);
+    record3.setFieldValue("b", orcUnion3);
+
+    OrcStruct record4 = (OrcStruct) OrcStruct.createValue(schema);
+    record4.setFieldValue("a", new IntWritable(1));
+    OrcList orcList4 = createOrcList(6, listSchema, 2);
+    OrcUnion orcUnion4 = createOrcUnion(unionSchema, orcList4);
+    record4.setFieldValue("b", orcUnion4);
+
+    OrcStruct record5 = (OrcStruct) OrcStruct.createValue(schema);
+    record5.setFieldValue("a", new IntWritable(1));
+    OrcList orcList5 = createOrcList(6, listSchema, 2);
+    OrcUnion orcUnion5 = createOrcUnion(unionSchema, orcList5);
+    record5.setFieldValue("b", orcUnion5);
+
+
+    OrcKey orcKey0 = new OrcKey();
+    orcKey0.key = record0;
+    OrcKey orcKey1 = new OrcKey();
+    orcKey1.key = record1;
+    OrcKey orcKey2 = new OrcKey();
+    orcKey2.key = record2;
+    OrcKey orcKey3 = new OrcKey();
+    orcKey3.key = record3;
+    OrcKey orcKey4 = new OrcKey();
+    orcKey4.key = record4;
+    OrcKey orcKey5 = new OrcKey();
+    orcKey5.key = record5;
+
+    Assert.assertEquals(orcUnion0, orcUnion1);
+    // The int field inside the nested struct carried by the union is larger in orcKey2
+    Assert.assertTrue(comparator.compare(orcKey0, orcKey2) < 0);
+    Assert.assertTrue(comparator.compare(orcKey3, orcKey4) < 0 );
+    Assert.assertTrue(comparator.compare(orcKey3, orcKey5) < 0);
+    Assert.assertTrue(comparator.compare(orcKey4, orcKey5) == 0);
+  }
+
+  private OrcMap createSimpleOrcMap(Text key, Text value, TypeDescription 
schema) {
+    TreeMap map = new TreeMap<Text, Text>();
+    map.put(key, value);
+    OrcMap result = new OrcMap(schema);
+    result.putAll(map);
+    return result;
+  }
+
+  /**
+   * Create a {@link OrcList} repeating the given parameter inside the list 
for multiple times.
+   */
+  private OrcList createOrcList(int element, TypeDescription schema, int num) {
+    OrcList result = new OrcList(schema);
+    for (int i = 0; i < num; i++) {
+      result.add(new IntWritable(element));
+    }
+    return result;
+  }
+
+  private OrcUnion createOrcUnion(TypeDescription schema, WritableComparable 
value) {
+    OrcUnion result = new OrcUnion(schema);
+    result.set(0, value);
+    return result;
+  }
+
+  private OrcStruct createSimpleOrcStruct(TypeDescription structSchema, int 
value1, int value2) {
+    OrcStruct result = new OrcStruct(structSchema);
+    result.setFieldValue(0, new IntWritable(value1));
+    result.setFieldValue(1, new IntWritable(value2));
+    return result;
+  }
+}
diff --git a/gobblin-modules/gobblin-orc-dep/build.gradle 
b/gobblin-modules/gobblin-orc-dep/build.gradle
new file mode 100644
index 0000000..705eaaa
--- /dev/null
+++ b/gobblin-modules/gobblin-orc-dep/build.gradle
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * The motivation for having a separate dependency module for orc-mapreduce is 
that
+ * the standalone orc-core library depends on a much higher version of Hive, 
which could conflict
+ * with the much lower version of hive-exec-core used by Gobblin.
+ */
+
+buildscript {
+    repositories {
+        jcenter()
+    }
+    dependencies {
+        classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
+    }
+}
+
+apply plugin: 'com.github.johnrengelman.shadow'
+apply plugin: 'java'
+tasks.remove(tasks.uploadShadow)
+
+dependencies {
+  compile externalDependency.orcMapreduce
+}
+
+configurations {
+  compile {
+    transitive = true
+  }
+  compile {
+    exclude group: "commons-cli"
+    exclude group: "org.apache.avro"
+    exclude group: "org.apache.httpcomponents"
+  }
+}
+
+shadowJar {
+  zip64 true
+  relocate 'org.apache.hadoop.hive', 
'shadow.gobblin.orc.org.apache.hadoop.hive'
+}
+
+ext.classification="library"
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/recordcount/CompactionRecordCountProvider.java
 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/recordcount/CompactionRecordCountProvider.java
index 995a913..375ecc1 100644
--- 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/recordcount/CompactionRecordCountProvider.java
+++ 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/recordcount/CompactionRecordCountProvider.java
@@ -38,14 +38,16 @@ public class CompactionRecordCountProvider extends 
RecordCountProvider {
   public static final String M_OUTPUT_FILE_PREFIX = "part-m-";
 
   private static final String SEPARATOR = ".";
-  private static final String SUFFIX = ".avro";
+  private static final String DEFAULT_SUFFIX = ".avro";
   private static final Random RANDOM = new Random();
 
   /**
    * Construct the file name as 
{filenamePrefix}{recordCount}.{SystemCurrentTimeInMills}.{RandomInteger}{SUFFIX}.
+   * @deprecated Discouraged because the default suffix applied is not obvious 
from the API itself; use the overload that takes an explicit suffix.
    */
+  @Deprecated
   public static String constructFileName(String filenamePrefix, long 
recordCount) {
-    return constructFileName(filenamePrefix, SUFFIX, recordCount);
+    return constructFileName(filenamePrefix, DEFAULT_SUFFIX, recordCount);
   }
 
   /**
diff --git 
a/gobblin-utility/src/test/java/org/apache/gobblin/util/recordcount/CompactionRecordCountProviderTest.java
 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/recordcount/CompactionRecordCountProviderTest.java
index d78fc79..90c5701 100644
--- 
a/gobblin-utility/src/test/java/org/apache/gobblin/util/recordcount/CompactionRecordCountProviderTest.java
+++ 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/recordcount/CompactionRecordCountProviderTest.java
@@ -34,7 +34,7 @@ public class CompactionRecordCountProviderTest {
     CompactionRecordCountProvider filenameRecordCountProvider = new 
CompactionRecordCountProvider();
 
     Pattern pattern = 
Pattern.compile("part\\-r\\-123\\.[\\d]*\\.[\\d]*\\.avro");
-    
Assert.assertTrue(pattern.matcher(CompactionRecordCountProvider.constructFileName("part-r-",
 123)).matches());
+    
Assert.assertTrue(pattern.matcher(CompactionRecordCountProvider.constructFileName("part-r-",
 ".avro", 123)).matches());
     Assert.assertEquals(filenameRecordCountProvider.getRecordCount(new 
Path("part-r-123.1.2.avro")), 123);
   }
 }

Reply via email to