This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new a77250b  [GOBBLIN-1419] Error handling for compaction pipeline on GMCE emitted error
a77250b is described below

commit a77250b057e1ba83531ee0bed6e8302ed6c18510
Author: Zihan Li <[email protected]>
AuthorDate: Fri Apr 23 11:13:46 2021 -0700

    [GOBBLIN-1419] Error handling for compaction pipeline on GMCE emitted error
    
    [GOBBLIN-1419] Error handling for compaction
    pipeline on GMCE emitted error
    
    Address review comments
    
    Closes #3255 from ZihanLi58/GOBBLIN-1419
---
 .../gobblin/configuration/ConfigurationKeys.java   |  1 +
 .../CompactionCompleteFileOperationAction.java     | 20 ++++++++++++-----
 .../action/CompactionGMCEPublishingAction.java     | 21 +++++++++++++-----
 ...ionSuiteBaseWithConfigurableCompleteAction.java |  3 ++-
 .../verify/CompactionThresholdVerifier.java        | 25 ++++++++++++----------
 5 files changed, 48 insertions(+), 22 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 228d835..ed4ff00 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -1058,6 +1058,7 @@ public class ConfigurationKeys {
   /**
    * Configuration and constant vale for GobblinMetadataChangeEvent
    */
+  public static final String GOBBLIN_METADATA_CHANGE_EVENT_ENABLED = 
"GobblinMetadataChangeEvent.enabled";
   public static final String LIST_DELIMITER_KEY = ",";
   public static final String RANGE_DELIMITER_KEY = "-";
 
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
index b7522ac..f0f900d 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
@@ -48,7 +48,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Job;
 
-import static 
org.apache.gobblin.compaction.event.CompactionSlaEventHelper.DUPLICATE_COUNT_TOTAL;
+import static org.apache.gobblin.compaction.event.CompactionSlaEventHelper.*;
 
 
 /**
@@ -66,16 +66,20 @@ public class CompactionCompleteFileOperationAction 
implements CompactionComplete
   private EventSubmitter eventSubmitter;
   private FileSystem fs;
 
-  public CompactionCompleteFileOperationAction(State state, 
CompactionJobConfigurator configurator) {
+  public CompactionCompleteFileOperationAction(State state, 
CompactionJobConfigurator configurator, InputRecordCountHelper helper) {
     if (!(state instanceof WorkUnitState)) {
       throw new UnsupportedOperationException(this.getClass().getName() + " 
only supports workunit state");
     }
     this.state = (WorkUnitState) state;
-    this.helper = new InputRecordCountHelper(state);
+    this.helper = helper;
     this.configurator = configurator;
     this.fs = configurator.getFs();
   }
 
+  public CompactionCompleteFileOperationAction(State state, 
CompactionJobConfigurator configurator) {
+    this(state, configurator, new InputRecordCountHelper(state));
+  }
+
   /**
    * Replace or append the destination folder with new files from map-reduce 
job
    * Create a record count file containing the number of records that have 
been processed .
@@ -127,8 +131,10 @@ public class CompactionCompleteFileOperationAction 
implements CompactionComplete
         newTotalRecords = this.configurator.getFileNameRecordCount();
       } else {
         if 
(state.getPropAsBoolean(ConfigurationKeys.RECOMPACTION_WRITE_TO_NEW_FOLDER, 
false)) {
-          Path oldFilePath = PathUtils.mergePaths(dstPath, new 
Path(String.format(COMPACTION_DIRECTORY_FORMAT, executionCount)));
-          dstPath = PathUtils.mergePaths(dstPath, new 
Path(String.format(COMPACTION_DIRECTORY_FORMAT, executionCount + 1)));
+          Path oldFilePath =
+              PathUtils.mergePaths(dstPath, new 
Path(String.format(COMPACTION_DIRECTORY_FORMAT, executionCount)));
+          dstPath =
+              PathUtils.mergePaths(dstPath, new 
Path(String.format(COMPACTION_DIRECTORY_FORMAT, executionCount + 1)));
           
this.configurator.getOldFiles().add(this.fs.makeQualified(oldFilePath).toString());
           //Write to a new path, no need to delete the old path
         } else {
@@ -167,6 +173,10 @@ public class CompactionCompleteFileOperationAction 
implements CompactionComplete
         compactionState.setProp(DUPLICATE_COUNT_TOTAL + 
Long.toString(executionCount),
             compactionState.getProp(DUPLICATE_COUNT_TOTAL, "null"));
       }
+      if 
(state.getPropAsBoolean(ConfigurationKeys.GOBBLIN_METADATA_CHANGE_EVENT_ENABLED,
 false)) {
+        // GMCE is enabled: mark GMCE as not yet emitted for this compaction run
+        
compactionState.setProp(CompactionGMCEPublishingAction.GMCE_EMITTED_KEY, false);
+      }
       compactionState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL, 
Long.toString(newTotalRecords));
       compactionState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL, 
Long.toString(executionCount + 1));
       compactionState.setProp(CompactionSlaEventHelper.MR_JOB_ID,
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionGMCEPublishingAction.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionGMCEPublishingAction.java
index ad555c8..cc93afa 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionGMCEPublishingAction.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionGMCEPublishingAction.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator;
 import org.apache.gobblin.compaction.parser.CompactionPathParser;
+import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
@@ -57,32 +58,42 @@ public class CompactionGMCEPublishingAction implements 
CompactionCompleteAction<
 
   public static final String ICEBERG_ID_ATTRIBUTE = "iceberg.id";
   public static final String ICEBERG_REQUIRED_ATTRIBUTE = "iceberg.required";
+  public final static String GMCE_EMITTED_KEY = "GMCE.emitted";
   private final State state;
   private final CompactionJobConfigurator configurator;
   private final Configuration conf;
+  private InputRecordCountHelper helper;
   private EventSubmitter eventSubmitter;
 
-  public CompactionGMCEPublishingAction(State state, CompactionJobConfigurator 
configurator) {
+  public CompactionGMCEPublishingAction(State state, CompactionJobConfigurator 
configurator, InputRecordCountHelper helper) {
     if (!(state instanceof WorkUnitState)) {
       throw new UnsupportedOperationException(this.getClass().getName() + " 
only supports workunit state");
     }
     this.state = state;
     this.configurator = configurator;
     this.conf = HadoopUtils.getConfFromState(state);
+    this.helper = helper;
+  }
+
+  public CompactionGMCEPublishingAction(State state, CompactionJobConfigurator 
configurator) {
+    this(state, configurator, new InputRecordCountHelper(state));
   }
 
   public void onCompactionJobComplete(FileSystemDataset dataset) throws 
IOException {
     if (dataset.isVirtual()) {
       return;
     }
+    CompactionPathParser.CompactionParserResult result = new 
CompactionPathParser(state).parse(dataset);
+    String datasetDir = Joiner.on("/").join(result.getDstBaseDir(), 
result.getDatasetName());
+    state.setProp(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR, datasetDir);
     try (GobblinMCEProducer producer = 
GobblinMCEProducer.getGobblinMCEProducer(state)) {
-
-      CompactionPathParser.CompactionParserResult result = new 
CompactionPathParser(state).parse(dataset);
-      String datasetDir = Joiner.on("/").join(result.getDstBaseDir(), 
result.getDatasetName());
-      state.setProp(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR, datasetDir);
       producer.sendGMCE(getNewFileMetrics(result), null, 
Lists.newArrayList(this.configurator.getOldFiles()), null,
           OperationType.rewrite_files, SchemaSource.NONE);
     }
+    State compactionState = helper.loadState(new 
Path(result.getDstAbsoluteDir()));
+    // Mark GMCE as emitted; reached only if sendGMCE above returned without throwing
+    compactionState.setProp(GMCE_EMITTED_KEY, true);
+    helper.saveState(new Path(result.getDstAbsoluteDir()), compactionState);
     //clear old files to release memory
     this.configurator.getOldFiles().clear();
   }
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseWithConfigurableCompleteAction.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseWithConfigurableCompleteAction.java
index c663421..fc282f1 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseWithConfigurableCompleteAction.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseWithConfigurableCompleteAction.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.gobblin.compaction.action.CompactionCompleteAction;
+import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.FileSystemDataset;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
@@ -54,7 +55,7 @@ public class 
CompactionSuiteBaseWithConfigurableCompleteAction extends Compactio
     try {
       for (String s : state.getPropAsList(COMPACTION_COMPLETE_ACTIONS)) {
         
compactionCompleteActionsList.add((CompactionCompleteAction<FileSystemDataset>) 
GobblinConstructorUtils.invokeLongestConstructor(
-            Class.forName(s), state, getConfigurator()));
+            Class.forName(s), state, getConfigurator(), new 
InputRecordCountHelper(state)));
       }
     } catch (ReflectiveOperationException e) {
       throw new IOException(e);
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
index 5154afd..96f638e 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
@@ -17,22 +17,20 @@
 
 package org.apache.gobblin.compaction.verify;
 
+import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.util.Map;
-
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.collect.Lists;
-
-import lombok.extern.slf4j.Slf4j;
-
+import org.apache.gobblin.compaction.action.CompactionGMCEPublishingAction;
 import 
org.apache.gobblin.compaction.conditions.RecompactionConditionBasedOnRatio;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.compaction.parser.CompactionPathParser;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.hadoop.fs.Path;
 
 
 /**
@@ -60,7 +58,8 @@ public class CompactionThresholdVerifier implements 
CompactionVerifier<FileSyste
    * dataset. To avoid scalability issue, we choose a stateless approach where 
each dataset tracks
    * record count by themselves and persist it in the file system)
    *
-   * @return true iff the difference exceeds the threshold or this is the 
first time compaction
+   * @return true if the difference exceeds the threshold, this is the first-time compaction, or
+   * GMCE is enabled but the previous run failed to emit GMCE
    */
   public Result verify(FileSystemDataset dataset) {
 
@@ -81,10 +80,14 @@ public class CompactionThresholdVerifier implements 
CompactionVerifier<FileSyste
         newRecords = helper.calculateRecordCount(Lists.newArrayList(new 
Path(dataset.datasetURN())));
       }
       double oldRecords = helper.readRecordCount(new 
Path(result.getDstAbsoluteDir()));
-
+      State datasetState = helper.loadState(new 
Path(result.getDstAbsoluteDir()));
       if (oldRecords == 0) {
         return new Result(true, "");
       }
+      if 
(state.getPropAsBoolean(ConfigurationKeys.GOBBLIN_METADATA_CHANGE_EVENT_ENABLED,
 false)
+          && 
!datasetState.getPropAsBoolean(CompactionGMCEPublishingAction.GMCE_EMITTED_KEY, 
true)) {
+        return new Result(true, "GMCE has not sent, need re-compact");
+      }
       if (newRecords < oldRecords) {
         return new Result(false, "Illegal state: Current records count should 
old be smaller.");
       }
@@ -94,8 +97,8 @@ public class CompactionThresholdVerifier implements 
CompactionVerifier<FileSyste
         return new Result(true, "");
       }
 
-      return new Result(false, String
-          .format("%s is failed for dataset %s. Prev=%f, Cur=%f, not reaching 
to threshold %f", this.getName(),
+      return new Result(false,
+          String.format("%s is failed for dataset %s. Prev=%f, Cur=%f, not 
reaching to threshold %f", this.getName(),
               result.getDatasetName(), oldRecords, newRecords, threshold));
     } catch (IOException e) {
       return new Result(false, ExceptionUtils.getFullStackTrace(e));

Reply via email to