Repository: incubator-gobblin Updated Branches: refs/heads/master 6f9acc073 -> b6e88fd06
[GOBBLIN-303] Remove empty avro files during compaction Closes #2158 from yukuai518/empty Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/b6e88fd0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/b6e88fd0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/b6e88fd0 Branch: refs/heads/master Commit: b6e88fd062cd855a1d4499c0a5787bbf2d1e5183 Parents: 6f9acc0 Author: Kuai Yu <[email protected]> Authored: Thu Nov 2 19:56:53 2017 -0700 Committer: Hung Tran <[email protected]> Committed: Thu Nov 2 19:56:53 2017 -0700 ---------------------------------------------------------------------- .../CompactionCompleteFileOperationAction.java | 39 ++++++++---- .../event/CompactionSlaEventHelper.java | 1 + .../CompactionAvroJobConfigurator.java | 66 ++++++++++++++++++++ .../mapreduce/MRCompactorJobRunner.java | 14 +++-- 4 files changed, 103 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b6e88fd0/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java index ceddb0d..ce536a1 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java @@ -40,10 +40,13 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.mapreduce.Counter; import 
org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.TaskCompletionEvent; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** @@ -83,30 +86,36 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete boolean appendDeltaOutput = this.state.getPropAsBoolean(MRCompactor.COMPACTION_RENAME_SOURCE_DIR_ENABLED, MRCompactor.DEFAULT_COMPACTION_RENAME_SOURCE_DIR_ENABLED); - // Obtain record count from input file names - // We are not getting record count from map-reduce counter because in next run, the threshold (delta record) - // calculation is based on the input file names. + Job job = this.configurator.getConfiguredJob(); + long newTotalRecords = 0; long oldTotalRecords = helper.readRecordCount(new Path (result.getDstAbsoluteDir())); long executeCount = helper.readExecutionCount (new Path (result.getDstAbsoluteDir())); + + List<Path> goodPaths = CompactionAvroJobConfigurator.removeFailedPaths(job, tmpPath, this.fs); + if (appendDeltaOutput) { FsPermission permission = HadoopUtils.deserializeFsPermission(this.state, MRCompactorJobRunner.COMPACTION_JOB_OUTPUT_DIR_PERMISSION, FsPermission.getDefault()); WriterUtils.mkdirsWithRecursivePermission(this.fs, dstPath, permission); // append files under mr output to destination - List<Path> paths = DatasetHelper.getApplicableFilePaths(fs, tmpPath, Lists.newArrayList("avro")); - for (Path path: paths) { - String fileName = path.getName(); - log.info(String.format("Adding %s to %s", path.toString(), dstPath)); + for (Path filePath: goodPaths) { + String fileName = filePath.getName(); + log.info(String.format("Adding %s to %s", filePath.toString(), dstPath)); Path outPath = new Path (dstPath, fileName); - if (!this.fs.rename(path, outPath)) { + if (!this.fs.rename(filePath, outPath)) { throw new IOException( - String.format("Unable to move %s to %s", path.toString(), outPath.toString())); + 
String.format("Unable to move %s to %s", filePath.toString(), outPath.toString())); } } + // Obtain record count from input file names. + // We don't get record count from map-reduce counter because in the next run, the threshold (delta record) + // calculation is based on the input file names. By pre-defining which input folders are involved in the + // MR execution, it is easy to track how many files are involved in MR so far, thus calculating the number of total records + // (all previous runs + current run) is possible. newTotalRecords = this.configurator.getFileNameRecordCount(); } else { this.fs.delete(dstPath, true); @@ -120,8 +129,10 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete String.format("Unable to move %s to %s", tmpPath, dstPath)); } - // get record count from map reduce job counter - Job job = this.configurator.getConfiguredJob(); + // Obtain record count from map reduce job counter + // We don't get record count from file name because tracking which files are actually involved in the MR execution can + // be hard. This is because new minutely data is rolled up into hourly folders, but from the daily compaction perspective we are not + // able to tell which files are newly added (because we simply pass all hourly folders to MR job instead of individual files). 
Counter counter = job.getCounters().findCounter(AvroKeyMapper.EVENT_COUNTER.RECORD_COUNT); newTotalRecords = counter.getValue(); } @@ -129,6 +140,7 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete State compactState = helper.loadState(new Path (result.getDstAbsoluteDir())); compactState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL, Long.toString(newTotalRecords)); compactState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executeCount + 1)); + compactState.setProp(CompactionSlaEventHelper.MR_JOB_ID, this.configurator.getConfiguredJob().getJobID().toString()); helper.saveState(new Path (result.getDstAbsoluteDir()), compactState); log.info("Updating record count from {} to {} in {} [{}]", oldTotalRecords, newTotalRecords, dstPath, executeCount + 1); @@ -138,12 +150,15 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete Map<String, String> eventMetadataMap = ImmutableMap.of(CompactionSlaEventHelper.DATASET_URN, dataset.datasetURN(), CompactionSlaEventHelper.RECORD_COUNT_TOTAL, Long.toString(newTotalRecords), CompactionSlaEventHelper.PREV_RECORD_COUNT_TOTAL, Long.toString(oldTotalRecords), - CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executeCount + 1)); + CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executeCount + 1), + CompactionSlaEventHelper.MR_JOB_ID, this.configurator.getConfiguredJob().getJobID().toString()); this.eventSubmitter.submit(CompactionSlaEventHelper.COMPACTION_RECORD_COUNT_EVENT, eventMetadataMap); } } } + + public void addEventSubmitter(EventSubmitter eventSubmitter) { this.eventSubmitter = eventSubmitter; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b6e88fd0/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java ---------------------------------------------------------------------- diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java index 5d89fd8..042c5a4 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java @@ -53,6 +53,7 @@ public class CompactionSlaEventHelper { public static final String NEED_RECOMPACT = "needRecompact"; public static final String PREV_RECORD_COUNT_TOTAL = "prevRecordCountTotal"; public static final String EXEC_COUNT_TOTAL = "executionCountTotal"; + public static final String MR_JOB_ID = "mrJobId"; public static final String RECORD_COUNT_TOTAL = "recordCountTotal"; public static final String HIVE_REGISTRATION_PATHS = "hiveRegistrationPaths"; public static final String RENAME_DIR_PATHS = "renameDirPaths"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b6e88fd0/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java index 9c1e362..1108e74 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java @@ -19,8 +19,11 @@ package org.apache.gobblin.compaction.mapreduce; import com.google.common.base.Enums; import com.google.common.base.Optional; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.primitives.Ints; + +import 
org.apache.gobblin.compaction.dataset.DatasetHelper; import org.apache.gobblin.compaction.mapreduce.avro.*; import org.apache.gobblin.compaction.parser.CompactionPathParser; import org.apache.gobblin.compaction.verify.InputRecordCountHelper; @@ -38,6 +41,7 @@ import org.apache.avro.mapred.AvroKey; import org.apache.avro.mapred.AvroValue; import org.apache.avro.mapreduce.AvroJob; import org.apache.commons.math3.primes.Primes; +import org.apache.gobblin.writer.WriterOutputFormat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FileStatus; @@ -45,13 +49,19 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.TaskCompletionEvent; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.LinkedList; +import java.util.List; import java.util.Set; +import java.util.stream.Collectors; /** * A configurator that focused on creating avro compaction map-reduce job @@ -331,5 +341,61 @@ public class CompactionAvroJobConfigurator { return uncompacted; } + + private static List<TaskCompletionEvent> getAllTaskCompletionEvent(Job completedJob) { + List<TaskCompletionEvent> completionEvents = new LinkedList<>(); + + while (true) { + try { + TaskCompletionEvent[] bunchOfEvents; + bunchOfEvents = completedJob.getTaskCompletionEvents(completionEvents.size()); + if (bunchOfEvents == null || bunchOfEvents.length == 0) { + break; + } + completionEvents.addAll(Arrays.asList(bunchOfEvents)); + } catch (IOException e) { + break; + } + } + + return completionEvents; + } + + private static List<TaskCompletionEvent> getUnsuccessfulTaskCompletionEvent(Job 
completedJob) { + return getAllTaskCompletionEvent(completedJob).stream().filter(te->te.getStatus() != TaskCompletionEvent.Status.SUCCEEDED).collect( + Collectors.toList()); + } + + private static boolean isFailedPath(Path path, List<TaskCompletionEvent> failedEvents) { + return failedEvents.stream() + .anyMatch(event -> path.toString().contains(Path.SEPARATOR + event.getTaskAttemptId().toString() + Path.SEPARATOR)); + } + + /** + * Remove all bad avro files under tmpPath that were left behind by unsuccessful (e.g. killed speculative) task attempts. + * This happens when a speculative task attempt is started but then killed in the middle of processing. + * A partial file is then left at {tmp_output}/_temporary/1/_temporary/attempt_xxx_xxx/part-m-xxxx.avro, + * without being committed to its final destination at {tmp_output}/part-m-xxxx.avro. + * + * @param job Completed MR job + * @param fs File system on which tmpPath (the MR output directory) resides + * @return avro files under tmpPath that were not produced by an unsuccessful task attempt + */ + public static List<Path> removeFailedPaths(Job job, Path tmpPath, FileSystem fs) throws IOException { + List<TaskCompletionEvent> failedEvents = CompactionAvroJobConfigurator.getUnsuccessfulTaskCompletionEvent(job); + + List<Path> allFilePaths = DatasetHelper.getApplicableFilePaths(fs, tmpPath, Lists.newArrayList("avro")); + List<Path> goodPaths = new ArrayList<>(); + for (Path filePath: allFilePaths) { + if (CompactionAvroJobConfigurator.isFailedPath(filePath, failedEvents)) { + fs.delete(filePath, false); + log.error("{} is a bad path so it was deleted", filePath); + } else { + goodPaths.add(filePath); + } + } + + return goodPaths; + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b6e88fd0/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java ---------------------------------------------------------------------- diff --git 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java index 0f3592c..32a8e0b 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java @@ -19,6 +19,7 @@ package org.apache.gobblin.compaction.mapreduce; import java.io.IOException; import java.net.URI; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; @@ -28,6 +29,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.commons.io.FilenameUtils; import org.apache.commons.math3.primes.Primes; @@ -40,6 +42,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.TaskCompletionEvent; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.joda.time.DateTime; @@ -321,9 +324,12 @@ public abstract class MRCompactorJobRunner implements Runnable, Comparable<MRCom this.configureJob(job); this.submitAndWait(job); if (shouldPublishData(compactionTimestamp)) { + // remove all invalid empty files due to speculative task execution + List<Path> goodPaths = CompactionAvroJobConfigurator.removeFailedPaths(job, this.dataset.outputTmpPath(), this.tmpFs); + if (!this.recompactAllData && this.recompactFromDestPaths) { // append new files without deleting output directory - addFilesInTmpPathToOutputPath(); + addGoodFilesToOutputPath(goodPaths); // clean up late data from outputLateDirectory, which has been set to inputPath deleteFilesByPaths(this.dataset.inputPaths()); } else { @@ -352,7 
+358,6 @@ public abstract class MRCompactorJobRunner implements Runnable, Comparable<MRCom } } - /** * For regular compactions, compaction timestamp is the time the compaction job starts. * @@ -603,9 +608,8 @@ public abstract class MRCompactorJobRunner implements Runnable, Comparable<MRCom HadoopUtils.movePath (MRCompactorJobRunner.this.tmpFs, this.dataset.outputTmpPath(), FileSystem.get(this.dataset.outputPath().getParent().toUri(), this.fs.getConf()), this.dataset.outputPath(), false, this.fs.getConf()) ; } - private void addFilesInTmpPathToOutputPath () throws IOException { - List<Path> paths = this.getApplicableFilePaths(this.dataset.outputTmpPath(), this.tmpFs); - for (Path path: paths) { + private void addGoodFilesToOutputPath (List<Path> goodPaths) throws IOException { + for (Path path: goodPaths) { String fileName = path.getName(); LOG.info(String.format("Adding %s to %s", path.toString(), this.dataset.outputPath())); Path outPath = MRCompactorJobRunner.this.lateOutputRecordCountProvider.constructLateFilePath(fileName,
