Repository: incubator-gobblin Updated Branches: refs/heads/master afdba01f9 -> 2485282d2
[GOBBLIN-244] Need additional info for gobblin tracking hourly-deduped Closes #2094 from yukuai518/compaction Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/2485282d Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/2485282d Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/2485282d Branch: refs/heads/master Commit: 2485282d2b1331b42da7dcd9ab6f1f50306fe3ce Parents: afdba01 Author: Kuai Yu <[email protected]> Authored: Mon Sep 11 10:13:31 2017 -0700 Committer: Hung Tran <[email protected]> Committed: Mon Sep 11 10:13:31 2017 -0700 ---------------------------------------------------------------------- .../CompactionCompleteFileOperationAction.java | 18 ++-- .../event/CompactionSlaEventHelper.java | 2 + .../verify/CompactionThresholdVerifier.java | 2 +- .../verify/InputRecordCountHelper.java | 97 +++++++++++++++++--- 4 files changed, 99 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2485282d/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java index 713fd32..ceddb0d 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java @@ -87,8 +87,8 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete // We are not getting record count from map-reduce 
counter because in next run, the threshold (delta record) // calculation is based on the input file names. long newTotalRecords = 0; - long oldTotalRecords = InputRecordCountHelper.readRecordCount (helper.getFs(), new Path (result.getDstAbsoluteDir())); - + long oldTotalRecords = helper.readRecordCount(new Path (result.getDstAbsoluteDir())); + long executeCount = helper.readExecutionCount (new Path (result.getDstAbsoluteDir())); if (appendDeltaOutput) { FsPermission permission = HadoopUtils.deserializeFsPermission(this.state, MRCompactorJobRunner.COMPACTION_JOB_OUTPUT_DIR_PERMISSION, @@ -126,15 +126,21 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete newTotalRecords = counter.getValue(); } + State compactState = helper.loadState(new Path (result.getDstAbsoluteDir())); + compactState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL, Long.toString(newTotalRecords)); + compactState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executeCount + 1)); + helper.saveState(new Path (result.getDstAbsoluteDir()), compactState); + + log.info("Updating record count from {} to {} in {} [{}]", oldTotalRecords, newTotalRecords, dstPath, executeCount + 1); + // submit events for record count if (eventSubmitter != null) { Map<String, String> eventMetadataMap = ImmutableMap.of(CompactionSlaEventHelper.DATASET_URN, dataset.datasetURN(), - CompactionSlaEventHelper.RECORD_COUNT_TOTAL, Long.toString(newTotalRecords)); + CompactionSlaEventHelper.RECORD_COUNT_TOTAL, Long.toString(newTotalRecords), + CompactionSlaEventHelper.PREV_RECORD_COUNT_TOTAL, Long.toString(oldTotalRecords), + CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executeCount + 1)); this.eventSubmitter.submit(CompactionSlaEventHelper.COMPACTION_RECORD_COUNT_EVENT, eventMetadataMap); } - - InputRecordCountHelper.writeRecordCount (helper.getFs(), new Path (result.getDstAbsoluteDir()), newTotalRecords); - log.info("Updating record count from {} to {} in {} ", 
oldTotalRecords, newTotalRecords, dstPath); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2485282d/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java index b0c4dcb..5d89fd8 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java @@ -51,6 +51,8 @@ public class CompactionSlaEventHelper { public static final String LATE_RECORD_COUNT = "lateRecordCount"; public static final String REGULAR_RECORD_COUNT = "regularRecordCount"; public static final String NEED_RECOMPACT = "needRecompact"; + public static final String PREV_RECORD_COUNT_TOTAL = "prevRecordCountTotal"; + public static final String EXEC_COUNT_TOTAL = "executionCountTotal"; public static final String RECORD_COUNT_TOTAL = "recordCountTotal"; public static final String HIVE_REGISTRATION_PATHS = "hiveRegistrationPaths"; public static final String RENAME_DIR_PATHS = "renameDirPaths"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2485282d/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java index 27bc6f0..fbd6413 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java +++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java @@ -73,7 +73,7 @@ public class CompactionThresholdVerifier implements CompactionVerifier<FileSyste InputRecordCountHelper helper = new InputRecordCountHelper(state); try { double newRecords = helper.calculateRecordCount (Lists.newArrayList(new Path(dataset.datasetURN()))); - double oldRecords = InputRecordCountHelper.readRecordCount (helper.getFs(), new Path(result.getDstAbsoluteDir())); + double oldRecords = helper.readRecordCount (new Path(result.getDstAbsoluteDir())); log.info ("Dataset {} : previous records {}, current records {}", dataset.datasetURN(), oldRecords, newRecords); if (oldRecords == 0) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2485282d/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java index 661fa47..de95255 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java @@ -19,6 +19,7 @@ package org.apache.gobblin.compaction.verify; import com.google.common.base.Charsets; import com.google.common.collect.Lists; import org.apache.gobblin.compaction.dataset.DatasetHelper; +import org.apache.gobblin.compaction.event.CompactionSlaEventHelper; import org.apache.gobblin.compaction.mapreduce.MRCompactor; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; @@ -28,11 +29,12 @@ import org.apache.gobblin.util.recordcount.IngestionRecordCountProvider; import lombok.Getter; import lombok.extern.slf4j.Slf4j; 
import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import java.io.BufferedReader; +import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; @@ -54,8 +56,11 @@ public class InputRecordCountHelper { private final RecordCountProvider inputRecordCountProvider; private final String AVRO = "avro"; + @Deprecated public final static String RECORD_COUNT_FILE = "_record_count"; + public final static String STATE_FILE = "_state_file"; + /** * Constructor */ @@ -86,33 +91,99 @@ public class InputRecordCountHelper { } /** + * Load compaction state file + */ + public State loadState (Path dir) throws IOException { + return loadState(this.fs, dir); + } + + private static State loadState (FileSystem fs, Path dir) throws IOException { + State state = new State(); + if (fs.exists(new Path(dir, STATE_FILE))) { + try (FSDataInputStream inputStream = fs.open(new Path(dir, STATE_FILE))) { + state.readFields(inputStream); + } + } + return state; + } + + /** + * Save compaction state file + */ + public void saveState (Path dir, State state) throws IOException { + saveState(this.fs, dir, state); + } + + private static void saveState (FileSystem fs, Path dir, State state) throws IOException { + Path tmpFile = new Path(dir, STATE_FILE + ".tmp"); + Path newFile = new Path(dir, STATE_FILE); + fs.delete(tmpFile, false); + try (DataOutputStream dataOutputStream = new DataOutputStream(fs.create(new Path(dir, STATE_FILE + ".tmp")))) { + state.write(dataOutputStream); + } + + // Caution: We are deleting right before renaming because rename doesn't support atomic overwrite options from FileSystem API. + fs.delete(newFile, false); + fs.rename(tmpFile, newFile); + } + + /** * Read record count from a specific directory. 
- * File name is {@link InputRecordCountHelper#RECORD_COUNT_FILE} + * File name is {@link InputRecordCountHelper#STATE_FILE} + * @param dir directory where a state file is located + * @return record count + */ + public long readRecordCount (Path dir) throws IOException { + return readRecordCount(this.fs, dir); + } + + /** + * Read record count from a specific directory. + * File name is {@link InputRecordCountHelper#STATE_FILE} * @param fs file system in use - * @param dir directory where a record file will be read + * @param dir directory where a state file is located * @return record count */ + @Deprecated public static long readRecordCount (FileSystem fs, Path dir) throws IOException { - if (!fs.exists(new Path(dir, RECORD_COUNT_FILE))) { - return 0; - } + State state = loadState(fs, dir); - try (BufferedReader br = new BufferedReader(new InputStreamReader(fs.open (new Path (dir, RECORD_COUNT_FILE)), Charsets.UTF_8))) { - long count = Long.parseLong(br.readLine()); - return count; + if (!state.contains(CompactionSlaEventHelper.RECORD_COUNT_TOTAL)) { + if (fs.exists(new Path (dir, RECORD_COUNT_FILE))){ + try (BufferedReader br = new BufferedReader(new InputStreamReader(fs.open (new Path (dir, RECORD_COUNT_FILE)), Charsets.UTF_8))) { + long count = Long.parseLong(br.readLine()); + return count; + } + } else { + return 0; + } + } else { + return Long.parseLong(state.getProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL)); } } /** + * Read execution count from a specific directory. + * File name is {@link InputRecordCountHelper#STATE_FILE} + * @param dir directory where a state file is located + * @return execution count + */ + public long readExecutionCount (Path dir) throws IOException { + State state = loadState(fs, dir); + return Long.parseLong(state.getProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL, "0")); + } + + /** + * Write record count to a specific directory.
* File name is {@link InputRecordCountHelper#RECORD_COUNT_FILE} * @param fs file system in use - * @param dir directory where a record file will be saved + * @param dir directory where a record file is located */ + @Deprecated public static void writeRecordCount (FileSystem fs, Path dir, long count) throws IOException { - try (FSDataOutputStream outputFileStream = fs.create(new Path(dir, RECORD_COUNT_FILE))) { - outputFileStream.writeBytes(Long.toString(count)); - } + State state = loadState(fs, dir); + state.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL, count); + saveState(fs, dir, state); } protected FileSystem getSourceFileSystem (State state)
