Repository: incubator-gobblin Updated Branches: refs/heads/master a838b4d6d -> d969f937d
[GOBBLIN-445] Exclude staging files from MR speculative execution in compaction flow Exclude staging files from MR speculative execution in compaction flow Add old logic back Fix some documentation Closes #2320 from yukuai518/zero2 Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/d969f937 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/d969f937 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/d969f937 Branch: refs/heads/master Commit: d969f937d2c746b93b25d6be1f879a533f6601c5 Parents: a838b4d Author: Kuai Yu <[email protected]> Authored: Mon Mar 26 15:38:17 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Mon Mar 26 15:38:17 2018 -0700 ---------------------------------------------------------------------- .../action/CompactionCompleteFileOperationAction.java | 2 +- .../compaction/mapreduce/CompactionAvroJobConfigurator.java | 9 ++++----- .../gobblin/compaction/mapreduce/MRCompactorJobRunner.java | 2 +- .../java/org/apache/gobblin/scheduler/JobScheduler.java | 3 ++- 4 files changed, 8 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d969f937/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java index ce536a1..a21ca93 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java @@ -92,7 +92,7 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete long oldTotalRecords = helper.readRecordCount(new Path (result.getDstAbsoluteDir())); long executeCount = helper.readExecutionCount (new Path (result.getDstAbsoluteDir())); - List<Path> goodPaths = CompactionAvroJobConfigurator.removeFailedPaths(job, tmpPath, this.fs); + List<Path> goodPaths = CompactionAvroJobConfigurator.getGoodFiles(job, tmpPath, this.fs); if (appendDeltaOutput) { FsPermission permission = HadoopUtils.deserializeFsPermission(this.state, http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d969f937/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java index 1108e74..b8f407f 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java @@ -41,7 +41,6 @@ import org.apache.avro.mapred.AvroKey; import org.apache.avro.mapred.AvroValue; import org.apache.avro.mapreduce.AvroJob; import org.apache.commons.math3.primes.Primes; -import org.apache.gobblin.writer.WriterOutputFormat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FileStatus; @@ -367,21 +366,21 @@ public class CompactionAvroJobConfigurator { } private static boolean isFailedPath(Path path, List<TaskCompletionEvent> failedEvents) { - return failedEvents.stream() + return path.toString().contains("_temporary") || failedEvents.stream() .anyMatch(event -> path.toString().contains(Path.SEPARATOR + event.getTaskAttemptId().toString() + Path.SEPARATOR)); } /** - * Remove all bad paths caused by speculative execution + * Get good files * The problem happens when speculative task attempt initialized but then killed in the middle of processing. * Some partial file was generated at {tmp_output}/_temporary/1/_temporary/attempt_xxx_xxx/part-m-xxxx.avro, * without being committed to its final destination at {tmp_output}/part-m-xxxx.avro. * * @param job Completed MR job * @param fs File system that can handle file system - * @return all successful paths + * @return all successful files that has been committed */ - public static List<Path> removeFailedPaths(Job job, Path tmpPath, FileSystem fs) throws IOException { + public static List<Path> getGoodFiles(Job job, Path tmpPath, FileSystem fs) throws IOException { List<TaskCompletionEvent> failedEvents = CompactionAvroJobConfigurator.getUnsuccessfulTaskCompletionEvent(job); List<Path> allFilePaths = DatasetHelper.getApplicableFilePaths(fs, tmpPath, Lists.newArrayList("avro")); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d969f937/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java index 32a8e0b..8a0599e 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java @@ -325,7 +325,7 @@ public abstract class MRCompactorJobRunner implements Runnable, Comparable<MRCom this.submitAndWait(job); if (shouldPublishData(compactionTimestamp)) { // remove all invalid empty files due to speculative task execution - List<Path> goodPaths = CompactionAvroJobConfigurator.removeFailedPaths(job, this.dataset.outputTmpPath(), this.tmpFs); + List<Path> goodPaths = CompactionAvroJobConfigurator.getGoodFiles(job, this.dataset.outputTmpPath(), this.tmpFs); if (!this.recompactAllData && this.recompactFromDestPaths) { // append new files without deleting output directory http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d969f937/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java index f2e254d..c737431 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java @@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.commons.configuration.ConfigurationException; +import org.apache.gobblin.source.Source; import org.apache.hadoop.fs.Path; import org.quartz.CronScheduleBuilder; @@ -457,7 +458,7 @@ public class JobScheduler extends AbstractIdleService { * @param jobProps Job configuration properties * @param jobListener {@link JobListener} used for callback, can be <em>null</em> if no callback is needed. * @param jobLauncher a {@link JobLauncher} object used to launch the job to run - * @return If current job needs retriggering + * @return If current job is a stop-early job based on {@link Source#isEarlyStopped()} * @throws JobException when there is anything wrong with running the job */ public boolean runJob(Properties jobProps, JobListener jobListener, JobLauncher jobLauncher)
