This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new af33445a2 [GOBBLIN-1633] Fix compaction actions on job failure not retried if compaction succeeds (#3494)
af33445a2 is described below

commit af33445a2480d691a9700511fdd55567e54aa2e4
Author: Matthew Ho <[email protected]>
AuthorDate: Fri May 13 09:32:54 2022 -0700

    [GOBBLIN-1633] Fix compaction actions on job failure not retried if compaction succeeds (#3494)
    
    * GOBBLIN-1633
    
    Fix compaction on job failure not retried if compaction succeeds
    
    * Fix typos
---
 .../CompactionCompleteFileOperationAction.java     |  3 --
 .../compaction/mapreduce/MRCompactionTask.java     | 28 ++++++++++++++-
 .../verify/CompactionTimeRangeVerifier.java        | 13 +++----
 .../verify/CompactionTimeVerifierTest.java         | 40 +++++++++++-----------
 .../gobblin/runtime/AbstractJobLauncher.java       |  2 +-
 5 files changed, 55 insertions(+), 31 deletions(-)

diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
index f0f900d0c..d1fd3f796 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
@@ -32,7 +32,6 @@ import 
org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner;
 import org.apache.gobblin.compaction.mapreduce.RecordKeyDedupReducerBase;
 import org.apache.gobblin.compaction.mapreduce.RecordKeyMapperBase;
 import org.apache.gobblin.compaction.parser.CompactionPathParser;
-import org.apache.gobblin.compaction.source.CompactionSource;
 import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
@@ -183,8 +182,6 @@ public class CompactionCompleteFileOperationAction 
implements CompactionComplete
           this.configurator.getConfiguredJob().getJobID().toString());
       compactionState.setProp(DUPLICATE_COUNT_TOTAL,
           
job.getCounters().findCounter(RecordKeyDedupReducerBase.EVENT_COUNTER.DEDUPED).getValue());
-      compactionState.setProp(CompactionSlaEventHelper.LAST_RUN_START_TIME,
-          this.state.getProp(CompactionSource.COMPACTION_INIT_TIME));
       helper.saveState(new Path(result.getDstAbsoluteDir()), compactionState);
       log.info("duplicated records count for " + dstPath + " : " + 
compactionState.getProp(DUPLICATE_COUNT_TOTAL));
 
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
index 952e46aed..ce885f0cf 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
@@ -30,15 +30,20 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.compaction.action.CompactionCompleteAction;
 import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
+import org.apache.gobblin.compaction.parser.CompactionPathParser;
+import org.apache.gobblin.compaction.source.CompactionSource;
 import org.apache.gobblin.compaction.suite.CompactionSuite;
 import org.apache.gobblin.compaction.suite.CompactionSuiteUtils;
 import org.apache.gobblin.compaction.verify.CompactionVerifier;
+import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
+import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.Dataset;
 import org.apache.gobblin.dataset.FileSystemDataset;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.runtime.TaskContext;
 import org.apache.gobblin.runtime.TaskState;
 import org.apache.gobblin.runtime.mapreduce.MRTask;
+import org.apache.hadoop.fs.Path;
 
 
 
@@ -101,7 +106,8 @@ public class MRCompactionTask extends MRTask {
   public void onMRTaskComplete (boolean isSuccess, Throwable throwable) {
     if (isSuccess) {
       try {
-        setCounterInfo(taskContext.getTaskState());
+        TaskState taskState = taskContext.getTaskState();
+        setCounterInfo(taskState);
 
         List<CompactionCompleteAction> actions = 
this.suite.getCompactionCompleteActions();
         for (CompactionCompleteAction action: actions) {
@@ -109,6 +115,10 @@ public class MRCompactionTask extends MRTask {
           action.onCompactionJobComplete(dataset);
         }
         submitEvent(CompactionSlaEventHelper.COMPACTION_COMPLETED_EVENT_NAME);
+        if (dataset instanceof FileSystemDataset) {
+          commitRunStartTimeInfo(taskState, (FileSystemDataset) dataset);
+        }
+
         super.onMRTaskComplete(true, null);
       } catch (IOException e) {
         submitEvent(CompactionSlaEventHelper.COMPACTION_FAILED_EVENT_NAME);
@@ -120,6 +130,22 @@ public class MRCompactionTask extends MRTask {
     }
   }
 
+  /**
+   * Persist the run start time which is used to determine when the last successful compaction run started. This
+   * value is useful for limiting how often you recompact by verifying whether a dataset has recently been compacted.
+   * @param taskState
+   * @param dataset
+   * @throws IOException
+   */
+  private static void commitRunStartTimeInfo(TaskState taskState, 
FileSystemDataset dataset) throws IOException {
+    CompactionPathParser.CompactionParserResult result = new 
CompactionPathParser(taskState).parse(dataset);
+    InputRecordCountHelper helper = new InputRecordCountHelper(taskState);
+    State compactionState = helper.loadState(new 
Path(result.getDstAbsoluteDir()));
+    compactionState.setProp(CompactionSlaEventHelper.LAST_RUN_START_TIME,
+        taskState.getProp(CompactionSource.COMPACTION_INIT_TIME));
+    helper.saveState(new Path(result.getDstAbsoluteDir()), compactionState);
+  }
+
   private void setCounterInfo(TaskState taskState)
       throws IOException {
 
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
index 0d580d0da..634eadf7f 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
@@ -39,7 +39,7 @@ import org.joda.time.format.PeriodFormatterBuilder;
 
 
 /**
- * A simple class which verify current dataset belongs to a specific time 
range. Will skip to do
+ * A simple class which verify current dataset belongs to a specific time 
range. Will skip doing
  * compaction if dataset is not in a correct time range.
  */
 
@@ -74,7 +74,7 @@ public class CompactionTimeRangeVerifier implements 
CompactionVerifier<FileSyste
       // get earliest time
       String maxTimeAgoStrList = 
this.state.getProp(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MAX_TIME_AGO,
           
TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MAX_TIME_AGO);
-      String maxTimeAgoStr = getMachedLookbackTime(datasetName, 
maxTimeAgoStrList,
+      String maxTimeAgoStr = getMatchedLookbackTime(datasetName, 
maxTimeAgoStrList,
           
TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MAX_TIME_AGO);
       Period maxTimeAgo = formatter.parsePeriod(maxTimeAgoStr);
       earliest = compactionStartTime.minus(maxTimeAgo);
@@ -82,7 +82,7 @@ public class CompactionTimeRangeVerifier implements 
CompactionVerifier<FileSyste
       // get latest time
       String minTimeAgoStrList = 
this.state.getProp(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MIN_TIME_AGO,
           
TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MIN_TIME_AGO);
-      String minTimeAgoStr = getMachedLookbackTime(datasetName, 
minTimeAgoStrList,
+      String minTimeAgoStr = getMatchedLookbackTime(datasetName, 
minTimeAgoStrList,
           
TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MIN_TIME_AGO);
       Period minTimeAgo = formatter.parsePeriod(minTimeAgoStr);
       latest = compactionStartTime.minus(minTimeAgo);
@@ -90,7 +90,7 @@ public class CompactionTimeRangeVerifier implements 
CompactionVerifier<FileSyste
       // get latest last run start time, we want to limit the duration between 
two compaction for the same dataset
       if 
(state.contains(TimeBasedSubDirDatasetsFinder.MIN_RECOMPACTION_DURATION)) {
         String minDurationStrList = 
this.state.getProp(TimeBasedSubDirDatasetsFinder.MIN_RECOMPACTION_DURATION);
-        String minDurationStr = getMachedLookbackTime(datasetName, 
minDurationStrList,
+        String minDurationStr = getMatchedLookbackTime(datasetName, 
minDurationStrList,
             TimeBasedSubDirDatasetsFinder.DEFAULT_MIN_RECOMPACTION_DURATION);
         Period minDurationTime = formatter.parsePeriod(minDurationStr);
         DateTime latestEligibleCompactTime = 
compactionStartTime.minus(minDurationTime);
@@ -99,7 +99,8 @@ public class CompactionTimeRangeVerifier implements 
CompactionVerifier<FileSyste
         if (compactState.contains(CompactionSlaEventHelper.LAST_RUN_START_TIME)
             && 
compactState.getPropAsLong(CompactionSlaEventHelper.LAST_RUN_START_TIME)
             > latestEligibleCompactTime.getMillis()) {
-          log.warn("Last compaction for {} is {}, not before {}", 
dataset.datasetRoot(),
+          log.warn("Last compaction for {} is {}, which is not before 
latestEligibleCompactTime={}",
+              dataset.datasetRoot(),
               new 
DateTime(compactState.getPropAsLong(CompactionSlaEventHelper.LAST_RUN_START_TIME),
 timeZone),
               latestEligibleCompactTime);
           return new Result(false,
@@ -143,7 +144,7 @@ public class CompactionTimeRangeVerifier implements 
CompactionVerifier<FileSyste
    * @param datasetName A description of dataset without time partition 
information. Example 'Identity/MemberAccount' or 'PageViewEvent'
    * @return The lookback time matched with given dataset.
    */
-  public static String getMachedLookbackTime(String datasetName, String 
datasetsAndLookBacks,
+  public static String getMatchedLookbackTime(String datasetName, String 
datasetsAndLookBacks,
       String sysDefaultLookback) {
     String defaultLookback = sysDefaultLookback;
 
diff --git 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/CompactionTimeVerifierTest.java
 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/CompactionTimeVerifierTest.java
index 654e1957d..9485658dd 100644
--- 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/CompactionTimeVerifierTest.java
+++ 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/CompactionTimeVerifierTest.java
@@ -27,15 +27,15 @@ public class CompactionTimeVerifierTest {
   public void testOneDatasetTime() {
     String timeString = "Identity.MemberAccount:1d2h";
 
-    String lb1 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("Identity/MemberAccount", 
timeString, "2d");
+    String lb1 = 
CompactionTimeRangeVerifier.getMatchedLookbackTime("Identity/MemberAccount", 
timeString, "2d");
     Assert.assertEquals(lb1, "1d2h");
-    String lb2 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("BizProfile/BizCompany", 
timeString, "2d");
+    String lb2 = 
CompactionTimeRangeVerifier.getMatchedLookbackTime("BizProfile/BizCompany", 
timeString, "2d");
     Assert.assertEquals(lb2, "2d");
 
     timeString = "2d;Identity.MemberAccount:1d2h";
-    String lb3 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("Identity/MemberAccount", 
timeString, "2d");
+    String lb3 = 
CompactionTimeRangeVerifier.getMatchedLookbackTime("Identity/MemberAccount", 
timeString, "2d");
     Assert.assertEquals(lb3, "1d2h");
-    String lb4 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("BizProfile/BizCompany", 
timeString, "2d");
+    String lb4 = 
CompactionTimeRangeVerifier.getMatchedLookbackTime("BizProfile/BizCompany", 
timeString, "2d");
     Assert.assertEquals(lb4, "2d");
   }
 
@@ -43,30 +43,30 @@ public class CompactionTimeVerifierTest {
   public void testTwoDatasetTime() {
     String timeString = "Identity.*:1d2h;BizProfile.BizCompany:3d";
 
-    String lb1 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("Identity/MemberAccount", 
timeString, "2d");
+    String lb1 = 
CompactionTimeRangeVerifier.getMatchedLookbackTime("Identity/MemberAccount", 
timeString, "2d");
     Assert.assertEquals(lb1, "1d2h");
-    String lb2 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("BizProfile/BizCompany", 
timeString, "2d");
+    String lb2 = 
CompactionTimeRangeVerifier.getMatchedLookbackTime("BizProfile/BizCompany", 
timeString, "2d");
     Assert.assertEquals(lb2, "3d");
-    String lb3 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("ABC/Unknown", timeString, 
"2d");
+    String lb3 = 
CompactionTimeRangeVerifier.getMatchedLookbackTime("ABC/Unknown", timeString, 
"2d");
     Assert.assertEquals(lb3, "2d");
 
     timeString = "2d;Identity.MemberAccount:1d2h;BizProfile.BizCompany:3d";
-    String lb4 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("Identity/MemberAccount", 
timeString, "2d");
+    String lb4 = 
CompactionTimeRangeVerifier.getMatchedLookbackTime("Identity/MemberAccount", 
timeString, "2d");
     Assert.assertEquals(lb4, "1d2h");
-    String lb5 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("BizProfile/BizCompany", 
timeString, "2d");
+    String lb5 = 
CompactionTimeRangeVerifier.getMatchedLookbackTime("BizProfile/BizCompany", 
timeString, "2d");
     Assert.assertEquals(lb5, "3d");
-    String lb6 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("ABC/Unknown", timeString, 
"2d");
+    String lb6 = 
CompactionTimeRangeVerifier.getMatchedLookbackTime("ABC/Unknown", timeString, 
"2d");
     Assert.assertEquals(lb6, "2d");
   }
 
   @Test
   public void testDefaultDatasetTime() {
     String timeString = "Identity.*:1d2h;3d2h;BizProfile.BizCompany:3d";
-    String lb1 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("ABC/Unknown", timeString, 
"2d");
+    String lb1 = 
CompactionTimeRangeVerifier.getMatchedLookbackTime("ABC/Unknown", timeString, 
"2d");
     Assert.assertEquals(lb1, "3d2h");
 
     timeString = "3d2h";
-    String lb2 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("ABC/Unknown", timeString, 
"2d");
+    String lb2 = 
CompactionTimeRangeVerifier.getMatchedLookbackTime("ABC/Unknown", timeString, 
"2d");
     Assert.assertEquals(lb2, "3d2h");
   }
 
@@ -74,28 +74,28 @@ public class CompactionTimeVerifierTest {
   public void testEmptySpace() {
     String timeString = "Identity.* :   1d2h ; BizProfile.BizCompany : 3d";
 
-    String lb1 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("Identity/MemberAccount", 
timeString, "2d");
+    String lb1 = 
CompactionTimeRangeVerifier.getMatchedLookbackTime("Identity/MemberAccount", 
timeString, "2d");
     Assert.assertEquals(lb1, "1d2h");
-    String lb2 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("BizProfile/BizCompany", 
timeString, "2d");
+    String lb2 = 
CompactionTimeRangeVerifier.getMatchedLookbackTime("BizProfile/BizCompany", 
timeString, "2d");
     Assert.assertEquals(lb2, "3d");
-    String lb3 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("ABC/Unknown", timeString, 
"2d");
+    String lb3 = 
CompactionTimeRangeVerifier.getMatchedLookbackTime("ABC/Unknown", timeString, 
"2d");
     Assert.assertEquals(lb3, "2d");
 
     timeString = "2d;Identity.MemberAccount  :1d2h;   
BizProfile.BizCompany:3d";
-    String lb4 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("Identity/MemberAccount", 
timeString, "2d");
+    String lb4 = 
CompactionTimeRangeVerifier.getMatchedLookbackTime("Identity/MemberAccount", 
timeString, "2d");
     Assert.assertEquals(lb4, "1d2h");
-    String lb5 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("BizProfile/BizCompany", 
timeString, "2d");
+    String lb5 = 
CompactionTimeRangeVerifier.getMatchedLookbackTime("BizProfile/BizCompany", 
timeString, "2d");
     Assert.assertEquals(lb5, "3d");
-    String lb6 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("ABC/Unknown", timeString, 
"2d");
+    String lb6 = 
CompactionTimeRangeVerifier.getMatchedLookbackTime("ABC/Unknown", timeString, 
"2d");
     Assert.assertEquals(lb6, "2d");
   }
 
   @Test
   public void testPartialMatchedNames() {
     String timeString = "Identity.Member$ :   1d2h";
-    String lb1 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("Identity/Member", 
timeString, "2d");
+    String lb1 = 
CompactionTimeRangeVerifier.getMatchedLookbackTime("Identity/Member", 
timeString, "2d");
     Assert.assertEquals(lb1, "1d2h");
-    String lb2 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("Identity/MemberAccount", 
timeString, "2d");
+    String lb2 = 
CompactionTimeRangeVerifier.getMatchedLookbackTime("Identity/MemberAccount", 
timeString, "2d");
     Assert.assertEquals(lb2, "2d");
   }
 }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 42ce5c5d7..b7a79bc36 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -608,7 +608,7 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
         }
       } catch (Throwable t) {
         jobState.setState(JobState.RunningState.FAILED);
-        String errMsg = "Failed to launch and run job " + jobId + " due to" + 
t.getMessage();
+        String errMsg = "Failed to launch and run job " + jobId + " due to " + 
t.getMessage();
         LOG.error(errMsg + ": " + t, t);
         this.jobContext.getJobState().setJobFailureException(t);
       } finally {

Reply via email to