This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new c89bfcd  [GOBBLIN-691] Make format-specific component pluggable in 
compaction
c89bfcd is described below

commit c89bfcdf75a2cc0c3ca93be07cd108eb6e3c4cb5
Author: Lei Sun <[email protected]>
AuthorDate: Tue Mar 5 11:16:25 2019 -0800

    [GOBBLIN-691] Make format-specific component pluggable in compaction
    
    Closes #2563 from autumnust/orcCompaction
---
 .../gobblin/configuration/ConfigurationKeys.java   |   2 +-
 .../gobblin/compaction/CompactorFactory.java       |   5 +
 .../compaction/ReflectionCompactorFactory.java     |   6 +
 .../CompactionCompleteFileOperationAction.java     |  27 +-
 .../action/CompactionMarkDirectoryAction.java      |   5 +-
 .../mapreduce/CompactionAvroJobConfigurator.java   | 336 +++------------------
 ...gurator.java => CompactionJobConfigurator.java} | 327 +++++++++-----------
 .../compaction/mapreduce/MRCompactionRunner.java   |   5 +
 .../gobblin/compaction/mapreduce/MRCompactor.java  |   5 +
 .../mapreduce/MRCompactorJobPropCreator.java       |   4 +
 .../compaction/mapreduce/MRCompactorJobRunner.java |   8 +-
 .../avro/MRCompactorAvroKeyDedupJobRunner.java     |  20 +-
 .../compaction/source/CompactionSource.java        |   8 +-
 .../gobblin/compaction/suite/CompactionSuite.java  |   4 +
 ...tionAvroSuite.java => CompactionSuiteBase.java} |  24 +-
 ...actory.java => CompactionSuiteBaseFactory.java} |  10 +-
 .../compaction/suite/CompactionSuiteUtils.java     |  11 +-
 .../verify/CompactionThresholdVerifier.java        |   2 +-
 .../verify/CompactionTimeRangeVerifier.java        |  21 +-
 .../suite/TestCompactionSuiteFactories.java        |   2 +-
 .../compaction/suite/TestCompactionSuites.java     |   2 +-
 .../gobblin/runtime/GobblinMultiTaskAttempt.java   |   2 +-
 .../gobblin/runtime/TaskStateCollectorService.java |   4 +-
 23 files changed, 280 insertions(+), 560 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index c0feda1..9004cff 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -902,7 +902,7 @@ public class ConfigurationKeys {
    */
   public static final String COMPACTION_PREFIX = "compaction.";
   public static final String COMPACTION_SUITE_FACTORY = COMPACTION_PREFIX + 
"suite.factory";
-  public static final String DEFAULT_COMPACTION_SUITE_FACTORY = 
"CompactionAvroSuiteFactory";
+  public static final String DEFAULT_COMPACTION_SUITE_FACTORY = 
"CompactionSuiteBaseFactory";
 
   public static final String COMPACTION_PRIORITIZATION_PREFIX = 
COMPACTION_PREFIX + "prioritization.";
   public static final String COMPACTION_PRIORITIZER_ALIAS = 
COMPACTION_PRIORITIZATION_PREFIX + "prioritizerAlias";
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/CompactorFactory.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/CompactorFactory.java
index cb7e7c0..705ed84 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/CompactorFactory.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/CompactorFactory.java
@@ -29,8 +29,13 @@ import org.apache.gobblin.metrics.Tag;
 
 /**
  * A factory responsible for creating {@link Compactor}s.
+ * @deprecated Please use {@link 
org.apache.gobblin.compaction.mapreduce.MRCompactionTask}
+ * and {@link org.apache.gobblin.compaction.source.CompactionSource} to 
launch MR instead.
+ * The new way enjoys simpler logic to trigger the compaction flow and more 
reliable verification criteria,
+ * instead of using timestamp only before.
  */
 @Alpha
+@Deprecated
 public interface CompactorFactory {
 
     /**
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/ReflectionCompactorFactory.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/ReflectionCompactorFactory.java
index 1ba6fac..2e475fc 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/ReflectionCompactorFactory.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/ReflectionCompactorFactory.java
@@ -31,7 +31,13 @@ import org.apache.gobblin.metrics.Tag;
 
 /**
  * Implementation of {@link CompactorFactory} that creates a {@link Compactor} 
using reflection.
+ *
+ * @deprecated Please use {@link 
org.apache.gobblin.compaction.mapreduce.MRCompactionTask}
+ * and {@link org.apache.gobblin.compaction.source.CompactionSource} to launch 
MR instead.
+ * The new way enjoys simpler logic to trigger the compaction flow and more 
reliable verification criteria,
+ * instead of using timestamp only before.
  */
+@Deprecated
 public class ReflectionCompactorFactory implements CompactorFactory {
 
   @VisibleForTesting
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
index 831443b..8dc6324 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
@@ -17,23 +17,15 @@
 
 package org.apache.gobblin.compaction.action;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Job;
-
-import com.google.common.collect.ImmutableMap;
-
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
-import org.apache.gobblin.compaction.mapreduce.CompactionAvroJobConfigurator;
+import org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner;
 import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyMapper;
@@ -45,22 +37,28 @@ import org.apache.gobblin.dataset.FileSystemDataset;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.WriterUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
 
 
 /**
 * A type of post action {@link CompactionCompleteAction} which focuses on the 
file operations
+ *
  */
 @Slf4j
 @AllArgsConstructor
 public class CompactionCompleteFileOperationAction implements 
CompactionCompleteAction<FileSystemDataset> {
 
   protected WorkUnitState state;
-  private CompactionAvroJobConfigurator configurator;
+  private CompactionJobConfigurator configurator;
   private InputRecordCountHelper helper;
   private EventSubmitter eventSubmitter;
   private FileSystem fs;
 
-  public CompactionCompleteFileOperationAction (State state, 
CompactionAvroJobConfigurator configurator) {
+  public CompactionCompleteFileOperationAction (State state, 
CompactionJobConfigurator configurator) {
     if (!(state instanceof WorkUnitState)) {
       throw new UnsupportedOperationException(this.getClass().getName() + " 
only supports workunit state");
     }
@@ -90,7 +88,8 @@ public class CompactionCompleteFileOperationAction implements 
CompactionComplete
       long oldTotalRecords = helper.readRecordCount(new Path 
(result.getDstAbsoluteDir()));
       long executeCount = helper.readExecutionCount (new Path 
(result.getDstAbsoluteDir()));
 
-      List<Path> goodPaths = CompactionAvroJobConfigurator.getGoodFiles(job, 
tmpPath, this.fs);
+      List<Path> goodPaths = CompactionJobConfigurator.getGoodFiles(job, 
tmpPath, this.fs,
+          ImmutableList.of(configurator.getFileExtension()));
 
       if (appendDeltaOutput) {
         FsPermission permission = 
HadoopUtils.deserializeFsPermission(this.state,
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java
index 89be94a..ac1f1d7 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
 
+import org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -43,10 +44,10 @@ import org.apache.gobblin.metrics.event.EventSubmitter;
 @AllArgsConstructor
 public class CompactionMarkDirectoryAction implements 
CompactionCompleteAction<FileSystemDataset> {
   protected State state;
-  private CompactionAvroJobConfigurator configurator;
+  private CompactionJobConfigurator configurator;
   private FileSystem fs;
   private EventSubmitter eventSubmitter;
-  public CompactionMarkDirectoryAction(State state, 
CompactionAvroJobConfigurator configurator) {
+  public CompactionMarkDirectoryAction(State state, CompactionJobConfigurator 
configurator) {
     if (!(state instanceof WorkUnitState)) {
       throw new UnsupportedOperationException(this.getClass().getName() + " 
only supports workunit state");
     }
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
index 1779e33..14d23d5 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
@@ -17,110 +17,78 @@
 
 package org.apache.gobblin.compaction.mapreduce;
 
+import com.google.common.base.Enums;
+import com.google.common.base.Optional;
 import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-
+import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema;
 import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapred.AvroValue;
 import org.apache.avro.mapreduce.AvroJob;
-import org.apache.commons.math3.primes.Primes;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.TaskCompletionEvent;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-import com.google.common.base.Enums;
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.primitives.Ints;
-
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.gobblin.compaction.dataset.DatasetHelper;
 import 
org.apache.gobblin.compaction.mapreduce.avro.AvroKeyCompactorOutputFormat;
 import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyDedupReducer;
 import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyMapper;
 import 
org.apache.gobblin.compaction.mapreduce.avro.AvroKeyRecursiveCombineFileInputFormat;
 import 
org.apache.gobblin.compaction.mapreduce.avro.MRCompactorAvroKeyDedupJobRunner;
-import org.apache.gobblin.compaction.parser.CompactionPathParser;
-import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
-import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.compaction.suite.CompactionSuiteBase;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.Dataset;
 import org.apache.gobblin.dataset.FileSystemDataset;
 import org.apache.gobblin.hive.policy.HiveRegistrationPolicy;
 import org.apache.gobblin.util.AvroUtils;
-import org.apache.gobblin.util.FileListUtils;
 import org.apache.gobblin.util.HadoopUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+
 
 /**
 * A configurator that focuses on creating an avro compaction map-reduce job
  */
 @Slf4j
-public class CompactionAvroJobConfigurator {
-  protected final State state;
+public class CompactionAvroJobConfigurator extends CompactionJobConfigurator {
 
-  @Getter
-  protected final FileSystem fs;
+  public static class Factory implements 
CompactionJobConfigurator.ConfiguratorFactory {
+    @Override
+    public CompactionJobConfigurator createConfigurator(State state) throws 
IOException {
+      return new CompactionAvroJobConfigurator(state);
+    }
+  }
 
-  // Below attributes are MR related
-  @Getter
-  protected Job configuredJob;
-  @Getter
-  protected final boolean shouldDeduplicate;
-  @Getter
-  protected Path mrOutputPath = null;
-  @Getter
-  protected boolean isJobCreated = false;
-  @Getter
-  protected Collection<Path> mapReduceInputPaths = null;
-  @Getter
-  private long fileNameRecordCount = 0;
+  @Override
+  public String getFileExtension(){
+    return EXTENSION.AVRO.getExtensionString();
+  }
 
   /**
    * Constructor
    * @param  state  A task level state
    */
   public CompactionAvroJobConfigurator(State state) throws IOException {
-    this.state = state;
-    this.fs = getFileSystem(state);
-    this.shouldDeduplicate = 
state.getPropAsBoolean(MRCompactor.COMPACTION_SHOULD_DEDUPLICATE, true);
+    super(state);
   }
 
   /**
-   * Refer to {@link MRCompactorAvroKeyDedupJobRunner#getDedupKeyOption()}
+   * Refer to MRCompactorAvroKeyDedupJobRunner#getDedupKeyOption()
    */
   private MRCompactorAvroKeyDedupJobRunner.DedupKeyOption getDedupKeyOption() {
     if 
(!this.state.contains(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_DEDUP_KEY))
 {
       return MRCompactorAvroKeyDedupJobRunner.DEFAULT_DEDUP_KEY_OPTION;
     }
-    Optional<MRCompactorAvroKeyDedupJobRunner.DedupKeyOption> option = 
Enums.getIfPresent(MRCompactorAvroKeyDedupJobRunner.DedupKeyOption.class,
+    Optional<MRCompactorAvroKeyDedupJobRunner.DedupKeyOption> option =
+        
Enums.getIfPresent(MRCompactorAvroKeyDedupJobRunner.DedupKeyOption.class,
             
this.state.getProp(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_DEDUP_KEY).toUpperCase());
     return option.isPresent() ? option.get() : 
MRCompactorAvroKeyDedupJobRunner.DEFAULT_DEDUP_KEY_OPTION;
   }
 
   /**
-   * Refer to {@link MRCompactorAvroKeyDedupJobRunner#getKeySchema(Job, 
Schema)}
+   * Refer to MRCompactorAvroKeyDedupJobRunner#getKeySchema(Job, Schema)
    */
   private Schema getKeySchema(Job job, Schema topicSchema) throws IOException {
 
-    boolean keySchemaFileSpecified = 
this.state.contains(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_AVRO_KEY_SCHEMA_LOC);
+    boolean keySchemaFileSpecified =
+        
this.state.contains(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_AVRO_KEY_SCHEMA_LOC);
 
     Schema keySchema = null;
 
@@ -138,25 +106,28 @@ public class CompactionAvroJobConfigurator {
         keySchema = AvroUtils.parseSchemaFromFile(keySchemaFile, this.fs);
       } catch (IOException e) {
         log.error("Failed to parse avro schema from " + keySchemaFile
-                + ", using key attributes in the schema for compaction");
-        keySchema = 
AvroUtils.removeUncomparableFields(MRCompactorAvroKeyDedupJobRunner.getKeySchema(topicSchema)).get();
+            + ", using key attributes in the schema for compaction");
+        keySchema =
+            
AvroUtils.removeUncomparableFields(MRCompactorAvroKeyDedupJobRunner.getKeySchema(topicSchema)).get();
       }
 
       if (!MRCompactorAvroKeyDedupJobRunner.isKeySchemaValid(keySchema, 
topicSchema)) {
         log.warn(String.format("Key schema %s is not compatible with record 
schema %s.", keySchema, topicSchema)
-                + "Using key attributes in the schema for compaction");
-        keySchema = 
AvroUtils.removeUncomparableFields(MRCompactorAvroKeyDedupJobRunner.getKeySchema(topicSchema)).get();
+            + "Using key attributes in the schema for compaction");
+        keySchema =
+            
AvroUtils.removeUncomparableFields(MRCompactorAvroKeyDedupJobRunner.getKeySchema(topicSchema)).get();
       }
     } else {
       log.info("Property " + 
MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_AVRO_KEY_SCHEMA_LOC
-              + " not provided. Using key attributes in the schema for 
compaction");
+          + " not provided. Using key attributes in the schema for 
compaction");
       keySchema = 
AvroUtils.removeUncomparableFields(MRCompactorAvroKeyDedupJobRunner.getKeySchema(topicSchema)).get();
     }
 
     return keySchema;
   }
 
-  private void configureSchema(Job job) throws IOException {
+  @Override
+  protected void configureSchema(Job job) throws IOException {
     Schema newestSchema = 
MRCompactorAvroKeyDedupJobRunner.getNewestSchemaFromSource(job, this.fs);
     if (newestSchema != null) {
       if 
(this.state.getPropAsBoolean(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_AVRO_SINGLE_INPUT_SCHEMA,
 true)) {
@@ -182,240 +153,5 @@ public class CompactionAvroJobConfigurator {
     job.setOutputValueClass(NullWritable.class);
     setNumberOfReducers(job);
   }
-
-  /**
-   * Refer to {@link MRCompactorAvroKeyDedupJobRunner#setNumberOfReducers(Job)}
-   */
-  protected void setNumberOfReducers(Job job) throws IOException {
-
-    // get input size
-    long inputSize = 0;
-    for (Path inputPath : this.mapReduceInputPaths) {
-      inputSize += this.fs.getContentSummary(inputPath).getLength();
-    }
-
-    // get target file size
-    long targetFileSize = 
this.state.getPropAsLong(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_TARGET_OUTPUT_FILE_SIZE,
-            
MRCompactorAvroKeyDedupJobRunner.DEFAULT_COMPACTION_JOB_TARGET_OUTPUT_FILE_SIZE);
-
-    // get max reducers
-    int maxNumReducers = 
state.getPropAsInt(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_MAX_NUM_REDUCERS,
-            
MRCompactorAvroKeyDedupJobRunner.DEFAULT_COMPACTION_JOB_MAX_NUM_REDUCERS);
-
-    int numReducers = Math.min(Ints.checkedCast(inputSize / targetFileSize) + 
1, maxNumReducers);
-
-    // get use prime reducers
-    boolean usePrimeReducers = 
state.getPropAsBoolean(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_USE_PRIME_REDUCERS,
-            
MRCompactorAvroKeyDedupJobRunner.DEFAULT_COMPACTION_JOB_USE_PRIME_REDUCERS);
-
-    if (usePrimeReducers && numReducers != 1) {
-      numReducers = Primes.nextPrime(numReducers);
-    }
-    job.setNumReduceTasks(numReducers);
-  }
-
-  /**
-   * Concatenate multiple directory or file names into one path
-   *
-   * @return Concatenated path or null if the parameter is empty
-   */
-  private Path concatPaths (String ...names) {
-    if (names == null || names.length == 0) {
-      return null;
-    }
-    Path cur = new Path(names[0]);
-    for (int i = 1; i < names.length; ++i) {
-      cur = new Path(cur, new Path(names[i]));
-    }
-    return  cur;
-  }
-
-  /**
-   * Refer to {@link 
MRCompactorAvroKeyDedupJobRunner#configureInputAndOutputPaths(Job)}.
-   * @return false if no valid input paths present for MR job to process,  
where a path is valid if it is
-   * a directory containing one or more files.
-   *
-   */
-  protected boolean configureInputAndOutputPaths(Job job, FileSystemDataset 
dataset) throws IOException {
-    boolean emptyDirectoryFlag = false;
-
-    String mrOutputBase = this.state.getProp(MRCompactor.COMPACTION_JOB_DIR);
-    CompactionPathParser parser = new CompactionPathParser(this.state);
-    CompactionPathParser.CompactionParserResult rst = parser.parse(dataset);
-    this.mrOutputPath = concatPaths (mrOutputBase, rst.getDatasetName(), 
rst.getDstSubDir(), rst.getTimeString());
-
-    log.info ("Cleaning temporary MR output directory: " + mrOutputPath);
-    this.fs.delete(mrOutputPath, true);
-
-    this.mapReduceInputPaths = getGranularInputPaths(dataset.datasetRoot());
-    if (this.mapReduceInputPaths.isEmpty()) {
-      this.mapReduceInputPaths.add(dataset.datasetRoot());
-      emptyDirectoryFlag = true;
-    }
-
-    for (Path path: mapReduceInputPaths) {
-      FileInputFormat.addInputPath(job, path);
-    }
-
-    FileOutputFormat.setOutputPath(job, mrOutputPath);
-    return emptyDirectoryFlag;
-  }
-
-  /**
-   * Customized MR job creation. This method will be used in
-   * {@link 
org.apache.gobblin.compaction.suite.CompactionAvroSuite#createJob(Dataset)}
-   *
-   * @param  dataset  A path or directory which needs compaction
-   * @return A configured map-reduce job for avro compaction
-   */
-  public Job createJob(FileSystemDataset dataset) throws IOException {
-    Configuration conf = HadoopUtils.getConfFromState(state);
-
-    // Turn on mapreduce output compression by default
-    if (conf.get("mapreduce.output.fileoutputformat.compress") == null && 
conf.get("mapred.output.compress") == null) {
-      conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
-    }
-
-    // Disable delegation token cancellation by default
-    if (conf.get("mapreduce.job.complete.cancel.delegation.tokens") == null) {
-      conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", 
false);
-    }
-
-    addJars(conf);
-    Job job = Job.getInstance(conf);
-    job.setJobName(MRCompactorJobRunner.HADOOP_JOB_NAME);
-    boolean emptyDirectoryFlag = this.configureInputAndOutputPaths(job, 
dataset);
-    if (emptyDirectoryFlag) {
-      
this.state.setProp(HiveRegistrationPolicy.MAPREDUCE_JOB_INPUT_PATH_EMPTY_KEY, 
true);
-    }
-    this.configureMapper(job);
-    this.configureReducer(job);
-    if (emptyDirectoryFlag || !this.shouldDeduplicate) {
-      job.setNumReduceTasks(0);
-    }
-    // Configure schema at the last step because FilesInputFormat will be used 
internally
-    this.configureSchema(job);
-    this.isJobCreated = true;
-    this.configuredJob = job;
-    return job;
-  }
-
-  private void addJars(Configuration conf) throws IOException {
-    if (!state.contains(MRCompactor.COMPACTION_JARS)) {
-      return;
-    }
-    Path jarFileDir = new Path(state.getProp(MRCompactor.COMPACTION_JARS));
-    for (FileStatus status : this.fs.listStatus(jarFileDir)) {
-      DistributedCache.addFileToClassPath(status.getPath(), conf, this.fs);
-    }
-  }
-
-  private FileSystem getFileSystem(State state)
-          throws IOException {
-    Configuration conf = HadoopUtils.getConfFromState(state);
-    String uri = state.getProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, 
ConfigurationKeys.LOCAL_FS_URI);
-    FileSystem fs = FileSystem.get(URI.create(uri), conf);
-
-    return fs;
-  }
-
-  /**
-   * Converts a top level input path to a group of sub-paths according to user 
defined granularity.
-   * This may be required because if upstream application generates many 
sub-paths but the map-reduce
-   * job only keeps track of the top level path, after the job is done, we 
won't be able to tell if
-   * those new arriving sub-paths is processed by previous map-reduce job or 
not. Hence a better way
-   * is to pre-define those sub-paths as input paths before we start to run 
MR. The implementation of
-   * this method should depend on the data generation granularity controlled 
by upstream. Here we just
-   * list the deepest level of containing folder as the smallest granularity.
-   *
-   * @param path top level directory needs compaction
-   * @return A collection of input paths which will participate in map-reduce 
job
-   */
-  protected Collection<Path> getGranularInputPaths (Path path) throws 
IOException {
-
-    boolean appendDelta = 
this.state.getPropAsBoolean(MRCompactor.COMPACTION_RENAME_SOURCE_DIR_ENABLED,
-            MRCompactor.DEFAULT_COMPACTION_RENAME_SOURCE_DIR_ENABLED);
-
-    Set<Path> uncompacted = Sets.newHashSet();
-    Set<Path> total = Sets.newHashSet();
-
-    for (FileStatus fileStatus : FileListUtils.listFilesRecursively(fs, path)) 
{
-      if (appendDelta) {
-        // use source dir suffix to identify the delta input paths
-        if 
(!fileStatus.getPath().getParent().toString().endsWith(MRCompactor.COMPACTION_RENAME_SOURCE_DIR_SUFFIX))
 {
-          uncompacted.add(fileStatus.getPath().getParent());
-        }
-        total.add(fileStatus.getPath().getParent());
-      } else {
-          uncompacted.add(fileStatus.getPath().getParent());
-      }
-    }
-
-    if (appendDelta) {
-      // When the output record count from mr counter doesn't match
-      // the record count from input file names, we prefer file names because
-      // it will be used to calculate the difference of count in next run.
-      this.fileNameRecordCount = new 
InputRecordCountHelper(this.state).calculateRecordCount (total);
-      log.info ("{} has total input record count (based on file name) {}", 
path, this.fileNameRecordCount);
-    }
-
-    return uncompacted;
-  }
-
-  private static List<TaskCompletionEvent> getAllTaskCompletionEvent(Job 
completedJob) {
-    List<TaskCompletionEvent> completionEvents = new LinkedList<>();
-
-    while (true) {
-      try {
-        TaskCompletionEvent[] bunchOfEvents;
-        bunchOfEvents = 
completedJob.getTaskCompletionEvents(completionEvents.size());
-        if (bunchOfEvents == null || bunchOfEvents.length == 0) {
-          break;
-        }
-        completionEvents.addAll(Arrays.asList(bunchOfEvents));
-      } catch (IOException e) {
-        break;
-      }
-    }
-
-    return completionEvents;
-  }
-
-  private static List<TaskCompletionEvent> 
getUnsuccessfulTaskCompletionEvent(Job completedJob) {
-    return 
getAllTaskCompletionEvent(completedJob).stream().filter(te->te.getStatus() != 
TaskCompletionEvent.Status.SUCCEEDED).collect(
-        Collectors.toList());
-  }
-
-  private static boolean isFailedPath(Path path, List<TaskCompletionEvent> 
failedEvents) {
-    return path.toString().contains("_temporary") || failedEvents.stream()
-        .anyMatch(event -> path.toString().contains(Path.SEPARATOR + 
event.getTaskAttemptId().toString() + Path.SEPARATOR));
-  }
-
-  /**
-   * Get good files
-   * The problem happens when speculative task attempt initialized but then 
killed in the middle of processing.
-   * Some partial file was generated at 
{tmp_output}/_temporary/1/_temporary/attempt_xxx_xxx/part-m-xxxx.avro,
-   * without being committed to its final destination at 
{tmp_output}/part-m-xxxx.avro.
-   *
-   * @param job Completed MR job
-   * @param fs File system that can handle file system
-   * @return all successful files that has been committed
-   */
-  public static List<Path> getGoodFiles(Job job, Path tmpPath, FileSystem fs) 
throws IOException {
-    List<TaskCompletionEvent> failedEvents = 
CompactionAvroJobConfigurator.getUnsuccessfulTaskCompletionEvent(job);
-
-    List<Path> allFilePaths = DatasetHelper.getApplicableFilePaths(fs, 
tmpPath, Lists.newArrayList("avro"));
-    List<Path> goodPaths = new ArrayList<>();
-    for (Path filePath: allFilePaths) {
-      if (CompactionAvroJobConfigurator.isFailedPath(filePath, failedEvents)) {
-        fs.delete(filePath, false);
-        log.error("{} is a bad path so it was deleted", filePath);
-      } else {
-        goodPaths.add(filePath);
-      }
-    }
-
-    return goodPaths;
-  }
 }
 
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java
similarity index 60%
copy from 
gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
copy to 
gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java
index 1779e33..d88ad95 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java
@@ -17,6 +17,10 @@
 
 package org.apache.gobblin.compaction.mapreduce;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Ints;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
@@ -26,54 +30,52 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
-
-import org.apache.avro.Schema;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.avro.mapreduce.AvroJob;
-import org.apache.commons.math3.primes.Primes;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.TaskCompletionEvent;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-import com.google.common.base.Enums;
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.primitives.Ints;
-
+import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-
+import org.apache.commons.math3.primes.Primes;
 import org.apache.gobblin.compaction.dataset.DatasetHelper;
-import 
org.apache.gobblin.compaction.mapreduce.avro.AvroKeyCompactorOutputFormat;
-import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyDedupReducer;
-import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyMapper;
-import 
org.apache.gobblin.compaction.mapreduce.avro.AvroKeyRecursiveCombineFileInputFormat;
 import 
org.apache.gobblin.compaction.mapreduce.avro.MRCompactorAvroKeyDedupJobRunner;
 import org.apache.gobblin.compaction.parser.CompactionPathParser;
 import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.dataset.Dataset;
 import org.apache.gobblin.dataset.FileSystemDataset;
 import org.apache.gobblin.hive.policy.HiveRegistrationPolicy;
-import org.apache.gobblin.util.AvroUtils;
 import org.apache.gobblin.util.FileListUtils;
 import org.apache.gobblin.util.HadoopUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
 
 /**
- * A configurator that focused on creating avro compaction map-reduce job
+ * Configurator for compaction job.
+ * Different data formats should have their own impl. for this interface.
+ *
  */
 @Slf4j
-public class CompactionAvroJobConfigurator {
+public abstract class CompactionJobConfigurator {
+
+  public static final String COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY = 
"compaction.jobConfiguratorFactory.class";
+  public static final String DEFAULT_COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS 
=
+      
"org.apache.gobblin.compaction.mapreduce.CompactionAvroJobConfigurator$Factory";
+
+
+  @Getter
+  @AllArgsConstructor
+  protected enum EXTENSION {
+    AVRO("avro"), ORC("orc");
+
+    private String extensionString;
+  }
+
   protected final State state;
 
   @Getter
@@ -91,96 +93,95 @@ public class CompactionAvroJobConfigurator {
   @Getter
   protected Collection<Path> mapReduceInputPaths = null;
   @Getter
-  private long fileNameRecordCount = 0;
+  protected long fileNameRecordCount = 0;
 
-  /**
-   * Constructor
-   * @param  state  A task level state
-   */
-  public CompactionAvroJobConfigurator(State state) throws IOException {
+  public interface ConfiguratorFactory {
+    CompactionJobConfigurator createConfigurator(State state) throws 
IOException;
+  }
+
+  public CompactionJobConfigurator(State state) throws IOException {
     this.state = state;
     this.fs = getFileSystem(state);
     this.shouldDeduplicate = 
state.getPropAsBoolean(MRCompactor.COMPACTION_SHOULD_DEDUPLICATE, true);
   }
 
-  /**
-   * Refer to {@link MRCompactorAvroKeyDedupJobRunner#getDedupKeyOption()}
-   */
-  private MRCompactorAvroKeyDedupJobRunner.DedupKeyOption getDedupKeyOption() {
-    if 
(!this.state.contains(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_DEDUP_KEY))
 {
-      return MRCompactorAvroKeyDedupJobRunner.DEFAULT_DEDUP_KEY_OPTION;
+  public static CompactionJobConfigurator instantiateConfigurator(State state) 
{
+    String compactionConfiguratorFactoryClass =
+        state.getProp(COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY, 
DEFAULT_COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS);
+    try {
+      return Class.forName(compactionConfiguratorFactoryClass)
+          .asSubclass(ConfiguratorFactory.class)
+          .newInstance()
+          .createConfigurator(state);
+    } catch (ReflectiveOperationException | IOException e) {
+      throw new RuntimeException("Failed to instantiate an instance of job configurator:", e);
     }
-    Optional<MRCompactorAvroKeyDedupJobRunner.DedupKeyOption> option = 
Enums.getIfPresent(MRCompactorAvroKeyDedupJobRunner.DedupKeyOption.class,
-            
this.state.getProp(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_DEDUP_KEY).toUpperCase());
-    return option.isPresent() ? option.get() : 
MRCompactorAvroKeyDedupJobRunner.DEFAULT_DEDUP_KEY_OPTION;
   }
 
+  public abstract String getFileExtension();
+
   /**
-   * Refer to {@link MRCompactorAvroKeyDedupJobRunner#getKeySchema(Job, 
Schema)}
+   * Customized MR job creation for compaction.
+   *
+   * @param  dataset  A path or directory which needs compaction
+   * @return A configured map-reduce job for compaction
    */
-  private Schema getKeySchema(Job job, Schema topicSchema) throws IOException {
-
-    boolean keySchemaFileSpecified = 
this.state.contains(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_AVRO_KEY_SCHEMA_LOC);
-
-    Schema keySchema = null;
-
-    MRCompactorAvroKeyDedupJobRunner.DedupKeyOption dedupKeyOption = 
getDedupKeyOption();
-    if (dedupKeyOption == MRCompactorAvroKeyDedupJobRunner.DedupKeyOption.ALL) 
{
-      log.info("Using all attributes in the schema (except Map, Arrar and Enum 
fields) for compaction");
-      keySchema = AvroUtils.removeUncomparableFields(topicSchema).get();
-    } else if (dedupKeyOption == 
MRCompactorAvroKeyDedupJobRunner.DedupKeyOption.KEY) {
-      log.info("Using key attributes in the schema for compaction");
-      keySchema = 
AvroUtils.removeUncomparableFields(MRCompactorAvroKeyDedupJobRunner.getKeySchema(topicSchema)).get();
-    } else if (keySchemaFileSpecified) {
-      Path keySchemaFile = new 
Path(state.getProp(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_AVRO_KEY_SCHEMA_LOC));
-      log.info("Using attributes specified in schema file " + keySchemaFile + 
" for compaction");
-      try {
-        keySchema = AvroUtils.parseSchemaFromFile(keySchemaFile, this.fs);
-      } catch (IOException e) {
-        log.error("Failed to parse avro schema from " + keySchemaFile
-                + ", using key attributes in the schema for compaction");
-        keySchema = 
AvroUtils.removeUncomparableFields(MRCompactorAvroKeyDedupJobRunner.getKeySchema(topicSchema)).get();
-      }
+  public Job createJob(FileSystemDataset dataset) throws IOException {
+    Configuration conf = HadoopUtils.getConfFromState(state);
 
-      if (!MRCompactorAvroKeyDedupJobRunner.isKeySchemaValid(keySchema, 
topicSchema)) {
-        log.warn(String.format("Key schema %s is not compatible with record 
schema %s.", keySchema, topicSchema)
-                + "Using key attributes in the schema for compaction");
-        keySchema = 
AvroUtils.removeUncomparableFields(MRCompactorAvroKeyDedupJobRunner.getKeySchema(topicSchema)).get();
-      }
-    } else {
-      log.info("Property " + 
MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_AVRO_KEY_SCHEMA_LOC
-              + " not provided. Using key attributes in the schema for 
compaction");
-      keySchema = 
AvroUtils.removeUncomparableFields(MRCompactorAvroKeyDedupJobRunner.getKeySchema(topicSchema)).get();
+    // Turn on mapreduce output compression by default
+    if (conf.get("mapreduce.output.fileoutputformat.compress") == null && 
conf.get("mapred.output.compress") == null) {
+      conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
     }
 
-    return keySchema;
-  }
+    // Disable delegation token cancellation by default
+    if (conf.get("mapreduce.job.complete.cancel.delegation.tokens") == null) {
+      conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", 
false);
+    }
 
-  private void configureSchema(Job job) throws IOException {
-    Schema newestSchema = 
MRCompactorAvroKeyDedupJobRunner.getNewestSchemaFromSource(job, this.fs);
-    if (newestSchema != null) {
-      if 
(this.state.getPropAsBoolean(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_AVRO_SINGLE_INPUT_SCHEMA,
 true)) {
-        AvroJob.setInputKeySchema(job, newestSchema);
-      }
-      AvroJob.setMapOutputKeySchema(job, this.shouldDeduplicate ? 
getKeySchema(job, newestSchema) : newestSchema);
-      AvroJob.setMapOutputValueSchema(job, newestSchema);
-      AvroJob.setOutputKeySchema(job, newestSchema);
+    addJars(conf, this.state, fs);
+    Job job = Job.getInstance(conf);
+    job.setJobName(MRCompactorJobRunner.HADOOP_JOB_NAME);
+    boolean emptyDirectoryFlag = this.configureInputAndOutputPaths(job, 
dataset);
+    if (emptyDirectoryFlag) {
+      
this.state.setProp(HiveRegistrationPolicy.MAPREDUCE_JOB_INPUT_PATH_EMPTY_KEY, 
true);
     }
+    this.configureMapper(job);
+    this.configureReducer(job);
+    if (emptyDirectoryFlag || !this.shouldDeduplicate) {
+      job.setNumReduceTasks(0);
+    }
+    // Configure schema at the last step because FilesInputFormat will be used 
internally
+    this.configureSchema(job);
+    this.isJobCreated = true;
+    this.configuredJob = job;
+    return job;
   }
 
-  protected void configureMapper(Job job) {
-    job.setInputFormatClass(AvroKeyRecursiveCombineFileInputFormat.class);
-    job.setMapperClass(AvroKeyMapper.class);
-    job.setMapOutputKeyClass(AvroKey.class);
-    job.setMapOutputValueClass(AvroValue.class);
-  }
+  /**
+   * Configuring Mapper/Reducer's input/output schema for compaction MR job.
+   * The input schema for Mapper should be obtained from the to-be-compacted file.
+   * The output schema for Mapper is for dedup.
+   * The output schema for Reducer should be identical to the input schema of Mapper.
+   * @param job The compaction jobConf.
+   * @throws IOException
+   */
+  protected abstract void configureSchema(Job job) throws IOException;
 
-  protected void configureReducer(Job job) throws IOException {
-    job.setOutputFormatClass(AvroKeyCompactorOutputFormat.class);
-    job.setReducerClass(AvroKeyDedupReducer.class);
-    job.setOutputKeyClass(AvroKey.class);
-    job.setOutputValueClass(NullWritable.class);
-    setNumberOfReducers(job);
+  /**
+   * Configuring Mapper class, specific to data format.
+   */
+  protected abstract void configureMapper(Job job);
+
+  /**
+   * Configuring Reducer class, specific to data format.
+   */
+  protected abstract void configureReducer(Job job) throws IOException;
+
+  protected FileSystem getFileSystem(State state) throws IOException {
+    Configuration conf = HadoopUtils.getConfFromState(state);
+    String uri = state.getProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, 
ConfigurationKeys.LOCAL_FS_URI);
+    return FileSystem.get(URI.create(uri), conf);
   }
 
   /**
@@ -195,17 +196,19 @@ public class CompactionAvroJobConfigurator {
     }
 
     // get target file size
-    long targetFileSize = 
this.state.getPropAsLong(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_TARGET_OUTPUT_FILE_SIZE,
+    long targetFileSize =
+        
this.state.getPropAsLong(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_TARGET_OUTPUT_FILE_SIZE,
             
MRCompactorAvroKeyDedupJobRunner.DEFAULT_COMPACTION_JOB_TARGET_OUTPUT_FILE_SIZE);
 
     // get max reducers
     int maxNumReducers = 
state.getPropAsInt(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_MAX_NUM_REDUCERS,
-            
MRCompactorAvroKeyDedupJobRunner.DEFAULT_COMPACTION_JOB_MAX_NUM_REDUCERS);
+        
MRCompactorAvroKeyDedupJobRunner.DEFAULT_COMPACTION_JOB_MAX_NUM_REDUCERS);
 
     int numReducers = Math.min(Ints.checkedCast(inputSize / targetFileSize) + 
1, maxNumReducers);
 
     // get use prime reducers
-    boolean usePrimeReducers = 
state.getPropAsBoolean(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_USE_PRIME_REDUCERS,
+    boolean usePrimeReducers =
+        
state.getPropAsBoolean(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_USE_PRIME_REDUCERS,
             
MRCompactorAvroKeyDedupJobRunner.DEFAULT_COMPACTION_JOB_USE_PRIME_REDUCERS);
 
     if (usePrimeReducers && numReducers != 1) {
@@ -214,24 +217,18 @@ public class CompactionAvroJobConfigurator {
     job.setNumReduceTasks(numReducers);
   }
 
-  /**
-   * Concatenate multiple directory or file names into one path
-   *
-   * @return Concatenated path or null if the parameter is empty
-   */
-  private Path concatPaths (String ...names) {
-    if (names == null || names.length == 0) {
-      return null;
+  protected void addJars(Configuration conf, State state, FileSystem fs) 
throws IOException {
+    if (!state.contains(MRCompactor.COMPACTION_JARS)) {
+      return;
     }
-    Path cur = new Path(names[0]);
-    for (int i = 1; i < names.length; ++i) {
-      cur = new Path(cur, new Path(names[i]));
+    Path jarFileDir = new Path(state.getProp(MRCompactor.COMPACTION_JARS));
+    for (FileStatus status : fs.listStatus(jarFileDir)) {
+      DistributedCache.addFileToClassPath(status.getPath(), conf, fs);
     }
-    return  cur;
   }
 
   /**
-   * Refer to {@link 
MRCompactorAvroKeyDedupJobRunner#configureInputAndOutputPaths(Job)}.
+   * Refer to 
MRCompactorAvroKeyDedupJobRunner#configureInputAndOutputPaths(Job).
    * @return false if no valid input paths present for MR job to process,  
where a path is valid if it is
    * a directory containing one or more files.
    *
@@ -242,9 +239,9 @@ public class CompactionAvroJobConfigurator {
     String mrOutputBase = this.state.getProp(MRCompactor.COMPACTION_JOB_DIR);
     CompactionPathParser parser = new CompactionPathParser(this.state);
     CompactionPathParser.CompactionParserResult rst = parser.parse(dataset);
-    this.mrOutputPath = concatPaths (mrOutputBase, rst.getDatasetName(), 
rst.getDstSubDir(), rst.getTimeString());
+    this.mrOutputPath = concatPaths(mrOutputBase, rst.getDatasetName(), 
rst.getDstSubDir(), rst.getTimeString());
 
-    log.info ("Cleaning temporary MR output directory: " + mrOutputPath);
+    log.info("Cleaning temporary MR output directory: " + mrOutputPath);
     this.fs.delete(mrOutputPath, true);
 
     this.mapReduceInputPaths = getGranularInputPaths(dataset.datasetRoot());
@@ -253,7 +250,7 @@ public class CompactionAvroJobConfigurator {
       emptyDirectoryFlag = true;
     }
 
-    for (Path path: mapReduceInputPaths) {
+    for (Path path : mapReduceInputPaths) {
       FileInputFormat.addInputPath(job, path);
     }
 
@@ -262,61 +259,19 @@ public class CompactionAvroJobConfigurator {
   }
 
   /**
-   * Customized MR job creation. This method will be used in
-   * {@link 
org.apache.gobblin.compaction.suite.CompactionAvroSuite#createJob(Dataset)}
+   * Concatenate multiple directory or file names into one path
    *
-   * @param  dataset  A path or directory which needs compaction
-   * @return A configured map-reduce job for avro compaction
+   * @return Concatenated path or null if the parameter is empty
    */
-  public Job createJob(FileSystemDataset dataset) throws IOException {
-    Configuration conf = HadoopUtils.getConfFromState(state);
-
-    // Turn on mapreduce output compression by default
-    if (conf.get("mapreduce.output.fileoutputformat.compress") == null && 
conf.get("mapred.output.compress") == null) {
-      conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
-    }
-
-    // Disable delegation token cancellation by default
-    if (conf.get("mapreduce.job.complete.cancel.delegation.tokens") == null) {
-      conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", 
false);
-    }
-
-    addJars(conf);
-    Job job = Job.getInstance(conf);
-    job.setJobName(MRCompactorJobRunner.HADOOP_JOB_NAME);
-    boolean emptyDirectoryFlag = this.configureInputAndOutputPaths(job, 
dataset);
-    if (emptyDirectoryFlag) {
-      
this.state.setProp(HiveRegistrationPolicy.MAPREDUCE_JOB_INPUT_PATH_EMPTY_KEY, 
true);
-    }
-    this.configureMapper(job);
-    this.configureReducer(job);
-    if (emptyDirectoryFlag || !this.shouldDeduplicate) {
-      job.setNumReduceTasks(0);
-    }
-    // Configure schema at the last step because FilesInputFormat will be used 
internally
-    this.configureSchema(job);
-    this.isJobCreated = true;
-    this.configuredJob = job;
-    return job;
-  }
-
-  private void addJars(Configuration conf) throws IOException {
-    if (!state.contains(MRCompactor.COMPACTION_JARS)) {
-      return;
+  private Path concatPaths(String... names) {
+    if (names == null || names.length == 0) {
+      return null;
     }
-    Path jarFileDir = new Path(state.getProp(MRCompactor.COMPACTION_JARS));
-    for (FileStatus status : this.fs.listStatus(jarFileDir)) {
-      DistributedCache.addFileToClassPath(status.getPath(), conf, this.fs);
+    Path cur = new Path(names[0]);
+    for (int i = 1; i < names.length; ++i) {
+      cur = new Path(cur, new Path(names[i]));
     }
-  }
-
-  private FileSystem getFileSystem(State state)
-          throws IOException {
-    Configuration conf = HadoopUtils.getConfFromState(state);
-    String uri = state.getProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, 
ConfigurationKeys.LOCAL_FS_URI);
-    FileSystem fs = FileSystem.get(URI.create(uri), conf);
-
-    return fs;
+    return cur;
   }
 
   /**
@@ -331,10 +286,10 @@ public class CompactionAvroJobConfigurator {
    * @param path top level directory needs compaction
    * @return A collection of input paths which will participate in map-reduce 
job
    */
-  protected Collection<Path> getGranularInputPaths (Path path) throws 
IOException {
+  protected Collection<Path> getGranularInputPaths(Path path) throws 
IOException {
 
     boolean appendDelta = 
this.state.getPropAsBoolean(MRCompactor.COMPACTION_RENAME_SOURCE_DIR_ENABLED,
-            MRCompactor.DEFAULT_COMPACTION_RENAME_SOURCE_DIR_ENABLED);
+        MRCompactor.DEFAULT_COMPACTION_RENAME_SOURCE_DIR_ENABLED);
 
     Set<Path> uncompacted = Sets.newHashSet();
     Set<Path> total = Sets.newHashSet();
@@ -347,7 +302,7 @@ public class CompactionAvroJobConfigurator {
         }
         total.add(fileStatus.getPath().getParent());
       } else {
-          uncompacted.add(fileStatus.getPath().getParent());
+        uncompacted.add(fileStatus.getPath().getParent());
       }
     }
 
@@ -355,8 +310,8 @@ public class CompactionAvroJobConfigurator {
       // When the output record count from mr counter doesn't match
       // the record count from input file names, we prefer file names because
       // it will be used to calculate the difference of count in next run.
-      this.fileNameRecordCount = new 
InputRecordCountHelper(this.state).calculateRecordCount (total);
-      log.info ("{} has total input record count (based on file name) {}", 
path, this.fileNameRecordCount);
+      this.fileNameRecordCount = new 
InputRecordCountHelper(this.state).calculateRecordCount(total);
+      log.info("{} has total input record count (based on file name) {}", 
path, this.fileNameRecordCount);
     }
 
     return uncompacted;
@@ -394,20 +349,23 @@ public class CompactionAvroJobConfigurator {
   /**
    * Get good files
    * The problem happens when speculative task attempt initialized but then 
killed in the middle of processing.
-   * Some partial file was generated at 
{tmp_output}/_temporary/1/_temporary/attempt_xxx_xxx/part-m-xxxx.avro,
-   * without being committed to its final destination at 
{tmp_output}/part-m-xxxx.avro.
+   * Some partial file was generated at {tmp_output}/_temporary/1/_temporary/attempt_xxx_xxx/xxxx
+   * (an Avro file, for example, may carry an .avro extension), without being committed to its
+   * final destination at {tmp_output}/xxxx.
    *
    * @param job Completed MR job
    * @param fs File system that can handle file system
+   * @param acceptableExtension file extensions that qualify a file as a "good file".
    * @return all successful files that has been committed
    */
-  public static List<Path> getGoodFiles(Job job, Path tmpPath, FileSystem fs) 
throws IOException {
-    List<TaskCompletionEvent> failedEvents = 
CompactionAvroJobConfigurator.getUnsuccessfulTaskCompletionEvent(job);
+  public static List<Path> getGoodFiles(Job job, Path tmpPath, FileSystem fs, 
List<String> acceptableExtension)
+      throws IOException {
+    List<TaskCompletionEvent> failedEvents = 
getUnsuccessfulTaskCompletionEvent(job);
 
-    List<Path> allFilePaths = DatasetHelper.getApplicableFilePaths(fs, 
tmpPath, Lists.newArrayList("avro"));
+    List<Path> allFilePaths = DatasetHelper.getApplicableFilePaths(fs, 
tmpPath, acceptableExtension);
     List<Path> goodPaths = new ArrayList<>();
     for (Path filePath: allFilePaths) {
-      if (CompactionAvroJobConfigurator.isFailedPath(filePath, failedEvents)) {
+      if (isFailedPath(filePath, failedEvents)) {
         fs.delete(filePath, false);
         log.error("{} is a bad path so it was deleted", filePath);
       } else {
@@ -418,4 +376,3 @@ public class CompactionAvroJobConfigurator {
     return goodPaths;
   }
 }
-
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionRunner.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionRunner.java
index 3976612..2d25dd8 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionRunner.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionRunner.java
@@ -45,8 +45,13 @@ import org.apache.gobblin.metrics.Tag;
  * A class for launching a Gobblin MR job for compaction through command line.
  *
  * @author Lorand Bendig
+ * @deprecated Please use {@link 
org.apache.gobblin.compaction.mapreduce.MRCompactionTask}
+ *  and {@link org.apache.gobblin.compaction.source.CompactionSource} to 
launch MR instead.
+ *  The new approach offers simpler logic to trigger the compaction flow and more
+ *  reliable verification criteria, instead of relying solely on timestamps as before.
  *
  */
+@Deprecated
 public class MRCompactionRunner {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(MRCompactionRunner.class);
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
index 953c0dc..5a937ed 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
@@ -93,8 +93,13 @@ import static 
org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner.Statu
  * under {@link #COMPACTION_INPUT_DIR}.
  *
  * @author Ziyang Liu
+ * @deprecated Please use {@link 
org.apache.gobblin.compaction.mapreduce.MRCompactionTask}
+ *  and {@link org.apache.gobblin.compaction.source.CompactionSource} to 
launch MR instead.
+ *  The new approach offers simpler logic to trigger the compaction flow and more
+ *  reliable verification criteria, instead of relying solely on timestamps as before.
  */
 
+@Deprecated
 public class MRCompactor implements Compactor {
 
   private static final Logger LOG = LoggerFactory.getLogger(MRCompactor.class);
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobPropCreator.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobPropCreator.java
index 182244a..c6b5770 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobPropCreator.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobPropCreator.java
@@ -50,6 +50,10 @@ import org.apache.gobblin.util.FileListUtils;
  * compaction.topic, compaction.job.input.dir, compaction.job.dest.dir, 
compaction.job.dest.dir.
  *
  * @author Ziyang Liu
+ * @deprecated Please use {@link 
org.apache.gobblin.compaction.mapreduce.MRCompactionTask}
+ *  and {@link org.apache.gobblin.compaction.source.CompactionSource} to 
launch MR instead.
+ *  The new approach offers simpler logic to trigger the compaction flow and more
+ *  reliable verification criteria, instead of relying solely on timestamps as before.
  */
 public class MRCompactorJobPropCreator {
   private static final Logger LOG = 
LoggerFactory.getLogger(MRCompactorJobPropCreator.class);
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
index 0ab3eab..d957dc5 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.compaction.mapreduce;
 
+import com.google.common.collect.ImmutableList;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
@@ -95,6 +96,10 @@ import static org.apache.gobblin.util.retry.RetryerFactory.*;
  * the output directory.
  *
  * @author Ziyang Liu
+ * @deprecated Please use {@link 
org.apache.gobblin.compaction.mapreduce.MRCompactionTask}
+ *  and {@link org.apache.gobblin.compaction.source.CompactionSource} to 
launch MR instead.
+ *  The new approach offers simpler logic to trigger the compaction flow and more
+ *  reliable verification criteria, instead of relying solely on timestamps as before.
  */
 @SuppressWarnings("deprecation")
 public abstract class MRCompactorJobRunner implements Runnable, 
Comparable<MRCompactorJobRunner> {
@@ -322,7 +327,8 @@ public abstract class MRCompactorJobRunner implements 
Runnable, Comparable<MRCom
         this.submitAndWait(job);
         if (shouldPublishData(compactionTimestamp)) {
           // remove all invalid empty files due to speculative task execution
-          List<Path> goodPaths = 
CompactionAvroJobConfigurator.getGoodFiles(job, this.dataset.outputTmpPath(), 
this.tmpFs);
+          List<Path> goodPaths = CompactionJobConfigurator.getGoodFiles(job, 
this.dataset.outputTmpPath(), this.tmpFs,
+              ImmutableList.of("avro"));
 
           if (!this.recompactAllData && this.recompactFromDestPaths) {
             // append new files without deleting output directory
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/MRCompactorAvroKeyDedupJobRunner.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/MRCompactorAvroKeyDedupJobRunner.java
index 00b6742..35a3f82 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/MRCompactorAvroKeyDedupJobRunner.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/MRCompactorAvroKeyDedupJobRunner.java
@@ -17,6 +17,11 @@
 
 package org.apache.gobblin.compaction.mapreduce.avro;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Enums;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -25,7 +30,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.SchemaCompatibility;
@@ -34,6 +38,9 @@ import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapred.AvroValue;
 import org.apache.avro.mapreduce.AvroJob;
 import org.apache.commons.io.FilenameUtils;
+import org.apache.gobblin.compaction.dataset.Dataset;
+import org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner;
+import org.apache.gobblin.util.AvroUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -43,16 +50,6 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Enums;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-import org.apache.gobblin.compaction.dataset.Dataset;
-import org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner;
-import org.apache.gobblin.util.AvroUtils;
-
 
 /**
  * A subclass of {@link 
org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner} that configures
@@ -64,7 +61,6 @@ import org.apache.gobblin.util.AvroUtils;
  * @author Ziyang Liu
  */
 public class MRCompactorAvroKeyDedupJobRunner extends MRCompactorJobRunner {
-
   private static final Logger LOG = 
LoggerFactory.getLogger(MRCompactorAvroKeyDedupJobRunner.class);
 
   private static final String COMPACTION_JOB_PREFIX = "compaction.job.";
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
index 4e4382b..dbe5256 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
@@ -403,14 +403,14 @@ public class CompactionSource implements 
WorkUnitStreamSource<String, String> {
   protected WorkUnit createWorkUnit (Dataset dataset) throws IOException {
     WorkUnit workUnit = new WorkUnit();
     TaskUtils.setTaskFactoryClass(workUnit, MRCompactionTaskFactory.class);
-    suite.save (dataset, workUnit);
+    suite.save(dataset, workUnit);
     return workUnit;
   }
 
   protected WorkUnit createWorkUnitForFailure (Dataset dataset) throws 
IOException {
     WorkUnit workUnit = new FailedTask.FailedWorkUnit();
     TaskUtils.setTaskFactoryClass(workUnit, 
CompactionFailedTask.CompactionFailedTaskFactory.class);
-    suite.save (dataset, workUnit);
+    suite.save(dataset, workUnit);
     return workUnit;
   }
 
@@ -418,7 +418,7 @@ public class CompactionSource implements 
WorkUnitStreamSource<String, String> {
     WorkUnit workUnit = new FailedTask.FailedWorkUnit();
     workUnit.setProp(CompactionVerifier.COMPACTION_VERIFICATION_FAIL_REASON, 
reason);
     TaskUtils.setTaskFactoryClass(workUnit, 
CompactionFailedTask.CompactionFailedTaskFactory.class);
-    suite.save (dataset, workUnit);
+    suite.save(dataset, workUnit);
     return workUnit;
   }
 
@@ -479,7 +479,7 @@ public class CompactionSource implements 
WorkUnitStreamSource<String, String> {
     LocalFileSystem lfs = 
FileSystem.getLocal(HadoopUtils.getConfFromState(state));
     Path tmpJarFileDir = new Path(this.tmpJobDir, 
MRCompactor.COMPACTION_JAR_SUBDIR);
     this.fs.mkdirs(tmpJarFileDir);
-    state.setProp (MRCompactor.COMPACTION_JARS, tmpJarFileDir.toString());
+    state.setProp(MRCompactor.COMPACTION_JARS, tmpJarFileDir.toString());
 
     // copy jar files to hdfs
     for (String jarFile : 
state.getPropAsList(ConfigurationKeys.JOB_JAR_FILES_KEY)) {
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java
index 1c564a6..e9e3e50 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java
@@ -41,6 +41,10 @@ import org.apache.gobblin.dataset.Dataset;
  *
  * The class also handles how to create a map-reduce job and how to serialized 
and deserialize a {@link Dataset}
  * to and from a {@link org.apache.gobblin.source.workunit.WorkUnit} properly.
+ *
+ * CompactionSuite should only be aware of verification methods and different definitions of datasets,
+ * but be unaware of data format, which is handled by the implementations of
+ * {@link org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator}
  */
 
 public interface CompactionSuite<D extends Dataset> {
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionAvroSuite.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBase.java
similarity index 84%
rename from 
gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionAvroSuite.java
rename to 
gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBase.java
index 7b62671..a263e0c 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionAvroSuite.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBase.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 
@@ -31,7 +32,6 @@ import 
org.apache.gobblin.compaction.action.CompactionCompleteAction;
 import 
org.apache.gobblin.compaction.action.CompactionCompleteFileOperationAction;
 import org.apache.gobblin.compaction.action.CompactionHiveRegistrationAction;
 import org.apache.gobblin.compaction.action.CompactionMarkDirectoryAction;
-import org.apache.gobblin.compaction.mapreduce.CompactionAvroJobConfigurator;
 import org.apache.gobblin.compaction.verify.CompactionAuditCountVerifier;
 import org.apache.gobblin.compaction.verify.CompactionThresholdVerifier;
 import org.apache.gobblin.compaction.verify.CompactionTimeRangeVerifier;
@@ -40,18 +40,19 @@ import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.FileSystemDataset;
 
 /**
- * A type of {@link CompactionSuite} which implements all components needed 
for avro file compaction.
+ * A type of {@link CompactionSuite} which implements all components needed for file compaction.
+ * The format-specific logic is provided by implementations of {@link CompactionJobConfigurator}.
  */
 @Slf4j
-public class CompactionAvroSuite implements CompactionSuite<FileSystemDataset> 
{
+public class CompactionSuiteBase implements CompactionSuite<FileSystemDataset> 
{
   public static final String SERIALIZE_COMPACTION_FILE_PATH_NAME = 
"compaction-file-path-name";
   private State state;
-  private CompactionAvroJobConfigurator configurator = null;
+  private CompactionJobConfigurator configurator = null;
 
   /**
    * Constructor
    */
-  public CompactionAvroSuite (State state) {
+  public CompactionSuiteBase(State state) {
     this.state = state;
   }
 
@@ -75,8 +76,7 @@ public class CompactionAvroSuite implements 
CompactionSuite<FileSystemDataset> {
    *         {@link org.apache.gobblin.compaction.mapreduce.MRCompactionTask} 
starts the map-reduce job
    */
   public List<CompactionVerifier<FileSystemDataset>> getMapReduceVerifiers() {
-    List<CompactionVerifier<FileSystemDataset>> list = new ArrayList<>();
-    return list;
+    return new ArrayList<>();
   }
 
   /**
@@ -123,14 +123,14 @@ public class CompactionAvroSuite implements 
CompactionSuite<FileSystemDataset> {
   }
 
   /**
-   * Constructs a map-reduce job suitable for avro compaction. The detailed 
configuration
-   * work is delegated to {@link 
CompactionAvroJobConfigurator#createJob(FileSystemDataset)}
+   * Constructs a map-reduce job suitable for compaction. The detailed 
format-specific configuration
+   * work is delegated to {@link 
CompactionJobConfigurator#createJob(FileSystemDataset)}
    *
-   * @param  dataset a top level input path which contains all avro files 
those need to be compacted
-   * @return a map-reduce job which will compact avro files against {@link 
org.apache.gobblin.dataset.Dataset}
+   * @param  dataset a top level input path which contains all files that
need to be compacted
+   * @return a map-reduce job which will compact files against {@link 
org.apache.gobblin.dataset.Dataset}
    */
   public Job createJob (FileSystemDataset dataset) throws IOException {
-    configurator = new CompactionAvroJobConfigurator(this.state);
+    configurator = 
CompactionJobConfigurator.instantiateConfigurator(this.state);
     return configurator.createJob(dataset);
   }
 }
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionAvroSuiteFactory.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseFactory.java
similarity index 81%
rename from 
gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionAvroSuiteFactory.java
rename to 
gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseFactory.java
index 70e2282..827fe28 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionAvroSuiteFactory.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseFactory.java
@@ -21,11 +21,11 @@ import org.apache.gobblin.annotation.Alias;
 import org.apache.gobblin.configuration.State;
 
 /**
- * A {@link CompactionSuiteFactory} that handles {@link CompactionAvroSuite} 
creation logic.
+ * A {@link CompactionSuiteFactory} that handles {@link CompactionSuiteBase} 
creation logic.
  */
-@Alias("CompactionAvroSuiteFactory")
-public class CompactionAvroSuiteFactory implements CompactionSuiteFactory {
-  public CompactionAvroSuite createSuite (State state) {
-    return new CompactionAvroSuite (state);
+@Alias("CompactionSuiteBaseFactory")
+public class CompactionSuiteBaseFactory implements CompactionSuiteFactory {
+  public CompactionSuiteBase createSuite (State state) {
+    return new CompactionSuiteBase(state);
   }
 }
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteUtils.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteUtils.java
index ac599b5..e4066c6 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteUtils.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteUtils.java
@@ -21,6 +21,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.util.ClassAliasResolver;
 
+
 /**
  * A utility class for {@link CompactionSuite}
  */
@@ -28,13 +29,15 @@ public class CompactionSuiteUtils {
 
   /**
    * Return an {@link CompactionSuiteFactory} based on the configuration
-   * @return A concrete suite factory instance. By default {@link 
CompactionAvroSuiteFactory} is used.
+   * @return A concrete suite factory instance. By default {@link 
CompactionSuiteBaseFactory} is used.
    */
-  public static CompactionSuiteFactory getCompactionSuiteFactory (State state) 
{
+  public static CompactionSuiteFactory getCompactionSuiteFactory(State state) {
     try {
-      String factoryName = 
state.getProp(ConfigurationKeys.COMPACTION_SUITE_FACTORY, 
ConfigurationKeys.DEFAULT_COMPACTION_SUITE_FACTORY);
+      String factoryName =
+          state.getProp(ConfigurationKeys.COMPACTION_SUITE_FACTORY, 
ConfigurationKeys.DEFAULT_COMPACTION_SUITE_FACTORY);
 
-      ClassAliasResolver<CompactionSuiteFactory> conditionClassAliasResolver = 
new ClassAliasResolver<>(CompactionSuiteFactory.class);
+      ClassAliasResolver<CompactionSuiteFactory> conditionClassAliasResolver =
+          new ClassAliasResolver<>(CompactionSuiteFactory.class);
       CompactionSuiteFactory factory = 
conditionClassAliasResolver.resolveClass(factoryName).newInstance();
       return factory;
     } catch (IllegalAccessException | InstantiationException | 
ClassNotFoundException e) {
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
index a2751b9..03ab36d 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
@@ -69,7 +69,7 @@ public class CompactionThresholdVerifier implements 
CompactionVerifier<FileSyste
 
     CompactionPathParser.CompactionParserResult result = new 
CompactionPathParser(state).parse(dataset);
 
-    double threshold = 
RecompactionConditionBasedOnRatio.getRatioThresholdByDatasetName 
(result.getDatasetName(), thresholdMap);
+    double threshold = 
RecompactionConditionBasedOnRatio.getRatioThresholdByDatasetName(result.getDatasetName(),
 thresholdMap);
     log.debug ("Threshold is {} for dataset {}", threshold, 
result.getDatasetName());
 
     InputRecordCountHelper helper = new InputRecordCountHelper(state);
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
index 5b49193..6d9d8c4 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
@@ -17,30 +17,23 @@
 
 package org.apache.gobblin.compaction.verify;
 
+import com.google.common.base.Splitter;
 import java.util.List;
-import java.util.Map;
 import java.util.regex.Pattern;
-
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.joda.time.Period;
-import org.joda.time.format.PeriodFormatter;
-import org.joda.time.format.PeriodFormatterBuilder;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Maps;
-
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.compaction.parser.CompactionPathParser;
 import org.apache.gobblin.compaction.source.CompactionSource;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.FileSystemDataset;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Period;
+import org.joda.time.format.PeriodFormatter;
+import org.joda.time.format.PeriodFormatterBuilder;
 
 /**
  * A simple class which verify current dataset belongs to a specific time 
range. Will skip to do
diff --git 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/suite/TestCompactionSuiteFactories.java
 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/suite/TestCompactionSuiteFactories.java
index 7cfdb92..48890e2 100644
--- 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/suite/TestCompactionSuiteFactories.java
+++ 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/suite/TestCompactionSuiteFactories.java
@@ -27,7 +27,7 @@ public class TestCompactionSuiteFactories {
    * Test hive registration failure
    */
   @Alias("HiveRegistrationFailureFactory")
-  public static class HiveRegistrationFailureFactory extends 
CompactionAvroSuiteFactory {
+  public static class HiveRegistrationFailureFactory implements 
CompactionSuiteFactory {
     public TestCompactionSuites.HiveRegistrationCompactionSuite createSuite 
(State state) {
       return new TestCompactionSuites.HiveRegistrationCompactionSuite(state);
     }
diff --git 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/suite/TestCompactionSuites.java
 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/suite/TestCompactionSuites.java
index 87d41d2..59d762e 100644
--- 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/suite/TestCompactionSuites.java
+++ 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/suite/TestCompactionSuites.java
@@ -32,7 +32,7 @@ public class TestCompactionSuites {
   /**
    * Test hive registration failure
    */
-  public static class HiveRegistrationCompactionSuite extends 
CompactionAvroSuite {
+  public static class HiveRegistrationCompactionSuite extends 
CompactionSuiteBase {
 
     public HiveRegistrationCompactionSuite(State state) {
       super(state);
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index ccc94cb..533ba12 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -455,7 +455,7 @@ public class GobblinMultiTaskAttempt {
       commit();
     } else if (!isSpeculativeExecutionSafe()) {
       throw new RuntimeException(
-          "Specualtive execution is enabled. However, the task context is not 
safe for speculative execution.");
+          "Speculative execution is enabled. However, the task context is not 
safe for speculative execution.");
     }
   }
 
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
index 3a188a6..da335d8 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
@@ -74,7 +74,7 @@ public class TaskStateCollectorService extends 
AbstractScheduledService {
   private final Path outputTaskStateDir;
 
   /**
-   * Add a cloesable action to run after each existence-checking of task state 
file.
+   * Add a closeable action to run after each existence-checking of task state 
file.
    * A typical example to plug here is hive registration:
    * We do hive registration everytime there are available taskStates 
deserialized from storage, on the driver level.
    */
@@ -204,7 +204,7 @@ public class TaskStateCollectorService extends 
AbstractScheduledService {
       this.jobState.addTaskState(taskState);
     }
 
-    // Finish any addtional steps defined in handler on driver level.
+    // Finish any additional steps defined in handler on driver level.
     // Currently implemented handler for Hive registration only.
     if (optionalTaskCollectorHandler.isPresent()) {
       LOGGER.info("Execute Pipelined TaskStateCollectorService Handler for " + 
taskStateQueue.size() + " tasks");

Reply via email to