This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new c89bfcd [GOBBLIN-691] Make format-specific component pluggable in
compaction
c89bfcd is described below
commit c89bfcdf75a2cc0c3ca93be07cd108eb6e3c4cb5
Author: Lei Sun <[email protected]>
AuthorDate: Tue Mar 5 11:16:25 2019 -0800
[GOBBLIN-691] Make format-specific component pluggable in compaction
Closes #2563 from autumnust/orcCompaction
---
.../gobblin/configuration/ConfigurationKeys.java | 2 +-
.../gobblin/compaction/CompactorFactory.java | 5 +
.../compaction/ReflectionCompactorFactory.java | 6 +
.../CompactionCompleteFileOperationAction.java | 27 +-
.../action/CompactionMarkDirectoryAction.java | 5 +-
.../mapreduce/CompactionAvroJobConfigurator.java | 336 +++------------------
...gurator.java => CompactionJobConfigurator.java} | 327 +++++++++-----------
.../compaction/mapreduce/MRCompactionRunner.java | 5 +
.../gobblin/compaction/mapreduce/MRCompactor.java | 5 +
.../mapreduce/MRCompactorJobPropCreator.java | 4 +
.../compaction/mapreduce/MRCompactorJobRunner.java | 8 +-
.../avro/MRCompactorAvroKeyDedupJobRunner.java | 20 +-
.../compaction/source/CompactionSource.java | 8 +-
.../gobblin/compaction/suite/CompactionSuite.java | 4 +
...tionAvroSuite.java => CompactionSuiteBase.java} | 24 +-
...actory.java => CompactionSuiteBaseFactory.java} | 10 +-
.../compaction/suite/CompactionSuiteUtils.java | 11 +-
.../verify/CompactionThresholdVerifier.java | 2 +-
.../verify/CompactionTimeRangeVerifier.java | 21 +-
.../suite/TestCompactionSuiteFactories.java | 2 +-
.../compaction/suite/TestCompactionSuites.java | 2 +-
.../gobblin/runtime/GobblinMultiTaskAttempt.java | 2 +-
.../gobblin/runtime/TaskStateCollectorService.java | 4 +-
23 files changed, 280 insertions(+), 560 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index c0feda1..9004cff 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -902,7 +902,7 @@ public class ConfigurationKeys {
*/
public static final String COMPACTION_PREFIX = "compaction.";
public static final String COMPACTION_SUITE_FACTORY = COMPACTION_PREFIX +
"suite.factory";
- public static final String DEFAULT_COMPACTION_SUITE_FACTORY =
"CompactionAvroSuiteFactory";
+ public static final String DEFAULT_COMPACTION_SUITE_FACTORY =
"CompactionSuiteBaseFactory";
public static final String COMPACTION_PRIORITIZATION_PREFIX =
COMPACTION_PREFIX + "prioritization.";
public static final String COMPACTION_PRIORITIZER_ALIAS =
COMPACTION_PRIORITIZATION_PREFIX + "prioritizerAlias";
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/CompactorFactory.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/CompactorFactory.java
index cb7e7c0..705ed84 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/CompactorFactory.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/CompactorFactory.java
@@ -29,8 +29,13 @@ import org.apache.gobblin.metrics.Tag;
/**
* A factory responsible for creating {@link Compactor}s.
+ * @deprecated Please use {@link
org.apache.gobblin.compaction.mapreduce.MRCompactionTask}
+ * and {@link org.apache.gobblin.compaction.source.CompactionSource} to
launch MR instead.
+ * The new way enjoys simpler logic to trigger the compaction flow and more
reliable verification criteria,
+ * instead of using timestamp only before.
*/
@Alpha
+@Deprecated
public interface CompactorFactory {
/**
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/ReflectionCompactorFactory.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/ReflectionCompactorFactory.java
index 1ba6fac..2e475fc 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/ReflectionCompactorFactory.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/ReflectionCompactorFactory.java
@@ -31,7 +31,13 @@ import org.apache.gobblin.metrics.Tag;
/**
* Implementation of {@link CompactorFactory} that creates a {@link Compactor}
using reflection.
+ *
+ * @deprecated Please use {@link
org.apache.gobblin.compaction.mapreduce.MRCompactionTask}
+ * and {@link org.apache.gobblin.compaction.source.CompactionSource} to launch
MR instead.
+ * The new way enjoys simpler logic to trigger the compaction flow and more
reliable verification criteria,
+ * instead of using timestamp only before.
*/
+@Deprecated
public class ReflectionCompactorFactory implements CompactorFactory {
@VisibleForTesting
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
index 831443b..8dc6324 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
@@ -17,23 +17,15 @@
package org.apache.gobblin.compaction.action;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.List;
import java.util.Map;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Job;
-
-import com.google.common.collect.ImmutableMap;
-
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-
import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
-import org.apache.gobblin.compaction.mapreduce.CompactionAvroJobConfigurator;
+import org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator;
import org.apache.gobblin.compaction.mapreduce.MRCompactor;
import org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner;
import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyMapper;
@@ -45,22 +37,28 @@ import org.apache.gobblin.dataset.FileSystemDataset;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.WriterUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
/**
* A type of post action {@link CompactionCompleteAction} which focus on the
file operations
+ *
*/
@Slf4j
@AllArgsConstructor
public class CompactionCompleteFileOperationAction implements
CompactionCompleteAction<FileSystemDataset> {
protected WorkUnitState state;
- private CompactionAvroJobConfigurator configurator;
+ private CompactionJobConfigurator configurator;
private InputRecordCountHelper helper;
private EventSubmitter eventSubmitter;
private FileSystem fs;
- public CompactionCompleteFileOperationAction (State state,
CompactionAvroJobConfigurator configurator) {
+ public CompactionCompleteFileOperationAction (State state,
CompactionJobConfigurator configurator) {
if (!(state instanceof WorkUnitState)) {
throw new UnsupportedOperationException(this.getClass().getName() + "
only supports workunit state");
}
@@ -90,7 +88,8 @@ public class CompactionCompleteFileOperationAction implements
CompactionComplete
long oldTotalRecords = helper.readRecordCount(new Path
(result.getDstAbsoluteDir()));
long executeCount = helper.readExecutionCount (new Path
(result.getDstAbsoluteDir()));
- List<Path> goodPaths = CompactionAvroJobConfigurator.getGoodFiles(job,
tmpPath, this.fs);
+ List<Path> goodPaths = CompactionJobConfigurator.getGoodFiles(job,
tmpPath, this.fs,
+ ImmutableList.of(configurator.getFileExtension()));
if (appendDeltaOutput) {
FsPermission permission =
HadoopUtils.deserializeFsPermission(this.state,
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java
index 89be94a..ac1f1d7 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Map;
+import org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -43,10 +44,10 @@ import org.apache.gobblin.metrics.event.EventSubmitter;
@AllArgsConstructor
public class CompactionMarkDirectoryAction implements
CompactionCompleteAction<FileSystemDataset> {
protected State state;
- private CompactionAvroJobConfigurator configurator;
+ private CompactionJobConfigurator configurator;
private FileSystem fs;
private EventSubmitter eventSubmitter;
- public CompactionMarkDirectoryAction(State state,
CompactionAvroJobConfigurator configurator) {
+ public CompactionMarkDirectoryAction(State state, CompactionJobConfigurator
configurator) {
if (!(state instanceof WorkUnitState)) {
throw new UnsupportedOperationException(this.getClass().getName() + "
only supports workunit state");
}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
index 1779e33..14d23d5 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
@@ -17,110 +17,78 @@
package org.apache.gobblin.compaction.mapreduce;
+import com.google.common.base.Enums;
+import com.google.common.base.Optional;
import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-
+import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
-import org.apache.commons.math3.primes.Primes;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.TaskCompletionEvent;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-import com.google.common.base.Enums;
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.primitives.Ints;
-
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.gobblin.compaction.dataset.DatasetHelper;
import
org.apache.gobblin.compaction.mapreduce.avro.AvroKeyCompactorOutputFormat;
import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyDedupReducer;
import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyMapper;
import
org.apache.gobblin.compaction.mapreduce.avro.AvroKeyRecursiveCombineFileInputFormat;
import
org.apache.gobblin.compaction.mapreduce.avro.MRCompactorAvroKeyDedupJobRunner;
-import org.apache.gobblin.compaction.parser.CompactionPathParser;
-import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
-import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.compaction.suite.CompactionSuiteBase;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.dataset.Dataset;
import org.apache.gobblin.dataset.FileSystemDataset;
import org.apache.gobblin.hive.policy.HiveRegistrationPolicy;
import org.apache.gobblin.util.AvroUtils;
-import org.apache.gobblin.util.FileListUtils;
import org.apache.gobblin.util.HadoopUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+
/**
* A configurator that focused on creating avro compaction map-reduce job
*/
@Slf4j
-public class CompactionAvroJobConfigurator {
- protected final State state;
+public class CompactionAvroJobConfigurator extends CompactionJobConfigurator {
- @Getter
- protected final FileSystem fs;
+ public static class Factory implements
CompactionJobConfigurator.ConfiguratorFactory {
+ @Override
+ public CompactionJobConfigurator createConfigurator(State state) throws
IOException {
+ return new CompactionAvroJobConfigurator(state);
+ }
+ }
- // Below attributes are MR related
- @Getter
- protected Job configuredJob;
- @Getter
- protected final boolean shouldDeduplicate;
- @Getter
- protected Path mrOutputPath = null;
- @Getter
- protected boolean isJobCreated = false;
- @Getter
- protected Collection<Path> mapReduceInputPaths = null;
- @Getter
- private long fileNameRecordCount = 0;
+ @Override
+ public String getFileExtension(){
+ return EXTENSION.AVRO.getExtensionString();
+ }
/**
* Constructor
* @param state A task level state
*/
public CompactionAvroJobConfigurator(State state) throws IOException {
- this.state = state;
- this.fs = getFileSystem(state);
- this.shouldDeduplicate =
state.getPropAsBoolean(MRCompactor.COMPACTION_SHOULD_DEDUPLICATE, true);
+ super(state);
}
/**
- * Refer to {@link MRCompactorAvroKeyDedupJobRunner#getDedupKeyOption()}
+ * Refer to MRCompactorAvroKeyDedupJobRunner#getDedupKeyOption()
*/
private MRCompactorAvroKeyDedupJobRunner.DedupKeyOption getDedupKeyOption() {
if
(!this.state.contains(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_DEDUP_KEY))
{
return MRCompactorAvroKeyDedupJobRunner.DEFAULT_DEDUP_KEY_OPTION;
}
- Optional<MRCompactorAvroKeyDedupJobRunner.DedupKeyOption> option =
Enums.getIfPresent(MRCompactorAvroKeyDedupJobRunner.DedupKeyOption.class,
+ Optional<MRCompactorAvroKeyDedupJobRunner.DedupKeyOption> option =
+
Enums.getIfPresent(MRCompactorAvroKeyDedupJobRunner.DedupKeyOption.class,
this.state.getProp(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_DEDUP_KEY).toUpperCase());
return option.isPresent() ? option.get() :
MRCompactorAvroKeyDedupJobRunner.DEFAULT_DEDUP_KEY_OPTION;
}
/**
- * Refer to {@link MRCompactorAvroKeyDedupJobRunner#getKeySchema(Job,
Schema)}
+ * Refer to MRCompactorAvroKeyDedupJobRunner#getKeySchema(Job, Schema)
*/
private Schema getKeySchema(Job job, Schema topicSchema) throws IOException {
- boolean keySchemaFileSpecified =
this.state.contains(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_AVRO_KEY_SCHEMA_LOC);
+ boolean keySchemaFileSpecified =
+
this.state.contains(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_AVRO_KEY_SCHEMA_LOC);
Schema keySchema = null;
@@ -138,25 +106,28 @@ public class CompactionAvroJobConfigurator {
keySchema = AvroUtils.parseSchemaFromFile(keySchemaFile, this.fs);
} catch (IOException e) {
log.error("Failed to parse avro schema from " + keySchemaFile
- + ", using key attributes in the schema for compaction");
- keySchema =
AvroUtils.removeUncomparableFields(MRCompactorAvroKeyDedupJobRunner.getKeySchema(topicSchema)).get();
+ + ", using key attributes in the schema for compaction");
+ keySchema =
+
AvroUtils.removeUncomparableFields(MRCompactorAvroKeyDedupJobRunner.getKeySchema(topicSchema)).get();
}
if (!MRCompactorAvroKeyDedupJobRunner.isKeySchemaValid(keySchema,
topicSchema)) {
log.warn(String.format("Key schema %s is not compatible with record
schema %s.", keySchema, topicSchema)
- + "Using key attributes in the schema for compaction");
- keySchema =
AvroUtils.removeUncomparableFields(MRCompactorAvroKeyDedupJobRunner.getKeySchema(topicSchema)).get();
+ + "Using key attributes in the schema for compaction");
+ keySchema =
+
AvroUtils.removeUncomparableFields(MRCompactorAvroKeyDedupJobRunner.getKeySchema(topicSchema)).get();
}
} else {
log.info("Property " +
MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_AVRO_KEY_SCHEMA_LOC
- + " not provided. Using key attributes in the schema for
compaction");
+ + " not provided. Using key attributes in the schema for
compaction");
keySchema =
AvroUtils.removeUncomparableFields(MRCompactorAvroKeyDedupJobRunner.getKeySchema(topicSchema)).get();
}
return keySchema;
}
- private void configureSchema(Job job) throws IOException {
+ @Override
+ protected void configureSchema(Job job) throws IOException {
Schema newestSchema =
MRCompactorAvroKeyDedupJobRunner.getNewestSchemaFromSource(job, this.fs);
if (newestSchema != null) {
if
(this.state.getPropAsBoolean(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_AVRO_SINGLE_INPUT_SCHEMA,
true)) {
@@ -182,240 +153,5 @@ public class CompactionAvroJobConfigurator {
job.setOutputValueClass(NullWritable.class);
setNumberOfReducers(job);
}
-
- /**
- * Refer to {@link MRCompactorAvroKeyDedupJobRunner#setNumberOfReducers(Job)}
- */
- protected void setNumberOfReducers(Job job) throws IOException {
-
- // get input size
- long inputSize = 0;
- for (Path inputPath : this.mapReduceInputPaths) {
- inputSize += this.fs.getContentSummary(inputPath).getLength();
- }
-
- // get target file size
- long targetFileSize =
this.state.getPropAsLong(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_TARGET_OUTPUT_FILE_SIZE,
-
MRCompactorAvroKeyDedupJobRunner.DEFAULT_COMPACTION_JOB_TARGET_OUTPUT_FILE_SIZE);
-
- // get max reducers
- int maxNumReducers =
state.getPropAsInt(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_MAX_NUM_REDUCERS,
-
MRCompactorAvroKeyDedupJobRunner.DEFAULT_COMPACTION_JOB_MAX_NUM_REDUCERS);
-
- int numReducers = Math.min(Ints.checkedCast(inputSize / targetFileSize) +
1, maxNumReducers);
-
- // get use prime reducers
- boolean usePrimeReducers =
state.getPropAsBoolean(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_USE_PRIME_REDUCERS,
-
MRCompactorAvroKeyDedupJobRunner.DEFAULT_COMPACTION_JOB_USE_PRIME_REDUCERS);
-
- if (usePrimeReducers && numReducers != 1) {
- numReducers = Primes.nextPrime(numReducers);
- }
- job.setNumReduceTasks(numReducers);
- }
-
- /**
- * Concatenate multiple directory or file names into one path
- *
- * @return Concatenated path or null if the parameter is empty
- */
- private Path concatPaths (String ...names) {
- if (names == null || names.length == 0) {
- return null;
- }
- Path cur = new Path(names[0]);
- for (int i = 1; i < names.length; ++i) {
- cur = new Path(cur, new Path(names[i]));
- }
- return cur;
- }
-
- /**
- * Refer to {@link
MRCompactorAvroKeyDedupJobRunner#configureInputAndOutputPaths(Job)}.
- * @return false if no valid input paths present for MR job to process,
where a path is valid if it is
- * a directory containing one or more files.
- *
- */
- protected boolean configureInputAndOutputPaths(Job job, FileSystemDataset
dataset) throws IOException {
- boolean emptyDirectoryFlag = false;
-
- String mrOutputBase = this.state.getProp(MRCompactor.COMPACTION_JOB_DIR);
- CompactionPathParser parser = new CompactionPathParser(this.state);
- CompactionPathParser.CompactionParserResult rst = parser.parse(dataset);
- this.mrOutputPath = concatPaths (mrOutputBase, rst.getDatasetName(),
rst.getDstSubDir(), rst.getTimeString());
-
- log.info ("Cleaning temporary MR output directory: " + mrOutputPath);
- this.fs.delete(mrOutputPath, true);
-
- this.mapReduceInputPaths = getGranularInputPaths(dataset.datasetRoot());
- if (this.mapReduceInputPaths.isEmpty()) {
- this.mapReduceInputPaths.add(dataset.datasetRoot());
- emptyDirectoryFlag = true;
- }
-
- for (Path path: mapReduceInputPaths) {
- FileInputFormat.addInputPath(job, path);
- }
-
- FileOutputFormat.setOutputPath(job, mrOutputPath);
- return emptyDirectoryFlag;
- }
-
- /**
- * Customized MR job creation. This method will be used in
- * {@link
org.apache.gobblin.compaction.suite.CompactionAvroSuite#createJob(Dataset)}
- *
- * @param dataset A path or directory which needs compaction
- * @return A configured map-reduce job for avro compaction
- */
- public Job createJob(FileSystemDataset dataset) throws IOException {
- Configuration conf = HadoopUtils.getConfFromState(state);
-
- // Turn on mapreduce output compression by default
- if (conf.get("mapreduce.output.fileoutputformat.compress") == null &&
conf.get("mapred.output.compress") == null) {
- conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
- }
-
- // Disable delegation token cancellation by default
- if (conf.get("mapreduce.job.complete.cancel.delegation.tokens") == null) {
- conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens",
false);
- }
-
- addJars(conf);
- Job job = Job.getInstance(conf);
- job.setJobName(MRCompactorJobRunner.HADOOP_JOB_NAME);
- boolean emptyDirectoryFlag = this.configureInputAndOutputPaths(job,
dataset);
- if (emptyDirectoryFlag) {
-
this.state.setProp(HiveRegistrationPolicy.MAPREDUCE_JOB_INPUT_PATH_EMPTY_KEY,
true);
- }
- this.configureMapper(job);
- this.configureReducer(job);
- if (emptyDirectoryFlag || !this.shouldDeduplicate) {
- job.setNumReduceTasks(0);
- }
- // Configure schema at the last step because FilesInputFormat will be used
internally
- this.configureSchema(job);
- this.isJobCreated = true;
- this.configuredJob = job;
- return job;
- }
-
- private void addJars(Configuration conf) throws IOException {
- if (!state.contains(MRCompactor.COMPACTION_JARS)) {
- return;
- }
- Path jarFileDir = new Path(state.getProp(MRCompactor.COMPACTION_JARS));
- for (FileStatus status : this.fs.listStatus(jarFileDir)) {
- DistributedCache.addFileToClassPath(status.getPath(), conf, this.fs);
- }
- }
-
- private FileSystem getFileSystem(State state)
- throws IOException {
- Configuration conf = HadoopUtils.getConfFromState(state);
- String uri = state.getProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI,
ConfigurationKeys.LOCAL_FS_URI);
- FileSystem fs = FileSystem.get(URI.create(uri), conf);
-
- return fs;
- }
-
- /**
- * Converts a top level input path to a group of sub-paths according to user
defined granularity.
- * This may be required because if upstream application generates many
sub-paths but the map-reduce
- * job only keeps track of the top level path, after the job is done, we
won't be able to tell if
- * those new arriving sub-paths is processed by previous map-reduce job or
not. Hence a better way
- * is to pre-define those sub-paths as input paths before we start to run
MR. The implementation of
- * this method should depend on the data generation granularity controlled
by upstream. Here we just
- * list the deepest level of containing folder as the smallest granularity.
- *
- * @param path top level directory needs compaction
- * @return A collection of input paths which will participate in map-reduce
job
- */
- protected Collection<Path> getGranularInputPaths (Path path) throws
IOException {
-
- boolean appendDelta =
this.state.getPropAsBoolean(MRCompactor.COMPACTION_RENAME_SOURCE_DIR_ENABLED,
- MRCompactor.DEFAULT_COMPACTION_RENAME_SOURCE_DIR_ENABLED);
-
- Set<Path> uncompacted = Sets.newHashSet();
- Set<Path> total = Sets.newHashSet();
-
- for (FileStatus fileStatus : FileListUtils.listFilesRecursively(fs, path))
{
- if (appendDelta) {
- // use source dir suffix to identify the delta input paths
- if
(!fileStatus.getPath().getParent().toString().endsWith(MRCompactor.COMPACTION_RENAME_SOURCE_DIR_SUFFIX))
{
- uncompacted.add(fileStatus.getPath().getParent());
- }
- total.add(fileStatus.getPath().getParent());
- } else {
- uncompacted.add(fileStatus.getPath().getParent());
- }
- }
-
- if (appendDelta) {
- // When the output record count from mr counter doesn't match
- // the record count from input file names, we prefer file names because
- // it will be used to calculate the difference of count in next run.
- this.fileNameRecordCount = new
InputRecordCountHelper(this.state).calculateRecordCount (total);
- log.info ("{} has total input record count (based on file name) {}",
path, this.fileNameRecordCount);
- }
-
- return uncompacted;
- }
-
- private static List<TaskCompletionEvent> getAllTaskCompletionEvent(Job
completedJob) {
- List<TaskCompletionEvent> completionEvents = new LinkedList<>();
-
- while (true) {
- try {
- TaskCompletionEvent[] bunchOfEvents;
- bunchOfEvents =
completedJob.getTaskCompletionEvents(completionEvents.size());
- if (bunchOfEvents == null || bunchOfEvents.length == 0) {
- break;
- }
- completionEvents.addAll(Arrays.asList(bunchOfEvents));
- } catch (IOException e) {
- break;
- }
- }
-
- return completionEvents;
- }
-
- private static List<TaskCompletionEvent>
getUnsuccessfulTaskCompletionEvent(Job completedJob) {
- return
getAllTaskCompletionEvent(completedJob).stream().filter(te->te.getStatus() !=
TaskCompletionEvent.Status.SUCCEEDED).collect(
- Collectors.toList());
- }
-
- private static boolean isFailedPath(Path path, List<TaskCompletionEvent>
failedEvents) {
- return path.toString().contains("_temporary") || failedEvents.stream()
- .anyMatch(event -> path.toString().contains(Path.SEPARATOR +
event.getTaskAttemptId().toString() + Path.SEPARATOR));
- }
-
- /**
- * Get good files
- * The problem happens when speculative task attempt initialized but then
killed in the middle of processing.
- * Some partial file was generated at
{tmp_output}/_temporary/1/_temporary/attempt_xxx_xxx/part-m-xxxx.avro,
- * without being committed to its final destination at
{tmp_output}/part-m-xxxx.avro.
- *
- * @param job Completed MR job
- * @param fs File system that can handle file system
- * @return all successful files that has been committed
- */
- public static List<Path> getGoodFiles(Job job, Path tmpPath, FileSystem fs)
throws IOException {
- List<TaskCompletionEvent> failedEvents =
CompactionAvroJobConfigurator.getUnsuccessfulTaskCompletionEvent(job);
-
- List<Path> allFilePaths = DatasetHelper.getApplicableFilePaths(fs,
tmpPath, Lists.newArrayList("avro"));
- List<Path> goodPaths = new ArrayList<>();
- for (Path filePath: allFilePaths) {
- if (CompactionAvroJobConfigurator.isFailedPath(filePath, failedEvents)) {
- fs.delete(filePath, false);
- log.error("{} is a bad path so it was deleted", filePath);
- } else {
- goodPaths.add(filePath);
- }
- }
-
- return goodPaths;
- }
}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java
similarity index 60%
copy from
gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
copy to
gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java
index 1779e33..d88ad95 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java
@@ -17,6 +17,10 @@
package org.apache.gobblin.compaction.mapreduce;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Ints;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
@@ -26,54 +30,52 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
-
-import org.apache.avro.Schema;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.avro.mapreduce.AvroJob;
-import org.apache.commons.math3.primes.Primes;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.TaskCompletionEvent;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-import com.google.common.base.Enums;
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.primitives.Ints;
-
+import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-
+import org.apache.commons.math3.primes.Primes;
import org.apache.gobblin.compaction.dataset.DatasetHelper;
-import
org.apache.gobblin.compaction.mapreduce.avro.AvroKeyCompactorOutputFormat;
-import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyDedupReducer;
-import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyMapper;
-import
org.apache.gobblin.compaction.mapreduce.avro.AvroKeyRecursiveCombineFileInputFormat;
import
org.apache.gobblin.compaction.mapreduce.avro.MRCompactorAvroKeyDedupJobRunner;
import org.apache.gobblin.compaction.parser.CompactionPathParser;
import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.dataset.Dataset;
import org.apache.gobblin.dataset.FileSystemDataset;
import org.apache.gobblin.hive.policy.HiveRegistrationPolicy;
-import org.apache.gobblin.util.AvroUtils;
import org.apache.gobblin.util.FileListUtils;
import org.apache.gobblin.util.HadoopUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
/**
- * A configurator that focused on creating avro compaction map-reduce job
+ * Configurator for compaction job.
+ * Different data formats should have their own impl. for this interface.
+ *
*/
@Slf4j
-public class CompactionAvroJobConfigurator {
+public abstract class CompactionJobConfigurator {
+
+ public static final String COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY =
"compaction.jobConfiguratorFactory.class";
+ public static final String DEFAULT_COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS
=
+
"org.apache.gobblin.compaction.mapreduce.CompactionAvroJobConfigurator$Factory";
+
+
+ @Getter
+ @AllArgsConstructor
+ protected enum EXTENSION {
+ AVRO("avro"), ORC("orc");
+
+ private String extensionString;
+ }
+
protected final State state;
@Getter
@@ -91,96 +93,95 @@ public class CompactionAvroJobConfigurator {
@Getter
protected Collection<Path> mapReduceInputPaths = null;
@Getter
- private long fileNameRecordCount = 0;
+ protected long fileNameRecordCount = 0;
- /**
- * Constructor
- * @param state A task level state
- */
- public CompactionAvroJobConfigurator(State state) throws IOException {
+ public interface ConfiguratorFactory {
+ CompactionJobConfigurator createConfigurator(State state) throws
IOException;
+ }
+
+ public CompactionJobConfigurator(State state) throws IOException {
this.state = state;
this.fs = getFileSystem(state);
this.shouldDeduplicate =
state.getPropAsBoolean(MRCompactor.COMPACTION_SHOULD_DEDUPLICATE, true);
}
- /**
- * Refer to {@link MRCompactorAvroKeyDedupJobRunner#getDedupKeyOption()}
- */
- private MRCompactorAvroKeyDedupJobRunner.DedupKeyOption getDedupKeyOption() {
- if
(!this.state.contains(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_DEDUP_KEY))
{
- return MRCompactorAvroKeyDedupJobRunner.DEFAULT_DEDUP_KEY_OPTION;
+ public static CompactionJobConfigurator instantiateConfigurator(State state)
{
+ String compactionConfiguratorFactoryClass =
+ state.getProp(COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY,
DEFAULT_COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS);
+ try {
+ return Class.forName(compactionConfiguratorFactoryClass)
+ .asSubclass(ConfiguratorFactory.class)
+ .newInstance()
+ .createConfigurator(state);
+ } catch (ReflectiveOperationException | IOException e) {
+      throw new RuntimeException("Failed to instantiate an instance of job
configurator:", e);
}
- Optional<MRCompactorAvroKeyDedupJobRunner.DedupKeyOption> option =
Enums.getIfPresent(MRCompactorAvroKeyDedupJobRunner.DedupKeyOption.class,
-
this.state.getProp(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_DEDUP_KEY).toUpperCase());
- return option.isPresent() ? option.get() :
MRCompactorAvroKeyDedupJobRunner.DEFAULT_DEDUP_KEY_OPTION;
}
+ public abstract String getFileExtension();
+
/**
- * Refer to {@link MRCompactorAvroKeyDedupJobRunner#getKeySchema(Job,
Schema)}
+   * Customized MR job creation, with format-specific steps delegated to subclasses.
+   *
+   * @param dataset A path or directory which needs compaction
+   * @return A configured map-reduce job for compaction
*/
- private Schema getKeySchema(Job job, Schema topicSchema) throws IOException {
-
- boolean keySchemaFileSpecified =
this.state.contains(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_AVRO_KEY_SCHEMA_LOC);
-
- Schema keySchema = null;
-
- MRCompactorAvroKeyDedupJobRunner.DedupKeyOption dedupKeyOption =
getDedupKeyOption();
- if (dedupKeyOption == MRCompactorAvroKeyDedupJobRunner.DedupKeyOption.ALL)
{
- log.info("Using all attributes in the schema (except Map, Arrar and Enum
fields) for compaction");
- keySchema = AvroUtils.removeUncomparableFields(topicSchema).get();
- } else if (dedupKeyOption ==
MRCompactorAvroKeyDedupJobRunner.DedupKeyOption.KEY) {
- log.info("Using key attributes in the schema for compaction");
- keySchema =
AvroUtils.removeUncomparableFields(MRCompactorAvroKeyDedupJobRunner.getKeySchema(topicSchema)).get();
- } else if (keySchemaFileSpecified) {
- Path keySchemaFile = new
Path(state.getProp(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_AVRO_KEY_SCHEMA_LOC));
- log.info("Using attributes specified in schema file " + keySchemaFile +
" for compaction");
- try {
- keySchema = AvroUtils.parseSchemaFromFile(keySchemaFile, this.fs);
- } catch (IOException e) {
- log.error("Failed to parse avro schema from " + keySchemaFile
- + ", using key attributes in the schema for compaction");
- keySchema =
AvroUtils.removeUncomparableFields(MRCompactorAvroKeyDedupJobRunner.getKeySchema(topicSchema)).get();
- }
+ public Job createJob(FileSystemDataset dataset) throws IOException {
+ Configuration conf = HadoopUtils.getConfFromState(state);
- if (!MRCompactorAvroKeyDedupJobRunner.isKeySchemaValid(keySchema,
topicSchema)) {
- log.warn(String.format("Key schema %s is not compatible with record
schema %s.", keySchema, topicSchema)
- + "Using key attributes in the schema for compaction");
- keySchema =
AvroUtils.removeUncomparableFields(MRCompactorAvroKeyDedupJobRunner.getKeySchema(topicSchema)).get();
- }
- } else {
- log.info("Property " +
MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_AVRO_KEY_SCHEMA_LOC
- + " not provided. Using key attributes in the schema for
compaction");
- keySchema =
AvroUtils.removeUncomparableFields(MRCompactorAvroKeyDedupJobRunner.getKeySchema(topicSchema)).get();
+ // Turn on mapreduce output compression by default
+ if (conf.get("mapreduce.output.fileoutputformat.compress") == null &&
conf.get("mapred.output.compress") == null) {
+ conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
}
- return keySchema;
- }
+ // Disable delegation token cancellation by default
+ if (conf.get("mapreduce.job.complete.cancel.delegation.tokens") == null) {
+ conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens",
false);
+ }
- private void configureSchema(Job job) throws IOException {
- Schema newestSchema =
MRCompactorAvroKeyDedupJobRunner.getNewestSchemaFromSource(job, this.fs);
- if (newestSchema != null) {
- if
(this.state.getPropAsBoolean(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_AVRO_SINGLE_INPUT_SCHEMA,
true)) {
- AvroJob.setInputKeySchema(job, newestSchema);
- }
- AvroJob.setMapOutputKeySchema(job, this.shouldDeduplicate ?
getKeySchema(job, newestSchema) : newestSchema);
- AvroJob.setMapOutputValueSchema(job, newestSchema);
- AvroJob.setOutputKeySchema(job, newestSchema);
+ addJars(conf, this.state, fs);
+ Job job = Job.getInstance(conf);
+ job.setJobName(MRCompactorJobRunner.HADOOP_JOB_NAME);
+ boolean emptyDirectoryFlag = this.configureInputAndOutputPaths(job,
dataset);
+ if (emptyDirectoryFlag) {
+
this.state.setProp(HiveRegistrationPolicy.MAPREDUCE_JOB_INPUT_PATH_EMPTY_KEY,
true);
}
+ this.configureMapper(job);
+ this.configureReducer(job);
+ if (emptyDirectoryFlag || !this.shouldDeduplicate) {
+ job.setNumReduceTasks(0);
+ }
+ // Configure schema at the last step because FilesInputFormat will be used
internally
+ this.configureSchema(job);
+ this.isJobCreated = true;
+ this.configuredJob = job;
+ return job;
}
- protected void configureMapper(Job job) {
- job.setInputFormatClass(AvroKeyRecursiveCombineFileInputFormat.class);
- job.setMapperClass(AvroKeyMapper.class);
- job.setMapOutputKeyClass(AvroKey.class);
- job.setMapOutputValueClass(AvroValue.class);
- }
+ /**
+ * Configuring Mapper/Reducer's input/output schema for compaction MR job.
+   * The input schema for Mapper should be obtained from the to-be-compacted file.
+ * The output schema for Mapper is for dedup.
+ * The output schema for Reducer should be identical to input schema of
Mapper.
+ * @param job The compaction jobConf.
+ * @throws IOException
+ */
+ protected abstract void configureSchema(Job job) throws IOException;
- protected void configureReducer(Job job) throws IOException {
- job.setOutputFormatClass(AvroKeyCompactorOutputFormat.class);
- job.setReducerClass(AvroKeyDedupReducer.class);
- job.setOutputKeyClass(AvroKey.class);
- job.setOutputValueClass(NullWritable.class);
- setNumberOfReducers(job);
+ /**
+ * Configuring Mapper class, specific to data format.
+ */
+ protected abstract void configureMapper(Job job);
+
+ /**
+ * Configuring Reducer class, specific to data format.
+ */
+ protected abstract void configureReducer(Job job) throws IOException;
+
+ protected FileSystem getFileSystem(State state) throws IOException {
+ Configuration conf = HadoopUtils.getConfFromState(state);
+ String uri = state.getProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI,
ConfigurationKeys.LOCAL_FS_URI);
+ return FileSystem.get(URI.create(uri), conf);
}
/**
@@ -195,17 +196,19 @@ public class CompactionAvroJobConfigurator {
}
// get target file size
- long targetFileSize =
this.state.getPropAsLong(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_TARGET_OUTPUT_FILE_SIZE,
+ long targetFileSize =
+
this.state.getPropAsLong(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_TARGET_OUTPUT_FILE_SIZE,
MRCompactorAvroKeyDedupJobRunner.DEFAULT_COMPACTION_JOB_TARGET_OUTPUT_FILE_SIZE);
// get max reducers
int maxNumReducers =
state.getPropAsInt(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_MAX_NUM_REDUCERS,
-
MRCompactorAvroKeyDedupJobRunner.DEFAULT_COMPACTION_JOB_MAX_NUM_REDUCERS);
+
MRCompactorAvroKeyDedupJobRunner.DEFAULT_COMPACTION_JOB_MAX_NUM_REDUCERS);
int numReducers = Math.min(Ints.checkedCast(inputSize / targetFileSize) +
1, maxNumReducers);
// get use prime reducers
- boolean usePrimeReducers =
state.getPropAsBoolean(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_USE_PRIME_REDUCERS,
+ boolean usePrimeReducers =
+
state.getPropAsBoolean(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_USE_PRIME_REDUCERS,
MRCompactorAvroKeyDedupJobRunner.DEFAULT_COMPACTION_JOB_USE_PRIME_REDUCERS);
if (usePrimeReducers && numReducers != 1) {
@@ -214,24 +217,18 @@ public class CompactionAvroJobConfigurator {
job.setNumReduceTasks(numReducers);
}
- /**
- * Concatenate multiple directory or file names into one path
- *
- * @return Concatenated path or null if the parameter is empty
- */
- private Path concatPaths (String ...names) {
- if (names == null || names.length == 0) {
- return null;
+ protected void addJars(Configuration conf, State state, FileSystem fs)
throws IOException {
+ if (!state.contains(MRCompactor.COMPACTION_JARS)) {
+ return;
}
- Path cur = new Path(names[0]);
- for (int i = 1; i < names.length; ++i) {
- cur = new Path(cur, new Path(names[i]));
+ Path jarFileDir = new Path(state.getProp(MRCompactor.COMPACTION_JARS));
+ for (FileStatus status : fs.listStatus(jarFileDir)) {
+ DistributedCache.addFileToClassPath(status.getPath(), conf, fs);
}
- return cur;
}
/**
- * Refer to {@link
MRCompactorAvroKeyDedupJobRunner#configureInputAndOutputPaths(Job)}.
+ * Refer to
MRCompactorAvroKeyDedupJobRunner#configureInputAndOutputPaths(Job).
* @return false if no valid input paths present for MR job to process,
where a path is valid if it is
* a directory containing one or more files.
*
@@ -242,9 +239,9 @@ public class CompactionAvroJobConfigurator {
String mrOutputBase = this.state.getProp(MRCompactor.COMPACTION_JOB_DIR);
CompactionPathParser parser = new CompactionPathParser(this.state);
CompactionPathParser.CompactionParserResult rst = parser.parse(dataset);
- this.mrOutputPath = concatPaths (mrOutputBase, rst.getDatasetName(),
rst.getDstSubDir(), rst.getTimeString());
+ this.mrOutputPath = concatPaths(mrOutputBase, rst.getDatasetName(),
rst.getDstSubDir(), rst.getTimeString());
- log.info ("Cleaning temporary MR output directory: " + mrOutputPath);
+ log.info("Cleaning temporary MR output directory: " + mrOutputPath);
this.fs.delete(mrOutputPath, true);
this.mapReduceInputPaths = getGranularInputPaths(dataset.datasetRoot());
@@ -253,7 +250,7 @@ public class CompactionAvroJobConfigurator {
emptyDirectoryFlag = true;
}
- for (Path path: mapReduceInputPaths) {
+ for (Path path : mapReduceInputPaths) {
FileInputFormat.addInputPath(job, path);
}
@@ -262,61 +259,19 @@ public class CompactionAvroJobConfigurator {
}
/**
- * Customized MR job creation. This method will be used in
- * {@link
org.apache.gobblin.compaction.suite.CompactionAvroSuite#createJob(Dataset)}
+ * Concatenate multiple directory or file names into one path
*
- * @param dataset A path or directory which needs compaction
- * @return A configured map-reduce job for avro compaction
+ * @return Concatenated path or null if the parameter is empty
*/
- public Job createJob(FileSystemDataset dataset) throws IOException {
- Configuration conf = HadoopUtils.getConfFromState(state);
-
- // Turn on mapreduce output compression by default
- if (conf.get("mapreduce.output.fileoutputformat.compress") == null &&
conf.get("mapred.output.compress") == null) {
- conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
- }
-
- // Disable delegation token cancellation by default
- if (conf.get("mapreduce.job.complete.cancel.delegation.tokens") == null) {
- conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens",
false);
- }
-
- addJars(conf);
- Job job = Job.getInstance(conf);
- job.setJobName(MRCompactorJobRunner.HADOOP_JOB_NAME);
- boolean emptyDirectoryFlag = this.configureInputAndOutputPaths(job,
dataset);
- if (emptyDirectoryFlag) {
-
this.state.setProp(HiveRegistrationPolicy.MAPREDUCE_JOB_INPUT_PATH_EMPTY_KEY,
true);
- }
- this.configureMapper(job);
- this.configureReducer(job);
- if (emptyDirectoryFlag || !this.shouldDeduplicate) {
- job.setNumReduceTasks(0);
- }
- // Configure schema at the last step because FilesInputFormat will be used
internally
- this.configureSchema(job);
- this.isJobCreated = true;
- this.configuredJob = job;
- return job;
- }
-
- private void addJars(Configuration conf) throws IOException {
- if (!state.contains(MRCompactor.COMPACTION_JARS)) {
- return;
+ private Path concatPaths(String... names) {
+ if (names == null || names.length == 0) {
+ return null;
}
- Path jarFileDir = new Path(state.getProp(MRCompactor.COMPACTION_JARS));
- for (FileStatus status : this.fs.listStatus(jarFileDir)) {
- DistributedCache.addFileToClassPath(status.getPath(), conf, this.fs);
+ Path cur = new Path(names[0]);
+ for (int i = 1; i < names.length; ++i) {
+ cur = new Path(cur, new Path(names[i]));
}
- }
-
- private FileSystem getFileSystem(State state)
- throws IOException {
- Configuration conf = HadoopUtils.getConfFromState(state);
- String uri = state.getProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI,
ConfigurationKeys.LOCAL_FS_URI);
- FileSystem fs = FileSystem.get(URI.create(uri), conf);
-
- return fs;
+ return cur;
}
/**
@@ -331,10 +286,10 @@ public class CompactionAvroJobConfigurator {
* @param path top level directory needs compaction
* @return A collection of input paths which will participate in map-reduce
job
*/
- protected Collection<Path> getGranularInputPaths (Path path) throws
IOException {
+ protected Collection<Path> getGranularInputPaths(Path path) throws
IOException {
boolean appendDelta =
this.state.getPropAsBoolean(MRCompactor.COMPACTION_RENAME_SOURCE_DIR_ENABLED,
- MRCompactor.DEFAULT_COMPACTION_RENAME_SOURCE_DIR_ENABLED);
+ MRCompactor.DEFAULT_COMPACTION_RENAME_SOURCE_DIR_ENABLED);
Set<Path> uncompacted = Sets.newHashSet();
Set<Path> total = Sets.newHashSet();
@@ -347,7 +302,7 @@ public class CompactionAvroJobConfigurator {
}
total.add(fileStatus.getPath().getParent());
} else {
- uncompacted.add(fileStatus.getPath().getParent());
+ uncompacted.add(fileStatus.getPath().getParent());
}
}
@@ -355,8 +310,8 @@ public class CompactionAvroJobConfigurator {
// When the output record count from mr counter doesn't match
// the record count from input file names, we prefer file names because
// it will be used to calculate the difference of count in next run.
- this.fileNameRecordCount = new
InputRecordCountHelper(this.state).calculateRecordCount (total);
- log.info ("{} has total input record count (based on file name) {}",
path, this.fileNameRecordCount);
+ this.fileNameRecordCount = new
InputRecordCountHelper(this.state).calculateRecordCount(total);
+ log.info("{} has total input record count (based on file name) {}",
path, this.fileNameRecordCount);
}
return uncompacted;
@@ -394,20 +349,23 @@ public class CompactionAvroJobConfigurator {
/**
* Get good files
* The problem happens when speculative task attempt initialized but then
killed in the middle of processing.
- * Some partial file was generated at
{tmp_output}/_temporary/1/_temporary/attempt_xxx_xxx/part-m-xxxx.avro,
- * without being committed to its final destination at
{tmp_output}/part-m-xxxx.avro.
+ * Some partial file was generated at
{tmp_output}/_temporary/1/_temporary/attempt_xxx_xxx/xxxx(Avro file
+ * might have .avro as extension file name), without being committed to its
final destination
+ * at {tmp_output}/xxxx.
*
* @param job Completed MR job
* @param fs File system that can handle file system
+   * @param acceptableExtension list of file extensions considered acceptable for "good files".
* @return all successful files that has been committed
*/
- public static List<Path> getGoodFiles(Job job, Path tmpPath, FileSystem fs)
throws IOException {
- List<TaskCompletionEvent> failedEvents =
CompactionAvroJobConfigurator.getUnsuccessfulTaskCompletionEvent(job);
+ public static List<Path> getGoodFiles(Job job, Path tmpPath, FileSystem fs,
List<String> acceptableExtension)
+ throws IOException {
+ List<TaskCompletionEvent> failedEvents =
getUnsuccessfulTaskCompletionEvent(job);
- List<Path> allFilePaths = DatasetHelper.getApplicableFilePaths(fs,
tmpPath, Lists.newArrayList("avro"));
+ List<Path> allFilePaths = DatasetHelper.getApplicableFilePaths(fs,
tmpPath, acceptableExtension);
List<Path> goodPaths = new ArrayList<>();
for (Path filePath: allFilePaths) {
- if (CompactionAvroJobConfigurator.isFailedPath(filePath, failedEvents)) {
+ if (isFailedPath(filePath, failedEvents)) {
fs.delete(filePath, false);
log.error("{} is a bad path so it was deleted", filePath);
} else {
@@ -418,4 +376,3 @@ public class CompactionAvroJobConfigurator {
return goodPaths;
}
}
-
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionRunner.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionRunner.java
index 3976612..2d25dd8 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionRunner.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionRunner.java
@@ -45,8 +45,13 @@ import org.apache.gobblin.metrics.Tag;
* A class for launching a Gobblin MR job for compaction through command line.
*
* @author Lorand Bendig
+ * @deprecated Please use {@link
org.apache.gobblin.compaction.mapreduce.MRCompactionTask}
+ * and {@link org.apache.gobblin.compaction.source.CompactionSource} to
launch MR instead.
+ * The new way enjoys simpler logic to trigger the compaction flow and more
reliable verification criteria,
+ * instead of using timestamp only before.
*
*/
+@Deprecated
public class MRCompactionRunner {
private static final Logger LOG =
LoggerFactory.getLogger(MRCompactionRunner.class);
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
index 953c0dc..5a937ed 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
@@ -93,8 +93,13 @@ import static
org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner.Statu
* under {@link #COMPACTION_INPUT_DIR}.
*
* @author Ziyang Liu
+ * @deprecated Please use {@link
org.apache.gobblin.compaction.mapreduce.MRCompactionTask}
+ * and {@link org.apache.gobblin.compaction.source.CompactionSource} to
launch MR instead.
+ * The new way enjoys simpler logic to trigger the compaction flow and more
reliable verification criteria,
+ * instead of using timestamp only before.
*/
+@Deprecated
public class MRCompactor implements Compactor {
private static final Logger LOG = LoggerFactory.getLogger(MRCompactor.class);
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobPropCreator.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobPropCreator.java
index 182244a..c6b5770 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobPropCreator.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobPropCreator.java
@@ -50,6 +50,10 @@ import org.apache.gobblin.util.FileListUtils;
* compaction.topic, compaction.job.input.dir, compaction.job.dest.dir,
compaction.job.dest.dir.
*
* @author Ziyang Liu
+ * @deprecated Please use {@link
org.apache.gobblin.compaction.mapreduce.MRCompactionTask}
+ * and {@link org.apache.gobblin.compaction.source.CompactionSource} to
launch MR instead.
+ * The new way enjoys simpler logic to trigger the compaction flow and more
reliable verification criteria,
+ * instead of using timestamp only before.
*/
public class MRCompactorJobPropCreator {
private static final Logger LOG =
LoggerFactory.getLogger(MRCompactorJobPropCreator.class);
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
index 0ab3eab..d957dc5 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.compaction.mapreduce;
+import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
@@ -95,6 +96,10 @@ import static org.apache.gobblin.util.retry.RetryerFactory.*;
* the output directory.
*
* @author Ziyang Liu
+ * @deprecated Please use {@link
org.apache.gobblin.compaction.mapreduce.MRCompactionTask}
+ * and {@link org.apache.gobblin.compaction.source.CompactionSource} to
launch MR instead.
+ * The new way enjoys simpler logic to trigger the compaction flow and more
reliable verification criteria,
+ * instead of using timestamp only before.
*/
@SuppressWarnings("deprecation")
public abstract class MRCompactorJobRunner implements Runnable,
Comparable<MRCompactorJobRunner> {
@@ -322,7 +327,8 @@ public abstract class MRCompactorJobRunner implements
Runnable, Comparable<MRCom
this.submitAndWait(job);
if (shouldPublishData(compactionTimestamp)) {
// remove all invalid empty files due to speculative task execution
- List<Path> goodPaths =
CompactionAvroJobConfigurator.getGoodFiles(job, this.dataset.outputTmpPath(),
this.tmpFs);
+ List<Path> goodPaths = CompactionJobConfigurator.getGoodFiles(job,
this.dataset.outputTmpPath(), this.tmpFs,
+ ImmutableList.of("avro"));
if (!this.recompactAllData && this.recompactFromDestPaths) {
// append new files without deleting output directory
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/MRCompactorAvroKeyDedupJobRunner.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/MRCompactorAvroKeyDedupJobRunner.java
index 00b6742..35a3f82 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/MRCompactorAvroKeyDedupJobRunner.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/MRCompactorAvroKeyDedupJobRunner.java
@@ -17,6 +17,11 @@
package org.apache.gobblin.compaction.mapreduce.avro;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Enums;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
@@ -25,7 +30,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
-
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.SchemaCompatibility;
@@ -34,6 +38,9 @@ import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.commons.io.FilenameUtils;
+import org.apache.gobblin.compaction.dataset.Dataset;
+import org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner;
+import org.apache.gobblin.util.AvroUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -43,16 +50,6 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Enums;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-import org.apache.gobblin.compaction.dataset.Dataset;
-import org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner;
-import org.apache.gobblin.util.AvroUtils;
-
/**
* A subclass of {@link
org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner} that configures
@@ -64,7 +61,6 @@ import org.apache.gobblin.util.AvroUtils;
* @author Ziyang Liu
*/
public class MRCompactorAvroKeyDedupJobRunner extends MRCompactorJobRunner {
-
private static final Logger LOG =
LoggerFactory.getLogger(MRCompactorAvroKeyDedupJobRunner.class);
private static final String COMPACTION_JOB_PREFIX = "compaction.job.";
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
index 4e4382b..dbe5256 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
@@ -403,14 +403,14 @@ public class CompactionSource implements
WorkUnitStreamSource<String, String> {
protected WorkUnit createWorkUnit (Dataset dataset) throws IOException {
WorkUnit workUnit = new WorkUnit();
TaskUtils.setTaskFactoryClass(workUnit, MRCompactionTaskFactory.class);
- suite.save (dataset, workUnit);
+ suite.save(dataset, workUnit);
return workUnit;
}
protected WorkUnit createWorkUnitForFailure (Dataset dataset) throws
IOException {
WorkUnit workUnit = new FailedTask.FailedWorkUnit();
TaskUtils.setTaskFactoryClass(workUnit,
CompactionFailedTask.CompactionFailedTaskFactory.class);
- suite.save (dataset, workUnit);
+ suite.save(dataset, workUnit);
return workUnit;
}
@@ -418,7 +418,7 @@ public class CompactionSource implements
WorkUnitStreamSource<String, String> {
WorkUnit workUnit = new FailedTask.FailedWorkUnit();
workUnit.setProp(CompactionVerifier.COMPACTION_VERIFICATION_FAIL_REASON,
reason);
TaskUtils.setTaskFactoryClass(workUnit,
CompactionFailedTask.CompactionFailedTaskFactory.class);
- suite.save (dataset, workUnit);
+ suite.save(dataset, workUnit);
return workUnit;
}
@@ -479,7 +479,7 @@ public class CompactionSource implements
WorkUnitStreamSource<String, String> {
LocalFileSystem lfs =
FileSystem.getLocal(HadoopUtils.getConfFromState(state));
Path tmpJarFileDir = new Path(this.tmpJobDir,
MRCompactor.COMPACTION_JAR_SUBDIR);
this.fs.mkdirs(tmpJarFileDir);
- state.setProp (MRCompactor.COMPACTION_JARS, tmpJarFileDir.toString());
+ state.setProp(MRCompactor.COMPACTION_JARS, tmpJarFileDir.toString());
// copy jar files to hdfs
for (String jarFile :
state.getPropAsList(ConfigurationKeys.JOB_JAR_FILES_KEY)) {
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java
index 1c564a6..e9e3e50 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java
@@ -41,6 +41,10 @@ import org.apache.gobblin.dataset.Dataset;
*
* The class also handles how to create a map-reduce job and how to serialized
and deserialize a {@link Dataset}
* to and from a {@link org.apache.gobblin.source.workunit.WorkUnit} properly.
+ *
+ * CompactionSuite should only be aware of verification methods and different
definition of datasets,
+ * but unaware of data format, which is handled by different implementation of
+ * {@link org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator}
*/
public interface CompactionSuite<D extends Dataset> {
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionAvroSuite.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBase.java
similarity index 84%
rename from
gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionAvroSuite.java
rename to
gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBase.java
index 7b62671..a263e0c 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionAvroSuite.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBase.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
+import org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
@@ -31,7 +32,6 @@ import
org.apache.gobblin.compaction.action.CompactionCompleteAction;
import
org.apache.gobblin.compaction.action.CompactionCompleteFileOperationAction;
import org.apache.gobblin.compaction.action.CompactionHiveRegistrationAction;
import org.apache.gobblin.compaction.action.CompactionMarkDirectoryAction;
-import org.apache.gobblin.compaction.mapreduce.CompactionAvroJobConfigurator;
import org.apache.gobblin.compaction.verify.CompactionAuditCountVerifier;
import org.apache.gobblin.compaction.verify.CompactionThresholdVerifier;
import org.apache.gobblin.compaction.verify.CompactionTimeRangeVerifier;
@@ -40,18 +40,19 @@ import org.apache.gobblin.configuration.State;
import org.apache.gobblin.dataset.FileSystemDataset;
/**
- * A type of {@link CompactionSuite} which implements all components needed
for avro file compaction.
+ * A type of {@link CompactionSuite} which implements all components needed
for file compaction.
+ * The format-specific implementation is contained in the impl. of {@link
CompactionJobConfigurator}
*/
@Slf4j
-public class CompactionAvroSuite implements CompactionSuite<FileSystemDataset>
{
+public class CompactionSuiteBase implements CompactionSuite<FileSystemDataset>
{
public static final String SERIALIZE_COMPACTION_FILE_PATH_NAME =
"compaction-file-path-name";
private State state;
- private CompactionAvroJobConfigurator configurator = null;
+ private CompactionJobConfigurator configurator = null;
/**
* Constructor
*/
- public CompactionAvroSuite (State state) {
+ public CompactionSuiteBase(State state) {
this.state = state;
}
@@ -75,8 +76,7 @@ public class CompactionAvroSuite implements
CompactionSuite<FileSystemDataset> {
* {@link org.apache.gobblin.compaction.mapreduce.MRCompactionTask}
starts the map-reduce job
*/
public List<CompactionVerifier<FileSystemDataset>> getMapReduceVerifiers() {
- List<CompactionVerifier<FileSystemDataset>> list = new ArrayList<>();
- return list;
+ return new ArrayList<>();
}
/**
@@ -123,14 +123,14 @@ public class CompactionAvroSuite implements
CompactionSuite<FileSystemDataset> {
}
/**
- * Constructs a map-reduce job suitable for avro compaction. The detailed
configuration
- * work is delegated to {@link
CompactionAvroJobConfigurator#createJob(FileSystemDataset)}
+ * Constructs a map-reduce job suitable for compaction. The detailed
format-specific configuration
+ * work is delegated to {@link
CompactionJobConfigurator#createJob(FileSystemDataset)}
*
- * @param dataset a top level input path which contains all avro files
those need to be compacted
- * @return a map-reduce job which will compact avro files against {@link
org.apache.gobblin.dataset.Dataset}
+ * @param dataset a top level input path which contains all files those
need to be compacted
+ * @return a map-reduce job which will compact files against {@link
org.apache.gobblin.dataset.Dataset}
*/
public Job createJob (FileSystemDataset dataset) throws IOException {
- configurator = new CompactionAvroJobConfigurator(this.state);
+ configurator =
CompactionJobConfigurator.instantiateConfigurator(this.state);
return configurator.createJob(dataset);
}
}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionAvroSuiteFactory.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseFactory.java
similarity index 81%
rename from
gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionAvroSuiteFactory.java
rename to
gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseFactory.java
index 70e2282..827fe28 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionAvroSuiteFactory.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseFactory.java
@@ -21,11 +21,11 @@ import org.apache.gobblin.annotation.Alias;
import org.apache.gobblin.configuration.State;
/**
- * A {@link CompactionSuiteFactory} that handles {@link CompactionAvroSuite}
creation logic.
+ * A {@link CompactionSuiteFactory} that handles {@link CompactionSuiteBase}
creation logic.
*/
-@Alias("CompactionAvroSuiteFactory")
-public class CompactionAvroSuiteFactory implements CompactionSuiteFactory {
- public CompactionAvroSuite createSuite (State state) {
- return new CompactionAvroSuite (state);
+@Alias("CompactionSuiteBaseFactory")
+public class CompactionSuiteBaseFactory implements CompactionSuiteFactory {
+ public CompactionSuiteBase createSuite (State state) {
+ return new CompactionSuiteBase(state);
}
}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteUtils.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteUtils.java
index ac599b5..e4066c6 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteUtils.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteUtils.java
@@ -21,6 +21,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.util.ClassAliasResolver;
+
/**
* A utility class for {@link CompactionSuite}
*/
@@ -28,13 +29,15 @@ public class CompactionSuiteUtils {
/**
* Return an {@link CompactionSuiteFactory} based on the configuration
- * @return A concrete suite factory instance. By default {@link
CompactionAvroSuiteFactory} is used.
+ * @return A concrete suite factory instance. By default {@link
CompactionSuiteBaseFactory} is used.
*/
- public static CompactionSuiteFactory getCompactionSuiteFactory (State state)
{
+ public static CompactionSuiteFactory getCompactionSuiteFactory(State state) {
try {
- String factoryName =
state.getProp(ConfigurationKeys.COMPACTION_SUITE_FACTORY,
ConfigurationKeys.DEFAULT_COMPACTION_SUITE_FACTORY);
+ String factoryName =
+ state.getProp(ConfigurationKeys.COMPACTION_SUITE_FACTORY,
ConfigurationKeys.DEFAULT_COMPACTION_SUITE_FACTORY);
- ClassAliasResolver<CompactionSuiteFactory> conditionClassAliasResolver =
new ClassAliasResolver<>(CompactionSuiteFactory.class);
+ ClassAliasResolver<CompactionSuiteFactory> conditionClassAliasResolver =
+ new ClassAliasResolver<>(CompactionSuiteFactory.class);
CompactionSuiteFactory factory =
conditionClassAliasResolver.resolveClass(factoryName).newInstance();
return factory;
} catch (IllegalAccessException | InstantiationException |
ClassNotFoundException e) {
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
index a2751b9..03ab36d 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
@@ -69,7 +69,7 @@ public class CompactionThresholdVerifier implements
CompactionVerifier<FileSyste
CompactionPathParser.CompactionParserResult result = new
CompactionPathParser(state).parse(dataset);
- double threshold =
RecompactionConditionBasedOnRatio.getRatioThresholdByDatasetName
(result.getDatasetName(), thresholdMap);
+ double threshold =
RecompactionConditionBasedOnRatio.getRatioThresholdByDatasetName(result.getDatasetName(),
thresholdMap);
log.debug ("Threshold is {} for dataset {}", threshold,
result.getDatasetName());
InputRecordCountHelper helper = new InputRecordCountHelper(state);
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
index 5b49193..6d9d8c4 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
@@ -17,30 +17,23 @@
package org.apache.gobblin.compaction.verify;
+import com.google.common.base.Splitter;
import java.util.List;
-import java.util.Map;
import java.util.regex.Pattern;
-
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.joda.time.Period;
-import org.joda.time.format.PeriodFormatter;
-import org.joda.time.format.PeriodFormatterBuilder;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Maps;
-
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-
+import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder;
import org.apache.gobblin.compaction.mapreduce.MRCompactor;
import org.apache.gobblin.compaction.parser.CompactionPathParser;
import org.apache.gobblin.compaction.source.CompactionSource;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.dataset.FileSystemDataset;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Period;
+import org.joda.time.format.PeriodFormatter;
+import org.joda.time.format.PeriodFormatterBuilder;
/**
* A simple class which verify current dataset belongs to a specific time
range. Will skip to do
diff --git
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/suite/TestCompactionSuiteFactories.java
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/suite/TestCompactionSuiteFactories.java
index 7cfdb92..48890e2 100644
---
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/suite/TestCompactionSuiteFactories.java
+++
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/suite/TestCompactionSuiteFactories.java
@@ -27,7 +27,7 @@ public class TestCompactionSuiteFactories {
* Test hive registration failure
*/
@Alias("HiveRegistrationFailureFactory")
- public static class HiveRegistrationFailureFactory extends
CompactionAvroSuiteFactory {
+ public static class HiveRegistrationFailureFactory implements
CompactionSuiteFactory {
public TestCompactionSuites.HiveRegistrationCompactionSuite createSuite
(State state) {
return new TestCompactionSuites.HiveRegistrationCompactionSuite(state);
}
diff --git
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/suite/TestCompactionSuites.java
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/suite/TestCompactionSuites.java
index 87d41d2..59d762e 100644
---
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/suite/TestCompactionSuites.java
+++
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/suite/TestCompactionSuites.java
@@ -32,7 +32,7 @@ public class TestCompactionSuites {
/**
* Test hive registration failure
*/
- public static class HiveRegistrationCompactionSuite extends
CompactionAvroSuite {
+ public static class HiveRegistrationCompactionSuite extends
CompactionSuiteBase {
public HiveRegistrationCompactionSuite(State state) {
super(state);
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index ccc94cb..533ba12 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -455,7 +455,7 @@ public class GobblinMultiTaskAttempt {
commit();
} else if (!isSpeculativeExecutionSafe()) {
throw new RuntimeException(
- "Specualtive execution is enabled. However, the task context is not
safe for speculative execution.");
+ "Speculative execution is enabled. However, the task context is not
safe for speculative execution.");
}
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
index 3a188a6..da335d8 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
@@ -74,7 +74,7 @@ public class TaskStateCollectorService extends
AbstractScheduledService {
private final Path outputTaskStateDir;
/**
- * Add a cloesable action to run after each existence-checking of task state
file.
+ * Add a closeable action to run after each existence-checking of task state
file.
* A typical example to plug here is hive registration:
* We do hive registration everytime there are available taskStates
deserialized from storage, on the driver level.
*/
@@ -204,7 +204,7 @@ public class TaskStateCollectorService extends
AbstractScheduledService {
this.jobState.addTaskState(taskState);
}
- // Finish any addtional steps defined in handler on driver level.
+ // Finish any additional steps defined in handler on driver level.
// Currently implemented handler for Hive registration only.
if (optionalTaskCollectorHandler.isPresent()) {
LOGGER.info("Execute Pipelined TaskStateCollectorService Handler for " +
taskStateQueue.size() + " tasks");