This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 099e9db [GOBBLIN-1231][GOBBLIN-1217][GOBBLIN-1223] Make re-compaction
be able to write to a new folder based on the executionCount
099e9db is described below
commit 099e9dbee62b1a9694f59e5b4d1ff1b6b46faa9b
Author: Zihan Li <[email protected]>
AuthorDate: Tue Aug 18 10:57:46 2020 -0700
[GOBBLIN-1231][GOBBLIN-1217][GOBBLIN-1223] Make re-compaction be able to
write to a new folder based on the executionCount
Closes #3079 from ZihanLi58/GOBBLIN-1231
---
.../gobblin/configuration/ConfigurationKeys.java | 6 +++
.../CompactionCompleteFileOperationAction.java | 59 +++++++++++++---------
.../action/CompactionHiveRegistrationAction.java | 15 +++++-
.../mapreduce/AvroCompactionTaskTest.java | 36 ++++++++++++-
4 files changed, 88 insertions(+), 28 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 8524f1f..a6237cf 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -994,6 +994,12 @@ public class ConfigurationKeys {
public static final String COMPACTION_PRIORITIZER_ALIAS =
COMPACTION_PRIORITIZATION_PREFIX + "prioritizerAlias";
public static final String COMPACTION_ESTIMATOR =
COMPACTION_PRIORITIZATION_PREFIX + "estimator";
+ /**
+ * Configuration properties related to re-compaction
+ */
+ public static final String RECOMPACTION_WRITE_TO_NEW_FOLDER =
"recompaction.write.to.new.folder";
+
+
/**
* Configuration related to ConfigStore based copy/retention
*/
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
index 72465b4..02ac90e 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
@@ -20,15 +20,11 @@ package org.apache.gobblin.compaction.action;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
-import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.gobblin.compaction.dataset.DatasetHelper;
import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
import org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator;
import org.apache.gobblin.compaction.mapreduce.MRCompactor;
@@ -38,11 +34,13 @@ import
org.apache.gobblin.compaction.mapreduce.RecordKeyMapperBase;
import org.apache.gobblin.compaction.parser.CompactionPathParser;
import org.apache.gobblin.compaction.source.CompactionSource;
import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.dataset.FileSystemDataset;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.WriterUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -59,6 +57,7 @@ import org.apache.hadoop.mapreduce.Job;
@AllArgsConstructor
public class CompactionCompleteFileOperationAction implements
CompactionCompleteAction<FileSystemDataset> {
+ public static final String COMPACTION_DIRECTORY_FORMAT = "/compaction_%s";
protected WorkUnitState state;
private CompactionJobConfigurator configurator;
private InputRecordCountHelper helper;
@@ -97,7 +96,7 @@ public class CompactionCompleteFileOperationAction implements
CompactionComplete
long newTotalRecords = 0;
long oldTotalRecords = helper.readRecordCount(new
Path(result.getDstAbsoluteDir()));
- long executeCount = helper.readExecutionCount(new
Path(result.getDstAbsoluteDir()));
+ long executionCount = helper.readExecutionCount(new
Path(result.getDstAbsoluteDir()));
List<Path> goodPaths = CompactionJobConfigurator.getGoodFiles(job,
tmpPath, this.fs,
ImmutableList.of(configurator.getFileExtension()));
@@ -125,9 +124,15 @@ public class CompactionCompleteFileOperationAction
implements CompactionComplete
// (all previous run + current run) is possible.
newTotalRecords = this.configurator.getFileNameRecordCount();
} else {
- this.configurator.getOldFiles()
- .add(this.fs.makeQualified(dstPath).toString());
- this.fs.delete(dstPath, true);
+ if
(state.getPropAsBoolean(ConfigurationKeys.RECOMPACTION_WRITE_TO_NEW_FOLDER,
false)) {
+ Path oldFilePath = PathUtils.mergePaths(dstPath, new
Path(String.format(COMPACTION_DIRECTORY_FORMAT, executionCount)));
+ dstPath = PathUtils.mergePaths(dstPath, new
Path(String.format(COMPACTION_DIRECTORY_FORMAT, executionCount + 1)));
+
this.configurator.getOldFiles().add(this.fs.makeQualified(oldFilePath).toString());
+ //Write to a new path, no need to delete the old path
+ } else {
+
this.configurator.getOldFiles().add(this.fs.makeQualified(dstPath).toString());
+ this.fs.delete(dstPath, true);
+ }
FsPermission permission =
HadoopUtils.deserializeFsPermission(this.state,
MRCompactorJobRunner.COMPACTION_JOB_OUTPUT_DIR_PERMISSION,
FsPermission.getDefault());
@@ -144,31 +149,35 @@ public class CompactionCompleteFileOperationAction
implements CompactionComplete
Counter counter =
job.getCounters().findCounter(RecordKeyMapperBase.EVENT_COUNTER.RECORD_COUNT);
newTotalRecords = counter.getValue();
}
+ final Path finalDstPath = dstPath;
goodPaths.stream().forEach(p -> {
String fileName = p.getName();
- outputFiles.add(new Path(dstPath, fileName));
+ outputFiles.add(new Path(finalDstPath, fileName));
});
this.configurator.setDstNewFiles(outputFiles);
- State compactState = helper.loadState(new
Path(result.getDstAbsoluteDir()));
- if(executeCount!=0) {
- compactState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL +
Long.toString(executeCount), Long.toString(helper.readRecordCount(new
Path(result.getDstAbsoluteDir()))));
- compactState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL +
Long.toString(executeCount), Long.toString(executeCount));
- compactState.setProp("DuplicateRecordCount" +
Long.toString(executeCount), compactState.getProp("DuplicateRecordCount",
"null"));
+ State compactionState = helper.loadState(new
Path(result.getDstAbsoluteDir()));
+ if (executionCount != 0) {
+ compactionState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL +
Long.toString(executionCount),
+ Long.toString(helper.readRecordCount(new
Path(result.getDstAbsoluteDir()))));
+ compactionState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL +
Long.toString(executionCount),
+ Long.toString(executionCount));
+ compactionState.setProp("DuplicateRecordCount" +
Long.toString(executionCount),
+ compactionState.getProp("DuplicateRecordCount", "null"));
}
- compactState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL,
Long.toString(newTotalRecords));
- compactState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL,
Long.toString(executeCount + 1));
- compactState.setProp(CompactionSlaEventHelper.MR_JOB_ID,
+ compactionState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL,
Long.toString(newTotalRecords));
+ compactionState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL,
Long.toString(executionCount + 1));
+ compactionState.setProp(CompactionSlaEventHelper.MR_JOB_ID,
this.configurator.getConfiguredJob().getJobID().toString());
- compactState.setProp("DuplicateRecordCount",
job.getCounters().findCounter(
- RecordKeyDedupReducerBase.EVENT_COUNTER.DEDUPED).getValue());
- compactState.setProp(CompactionSlaEventHelper.LAST_RUN_START_TIME,
this.state.getProp(CompactionSource.COMPACTION_INIT_TIME));
- helper.saveState(new Path(result.getDstAbsoluteDir()), compactState);
- log.info("duplicated records count for "+ dstPath + " : " +
compactState.getProp("DuplicateRecordCount"));
-
+ compactionState.setProp("DuplicateRecordCount",
+
job.getCounters().findCounter(RecordKeyDedupReducerBase.EVENT_COUNTER.DEDUPED).getValue());
+ compactionState.setProp(CompactionSlaEventHelper.LAST_RUN_START_TIME,
+ this.state.getProp(CompactionSource.COMPACTION_INIT_TIME));
+ helper.saveState(new Path(result.getDstAbsoluteDir()), compactionState);
+ log.info("duplicated records count for " + dstPath + " : " +
compactionState.getProp("DuplicateRecordCount"));
log.info("Updating record count from {} to {} in {} [{}]",
oldTotalRecords, newTotalRecords, dstPath,
- executeCount + 1);
+ executionCount + 1);
// submit events for record count
if (eventSubmitter != null) {
@@ -176,7 +185,7 @@ public class CompactionCompleteFileOperationAction
implements CompactionComplete
ImmutableMap.of(CompactionSlaEventHelper.DATASET_URN,
dataset.datasetURN(),
CompactionSlaEventHelper.RECORD_COUNT_TOTAL,
Long.toString(newTotalRecords),
CompactionSlaEventHelper.PREV_RECORD_COUNT_TOTAL,
Long.toString(oldTotalRecords),
- CompactionSlaEventHelper.EXEC_COUNT_TOTAL,
Long.toString(executeCount + 1),
+ CompactionSlaEventHelper.EXEC_COUNT_TOTAL,
Long.toString(executionCount + 1),
CompactionSlaEventHelper.MR_JOB_ID,
this.configurator.getConfiguredJob().getJobID().toString());
this.eventSubmitter.submit(CompactionSlaEventHelper.COMPACTION_RECORD_COUNT_EVENT,
eventMetadataMap);
}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
index ca32ea2..0c63107 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
@@ -22,6 +22,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
+import org.apache.gobblin.util.PathUtils;
import org.apache.hadoop.fs.Path;
import com.google.common.base.Joiner;
@@ -57,6 +59,7 @@ public class CompactionHiveRegistrationAction implements
CompactionCompleteActio
private final State state;
private EventSubmitter eventSubmitter;
+ private InputRecordCountHelper helper;
public CompactionHiveRegistrationAction (State state) {
if (!(state instanceof WorkUnitState)) {
@@ -88,10 +91,18 @@ public class CompactionHiveRegistrationAction implements
CompactionCompleteActio
HiveRegistrationPolicy hiveRegistrationPolicy =
HiveRegistrationPolicyBase.getPolicy(state);
List<String> paths = new ArrayList<>();
- for (HiveSpec spec : hiveRegistrationPolicy.getHiveSpecs(new
Path(result.getDstAbsoluteDir()))) {
+ Path dstPath = new Path(result.getDstAbsoluteDir());
+ if
(state.getPropAsBoolean(ConfigurationKeys.RECOMPACTION_WRITE_TO_NEW_FOLDER,
false)) {
+ //Lazily initialize helper
+ this.helper = new InputRecordCountHelper(state);
+ long executionCount = helper.readExecutionCount(new
Path(result.getDstAbsoluteDir()));
+ // Use new output path to do registration
+ dstPath = PathUtils.mergePaths(dstPath, new
Path(String.format(CompactionCompleteFileOperationAction.COMPACTION_DIRECTORY_FORMAT,
executionCount)));
+ }
+ for (HiveSpec spec : hiveRegistrationPolicy.getHiveSpecs(dstPath)) {
hiveRegister.register(spec);
paths.add(spec.getPath().toUri().toASCIIString());
- log.info("Hive registration is done for {}",
result.getDstAbsoluteDir());
+ log.info("Hive registration is done for {}", dstPath.toString());
}
// submit events for hive registration
diff --git
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
index 7f9ef51..145ecf1 100644
---
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
+++
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
@@ -173,9 +173,43 @@ public class AvroCompactionTaskTest {
Assert.assertTrue(fs.exists(new Path (basePath,
"Identity/MemberAccount/hourly/2017/04/03/10")));
}
+ @Test
+ public void testAvroRecompactionWriteToNewPath() throws Exception {
+ FileSystem fs = getFileSystem();
+ String basePath = "/tmp/testRecompactionWriteToNewPath";
+ fs.delete(new Path(basePath), true);
+
+ File jobDir = new File(basePath,
"Identity/MemberAccount/minutely/2017/04/03/10/20_30/run_2017-04-03-10-20");
+ Assert.assertTrue(jobDir.mkdirs());
+
+ GenericRecord r1 = createRandomRecord();
+ writeFileWithContent(jobDir, "file1", r1, 20);
+
+ EmbeddedGobblin embeddedGobblin = createEmbeddedGobblin
("Recompaction-First", basePath);
+
embeddedGobblin.setConfiguration(ConfigurationKeys.RECOMPACTION_WRITE_TO_NEW_FOLDER,
"true");
+ JobExecutionResult result = embeddedGobblin.run();
+ long recordCount = InputRecordCountHelper.readRecordCount(fs, (new Path
(basePath, new Path("Identity/MemberAccount/hourly/2017/04/03/10"))));
+ Assert.assertTrue(result.isSuccessful());
+ Assert.assertEquals(recordCount, 20);
+
+ // Now write more avro files to input dir
+ writeFileWithContent(jobDir, "file2", r1, 22);
+ EmbeddedGobblin embeddedGobblin_2 = createEmbeddedGobblin
("Recompaction-Second", basePath);
+
embeddedGobblin_2.setConfiguration(ConfigurationKeys.RECOMPACTION_WRITE_TO_NEW_FOLDER,
"true");
+ JobExecutionResult result_2 = embeddedGobblin_2.run();
+ Assert.assertTrue(result_2.isSuccessful());
+
+ // If the recompaction succeeded, a new record count should have been written.
+ recordCount = InputRecordCountHelper.readRecordCount(fs, (new Path
(basePath, new Path("Identity/MemberAccount/hourly/2017/04/03/10"))));
+ Assert.assertEquals(recordCount, 42);
+ // Both the first run's output (compaction_1) and the re-compaction output (compaction_2) must exist
+ Assert.assertTrue(fs.exists(new Path (basePath,
"Identity/MemberAccount/hourly/2017/04/03/10/compaction_1")));
+ Assert.assertTrue(fs.exists(new Path (basePath,
"Identity/MemberAccount/hourly/2017/04/03/10/compaction_2")));
+ }
+
public void testAvroRecompactionWithLimitation() throws Exception {
FileSystem fs = getFileSystem();
- String basePath = "/tmp/testRecompaction";
+ String basePath = "/tmp/testRecompactionWithLimitation";
fs.delete(new Path(basePath), true);
File jobDir = new File(basePath,
"Identity/MemberAccount/minutely/2017/04/03/10/20_30/run_2017-04-03-10-20");