This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new af33445a2 [GOBBLIN-1633] Fix compaction actions on job failure not retried if compaction succeeds (#3494)
af33445a2 is described below
commit af33445a2480d691a9700511fdd55567e54aa2e4
Author: Matthew Ho <[email protected]>
AuthorDate: Fri May 13 09:32:54 2022 -0700
[GOBBLIN-1633] Fix compaction actions on job failure not retried if compaction succeeds (#3494)
* GOBBLIN-1633
Fix compaction on job failure not retried if compaction succeeds
* Fix typos
---
.../CompactionCompleteFileOperationAction.java | 3 --
.../compaction/mapreduce/MRCompactionTask.java | 28 ++++++++++++++-
.../verify/CompactionTimeRangeVerifier.java | 13 +++----
.../verify/CompactionTimeVerifierTest.java | 40 +++++++++++-----------
.../gobblin/runtime/AbstractJobLauncher.java | 2 +-
5 files changed, 55 insertions(+), 31 deletions(-)
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
index f0f900d0c..d1fd3f796 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
@@ -32,7 +32,6 @@ import
org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner;
import org.apache.gobblin.compaction.mapreduce.RecordKeyDedupReducerBase;
import org.apache.gobblin.compaction.mapreduce.RecordKeyMapperBase;
import org.apache.gobblin.compaction.parser.CompactionPathParser;
-import org.apache.gobblin.compaction.source.CompactionSource;
import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
@@ -183,8 +182,6 @@ public class CompactionCompleteFileOperationAction
implements CompactionComplete
this.configurator.getConfiguredJob().getJobID().toString());
compactionState.setProp(DUPLICATE_COUNT_TOTAL,
job.getCounters().findCounter(RecordKeyDedupReducerBase.EVENT_COUNTER.DEDUPED).getValue());
- compactionState.setProp(CompactionSlaEventHelper.LAST_RUN_START_TIME,
- this.state.getProp(CompactionSource.COMPACTION_INIT_TIME));
helper.saveState(new Path(result.getDstAbsoluteDir()), compactionState);
log.info("duplicated records count for " + dstPath + " : " +
compactionState.getProp(DUPLICATE_COUNT_TOTAL));
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
index 952e46aed..ce885f0cf 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
@@ -30,15 +30,20 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.compaction.action.CompactionCompleteAction;
import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
+import org.apache.gobblin.compaction.parser.CompactionPathParser;
+import org.apache.gobblin.compaction.source.CompactionSource;
import org.apache.gobblin.compaction.suite.CompactionSuite;
import org.apache.gobblin.compaction.suite.CompactionSuiteUtils;
import org.apache.gobblin.compaction.verify.CompactionVerifier;
+import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
+import org.apache.gobblin.configuration.State;
import org.apache.gobblin.dataset.Dataset;
import org.apache.gobblin.dataset.FileSystemDataset;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.runtime.TaskContext;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.mapreduce.MRTask;
+import org.apache.hadoop.fs.Path;
@@ -101,7 +106,8 @@ public class MRCompactionTask extends MRTask {
public void onMRTaskComplete (boolean isSuccess, Throwable throwable) {
if (isSuccess) {
try {
- setCounterInfo(taskContext.getTaskState());
+ TaskState taskState = taskContext.getTaskState();
+ setCounterInfo(taskState);
List<CompactionCompleteAction> actions =
this.suite.getCompactionCompleteActions();
for (CompactionCompleteAction action: actions) {
@@ -109,6 +115,10 @@ public class MRCompactionTask extends MRTask {
action.onCompactionJobComplete(dataset);
}
submitEvent(CompactionSlaEventHelper.COMPACTION_COMPLETED_EVENT_NAME);
+ if (dataset instanceof FileSystemDataset) {
+ commitRunStartTimeInfo(taskState, (FileSystemDataset) dataset);
+ }
+
super.onMRTaskComplete(true, null);
} catch (IOException e) {
submitEvent(CompactionSlaEventHelper.COMPACTION_FAILED_EVENT_NAME);
@@ -120,6 +130,22 @@ public class MRCompactionTask extends MRTask {
}
}
+ /**
+ * Persist the run start time which is used to determine when the last
successful compaction run started. This
+ * value is useful for limiting how often you recompact by verifying whether
a dataset has recently been compacted.
+ * @param taskState
+ * @param dataset
+ * @throws IOException
+ */
+ private static void commitRunStartTimeInfo(TaskState taskState,
FileSystemDataset dataset) throws IOException {
+ CompactionPathParser.CompactionParserResult result = new
CompactionPathParser(taskState).parse(dataset);
+ InputRecordCountHelper helper = new InputRecordCountHelper(taskState);
+ State compactionState = helper.loadState(new
Path(result.getDstAbsoluteDir()));
+ compactionState.setProp(CompactionSlaEventHelper.LAST_RUN_START_TIME,
+ taskState.getProp(CompactionSource.COMPACTION_INIT_TIME));
+ helper.saveState(new Path(result.getDstAbsoluteDir()), compactionState);
+ }
+
private void setCounterInfo(TaskState taskState)
throws IOException {
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
index 0d580d0da..634eadf7f 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
@@ -39,7 +39,7 @@ import org.joda.time.format.PeriodFormatterBuilder;
/**
- * A simple class which verify current dataset belongs to a specific time
range. Will skip to do
+ * A simple class which verify current dataset belongs to a specific time
range. Will skip doing
* compaction if dataset is not in a correct time range.
*/
@@ -74,7 +74,7 @@ public class CompactionTimeRangeVerifier implements
CompactionVerifier<FileSyste
// get earliest time
String maxTimeAgoStrList =
this.state.getProp(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MAX_TIME_AGO,
TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MAX_TIME_AGO);
- String maxTimeAgoStr = getMachedLookbackTime(datasetName,
maxTimeAgoStrList,
+ String maxTimeAgoStr = getMatchedLookbackTime(datasetName,
maxTimeAgoStrList,
TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MAX_TIME_AGO);
Period maxTimeAgo = formatter.parsePeriod(maxTimeAgoStr);
earliest = compactionStartTime.minus(maxTimeAgo);
@@ -82,7 +82,7 @@ public class CompactionTimeRangeVerifier implements
CompactionVerifier<FileSyste
// get latest time
String minTimeAgoStrList =
this.state.getProp(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MIN_TIME_AGO,
TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MIN_TIME_AGO);
- String minTimeAgoStr = getMachedLookbackTime(datasetName,
minTimeAgoStrList,
+ String minTimeAgoStr = getMatchedLookbackTime(datasetName,
minTimeAgoStrList,
TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MIN_TIME_AGO);
Period minTimeAgo = formatter.parsePeriod(minTimeAgoStr);
latest = compactionStartTime.minus(minTimeAgo);
@@ -90,7 +90,7 @@ public class CompactionTimeRangeVerifier implements
CompactionVerifier<FileSyste
// get latest last run start time, we want to limit the duration between
two compaction for the same dataset
if
(state.contains(TimeBasedSubDirDatasetsFinder.MIN_RECOMPACTION_DURATION)) {
String minDurationStrList =
this.state.getProp(TimeBasedSubDirDatasetsFinder.MIN_RECOMPACTION_DURATION);
- String minDurationStr = getMachedLookbackTime(datasetName,
minDurationStrList,
+ String minDurationStr = getMatchedLookbackTime(datasetName,
minDurationStrList,
TimeBasedSubDirDatasetsFinder.DEFAULT_MIN_RECOMPACTION_DURATION);
Period minDurationTime = formatter.parsePeriod(minDurationStr);
DateTime latestEligibleCompactTime =
compactionStartTime.minus(minDurationTime);
@@ -99,7 +99,8 @@ public class CompactionTimeRangeVerifier implements
CompactionVerifier<FileSyste
if (compactState.contains(CompactionSlaEventHelper.LAST_RUN_START_TIME)
&&
compactState.getPropAsLong(CompactionSlaEventHelper.LAST_RUN_START_TIME)
> latestEligibleCompactTime.getMillis()) {
- log.warn("Last compaction for {} is {}, not before {}",
dataset.datasetRoot(),
+ log.warn("Last compaction for {} is {}, which is not before
latestEligibleCompactTime={}",
+ dataset.datasetRoot(),
new
DateTime(compactState.getPropAsLong(CompactionSlaEventHelper.LAST_RUN_START_TIME),
timeZone),
latestEligibleCompactTime);
return new Result(false,
@@ -143,7 +144,7 @@ public class CompactionTimeRangeVerifier implements
CompactionVerifier<FileSyste
* @param datasetName A description of dataset without time partition
information. Example 'Identity/MemberAccount' or 'PageViewEvent'
* @return The lookback time matched with given dataset.
*/
- public static String getMachedLookbackTime(String datasetName, String
datasetsAndLookBacks,
+ public static String getMatchedLookbackTime(String datasetName, String
datasetsAndLookBacks,
String sysDefaultLookback) {
String defaultLookback = sysDefaultLookback;
diff --git
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/CompactionTimeVerifierTest.java
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/CompactionTimeVerifierTest.java
index 654e1957d..9485658dd 100644
---
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/CompactionTimeVerifierTest.java
+++
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/CompactionTimeVerifierTest.java
@@ -27,15 +27,15 @@ public class CompactionTimeVerifierTest {
public void testOneDatasetTime() {
String timeString = "Identity.MemberAccount:1d2h";
- String lb1 =
CompactionTimeRangeVerifier.getMachedLookbackTime("Identity/MemberAccount",
timeString, "2d");
+ String lb1 =
CompactionTimeRangeVerifier.getMatchedLookbackTime("Identity/MemberAccount",
timeString, "2d");
Assert.assertEquals(lb1, "1d2h");
- String lb2 =
CompactionTimeRangeVerifier.getMachedLookbackTime("BizProfile/BizCompany",
timeString, "2d");
+ String lb2 =
CompactionTimeRangeVerifier.getMatchedLookbackTime("BizProfile/BizCompany",
timeString, "2d");
Assert.assertEquals(lb2, "2d");
timeString = "2d;Identity.MemberAccount:1d2h";
- String lb3 =
CompactionTimeRangeVerifier.getMachedLookbackTime("Identity/MemberAccount",
timeString, "2d");
+ String lb3 =
CompactionTimeRangeVerifier.getMatchedLookbackTime("Identity/MemberAccount",
timeString, "2d");
Assert.assertEquals(lb3, "1d2h");
- String lb4 =
CompactionTimeRangeVerifier.getMachedLookbackTime("BizProfile/BizCompany",
timeString, "2d");
+ String lb4 =
CompactionTimeRangeVerifier.getMatchedLookbackTime("BizProfile/BizCompany",
timeString, "2d");
Assert.assertEquals(lb4, "2d");
}
@@ -43,30 +43,30 @@ public class CompactionTimeVerifierTest {
public void testTwoDatasetTime() {
String timeString = "Identity.*:1d2h;BizProfile.BizCompany:3d";
- String lb1 =
CompactionTimeRangeVerifier.getMachedLookbackTime("Identity/MemberAccount",
timeString, "2d");
+ String lb1 =
CompactionTimeRangeVerifier.getMatchedLookbackTime("Identity/MemberAccount",
timeString, "2d");
Assert.assertEquals(lb1, "1d2h");
- String lb2 =
CompactionTimeRangeVerifier.getMachedLookbackTime("BizProfile/BizCompany",
timeString, "2d");
+ String lb2 =
CompactionTimeRangeVerifier.getMatchedLookbackTime("BizProfile/BizCompany",
timeString, "2d");
Assert.assertEquals(lb2, "3d");
- String lb3 =
CompactionTimeRangeVerifier.getMachedLookbackTime("ABC/Unknown", timeString,
"2d");
+ String lb3 =
CompactionTimeRangeVerifier.getMatchedLookbackTime("ABC/Unknown", timeString,
"2d");
Assert.assertEquals(lb3, "2d");
timeString = "2d;Identity.MemberAccount:1d2h;BizProfile.BizCompany:3d";
- String lb4 =
CompactionTimeRangeVerifier.getMachedLookbackTime("Identity/MemberAccount",
timeString, "2d");
+ String lb4 =
CompactionTimeRangeVerifier.getMatchedLookbackTime("Identity/MemberAccount",
timeString, "2d");
Assert.assertEquals(lb4, "1d2h");
- String lb5 =
CompactionTimeRangeVerifier.getMachedLookbackTime("BizProfile/BizCompany",
timeString, "2d");
+ String lb5 =
CompactionTimeRangeVerifier.getMatchedLookbackTime("BizProfile/BizCompany",
timeString, "2d");
Assert.assertEquals(lb5, "3d");
- String lb6 =
CompactionTimeRangeVerifier.getMachedLookbackTime("ABC/Unknown", timeString,
"2d");
+ String lb6 =
CompactionTimeRangeVerifier.getMatchedLookbackTime("ABC/Unknown", timeString,
"2d");
Assert.assertEquals(lb6, "2d");
}
@Test
public void testDefaultDatasetTime() {
String timeString = "Identity.*:1d2h;3d2h;BizProfile.BizCompany:3d";
- String lb1 =
CompactionTimeRangeVerifier.getMachedLookbackTime("ABC/Unknown", timeString,
"2d");
+ String lb1 =
CompactionTimeRangeVerifier.getMatchedLookbackTime("ABC/Unknown", timeString,
"2d");
Assert.assertEquals(lb1, "3d2h");
timeString = "3d2h";
- String lb2 =
CompactionTimeRangeVerifier.getMachedLookbackTime("ABC/Unknown", timeString,
"2d");
+ String lb2 =
CompactionTimeRangeVerifier.getMatchedLookbackTime("ABC/Unknown", timeString,
"2d");
Assert.assertEquals(lb2, "3d2h");
}
@@ -74,28 +74,28 @@ public class CompactionTimeVerifierTest {
public void testEmptySpace() {
String timeString = "Identity.* : 1d2h ; BizProfile.BizCompany : 3d";
- String lb1 =
CompactionTimeRangeVerifier.getMachedLookbackTime("Identity/MemberAccount",
timeString, "2d");
+ String lb1 =
CompactionTimeRangeVerifier.getMatchedLookbackTime("Identity/MemberAccount",
timeString, "2d");
Assert.assertEquals(lb1, "1d2h");
- String lb2 =
CompactionTimeRangeVerifier.getMachedLookbackTime("BizProfile/BizCompany",
timeString, "2d");
+ String lb2 =
CompactionTimeRangeVerifier.getMatchedLookbackTime("BizProfile/BizCompany",
timeString, "2d");
Assert.assertEquals(lb2, "3d");
- String lb3 =
CompactionTimeRangeVerifier.getMachedLookbackTime("ABC/Unknown", timeString,
"2d");
+ String lb3 =
CompactionTimeRangeVerifier.getMatchedLookbackTime("ABC/Unknown", timeString,
"2d");
Assert.assertEquals(lb3, "2d");
timeString = "2d;Identity.MemberAccount :1d2h;
BizProfile.BizCompany:3d";
- String lb4 =
CompactionTimeRangeVerifier.getMachedLookbackTime("Identity/MemberAccount",
timeString, "2d");
+ String lb4 =
CompactionTimeRangeVerifier.getMatchedLookbackTime("Identity/MemberAccount",
timeString, "2d");
Assert.assertEquals(lb4, "1d2h");
- String lb5 =
CompactionTimeRangeVerifier.getMachedLookbackTime("BizProfile/BizCompany",
timeString, "2d");
+ String lb5 =
CompactionTimeRangeVerifier.getMatchedLookbackTime("BizProfile/BizCompany",
timeString, "2d");
Assert.assertEquals(lb5, "3d");
- String lb6 =
CompactionTimeRangeVerifier.getMachedLookbackTime("ABC/Unknown", timeString,
"2d");
+ String lb6 =
CompactionTimeRangeVerifier.getMatchedLookbackTime("ABC/Unknown", timeString,
"2d");
Assert.assertEquals(lb6, "2d");
}
@Test
public void testPartialMatchedNames() {
String timeString = "Identity.Member$ : 1d2h";
- String lb1 =
CompactionTimeRangeVerifier.getMachedLookbackTime("Identity/Member",
timeString, "2d");
+ String lb1 =
CompactionTimeRangeVerifier.getMatchedLookbackTime("Identity/Member",
timeString, "2d");
Assert.assertEquals(lb1, "1d2h");
- String lb2 =
CompactionTimeRangeVerifier.getMachedLookbackTime("Identity/MemberAccount",
timeString, "2d");
+ String lb2 =
CompactionTimeRangeVerifier.getMatchedLookbackTime("Identity/MemberAccount",
timeString, "2d");
Assert.assertEquals(lb2, "2d");
}
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 42ce5c5d7..b7a79bc36 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -608,7 +608,7 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
}
} catch (Throwable t) {
jobState.setState(JobState.RunningState.FAILED);
- String errMsg = "Failed to launch and run job " + jobId + " due to" +
t.getMessage();
+ String errMsg = "Failed to launch and run job " + jobId + " due to " +
t.getMessage();
LOG.error(errMsg + ": " + t, t);
this.jobContext.getJobState().setJobFailureException(t);
} finally {