This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new a77250b [GOBBLIN-1419] Error handling for compaction pipeline on GMCE emitted error
a77250b is described below
commit a77250b057e1ba83531ee0bed6e8302ed6c18510
Author: Zihan Li <[email protected]>
AuthorDate: Fri Apr 23 11:13:46 2021 -0700
[GOBBLIN-1419] Error handling for compaction pipeline on GMCE emitted error
[GOBBLIN-1419] Error handling for compaction pipeline on GMCE emitted error
address comments
Closes #3255 from ZihanLi58/GOBBLIN-1419
---
.../gobblin/configuration/ConfigurationKeys.java | 1 +
.../CompactionCompleteFileOperationAction.java | 20 ++++++++++++-----
.../action/CompactionGMCEPublishingAction.java | 21 +++++++++++++-----
...ionSuiteBaseWithConfigurableCompleteAction.java | 3 ++-
.../verify/CompactionThresholdVerifier.java | 25 ++++++++++++----------
5 files changed, 48 insertions(+), 22 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 228d835..ed4ff00 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -1058,6 +1058,7 @@ public class ConfigurationKeys {
/**
* Configuration and constant vale for GobblinMetadataChangeEvent
*/
+ public static final String GOBBLIN_METADATA_CHANGE_EVENT_ENABLED =
"GobblinMetadataChangeEvent.enabled";
public static final String LIST_DELIMITER_KEY = ",";
public static final String RANGE_DELIMITER_KEY = "-";
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
index b7522ac..f0f900d 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
@@ -48,7 +48,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
-import static
org.apache.gobblin.compaction.event.CompactionSlaEventHelper.DUPLICATE_COUNT_TOTAL;
+import static org.apache.gobblin.compaction.event.CompactionSlaEventHelper.*;
/**
@@ -66,16 +66,20 @@ public class CompactionCompleteFileOperationAction
implements CompactionComplete
private EventSubmitter eventSubmitter;
private FileSystem fs;
- public CompactionCompleteFileOperationAction(State state,
CompactionJobConfigurator configurator) {
+ public CompactionCompleteFileOperationAction(State state,
CompactionJobConfigurator configurator, InputRecordCountHelper helper) {
if (!(state instanceof WorkUnitState)) {
throw new UnsupportedOperationException(this.getClass().getName() + "
only supports workunit state");
}
this.state = (WorkUnitState) state;
- this.helper = new InputRecordCountHelper(state);
+ this.helper = helper;
this.configurator = configurator;
this.fs = configurator.getFs();
}
+ public CompactionCompleteFileOperationAction(State state,
CompactionJobConfigurator configurator) {
+ this(state, configurator, new InputRecordCountHelper(state));
+ }
+
/**
* Replace or append the destination folder with new files from map-reduce
job
* Create a record count file containing the number of records that have
been processed .
@@ -127,8 +131,10 @@ public class CompactionCompleteFileOperationAction
implements CompactionComplete
newTotalRecords = this.configurator.getFileNameRecordCount();
} else {
if
(state.getPropAsBoolean(ConfigurationKeys.RECOMPACTION_WRITE_TO_NEW_FOLDER,
false)) {
- Path oldFilePath = PathUtils.mergePaths(dstPath, new
Path(String.format(COMPACTION_DIRECTORY_FORMAT, executionCount)));
- dstPath = PathUtils.mergePaths(dstPath, new
Path(String.format(COMPACTION_DIRECTORY_FORMAT, executionCount + 1)));
+ Path oldFilePath =
+ PathUtils.mergePaths(dstPath, new
Path(String.format(COMPACTION_DIRECTORY_FORMAT, executionCount)));
+ dstPath =
+ PathUtils.mergePaths(dstPath, new
Path(String.format(COMPACTION_DIRECTORY_FORMAT, executionCount + 1)));
this.configurator.getOldFiles().add(this.fs.makeQualified(oldFilePath).toString());
//Write to a new path, no need to delete the old path
} else {
@@ -167,6 +173,10 @@ public class CompactionCompleteFileOperationAction
implements CompactionComplete
compactionState.setProp(DUPLICATE_COUNT_TOTAL +
Long.toString(executionCount),
compactionState.getProp(DUPLICATE_COUNT_TOTAL, "null"));
}
+ if
(state.getPropAsBoolean(ConfigurationKeys.GOBBLIN_METADATA_CHANGE_EVENT_ENABLED,
false)) {
+ //GMCE enabled, set the key to be false to indicate that GMCE has not
been sent yet
+
compactionState.setProp(CompactionGMCEPublishingAction.GMCE_EMITTED_KEY, false);
+ }
compactionState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL,
Long.toString(newTotalRecords));
compactionState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL,
Long.toString(executionCount + 1));
compactionState.setProp(CompactionSlaEventHelper.MR_JOB_ID,
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionGMCEPublishingAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionGMCEPublishingAction.java
index ad555c8..cc93afa 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionGMCEPublishingAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionGMCEPublishingAction.java
@@ -25,6 +25,7 @@ import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator;
import org.apache.gobblin.compaction.parser.CompactionPathParser;
+import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
@@ -57,32 +58,42 @@ public class CompactionGMCEPublishingAction implements
CompactionCompleteAction<
public static final String ICEBERG_ID_ATTRIBUTE = "iceberg.id";
public static final String ICEBERG_REQUIRED_ATTRIBUTE = "iceberg.required";
+ public final static String GMCE_EMITTED_KEY = "GMCE.emitted";
private final State state;
private final CompactionJobConfigurator configurator;
private final Configuration conf;
+ private InputRecordCountHelper helper;
private EventSubmitter eventSubmitter;
- public CompactionGMCEPublishingAction(State state, CompactionJobConfigurator
configurator) {
+ public CompactionGMCEPublishingAction(State state, CompactionJobConfigurator
configurator, InputRecordCountHelper helper) {
if (!(state instanceof WorkUnitState)) {
throw new UnsupportedOperationException(this.getClass().getName() + "
only supports workunit state");
}
this.state = state;
this.configurator = configurator;
this.conf = HadoopUtils.getConfFromState(state);
+ this.helper = helper;
+ }
+
+ public CompactionGMCEPublishingAction(State state, CompactionJobConfigurator
configurator) {
+ this(state, configurator, new InputRecordCountHelper(state));
}
public void onCompactionJobComplete(FileSystemDataset dataset) throws
IOException {
if (dataset.isVirtual()) {
return;
}
+ CompactionPathParser.CompactionParserResult result = new
CompactionPathParser(state).parse(dataset);
+ String datasetDir = Joiner.on("/").join(result.getDstBaseDir(),
result.getDatasetName());
+ state.setProp(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR, datasetDir);
try (GobblinMCEProducer producer =
GobblinMCEProducer.getGobblinMCEProducer(state)) {
-
- CompactionPathParser.CompactionParserResult result = new
CompactionPathParser(state).parse(dataset);
- String datasetDir = Joiner.on("/").join(result.getDstBaseDir(),
result.getDatasetName());
- state.setProp(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR, datasetDir);
producer.sendGMCE(getNewFileMetrics(result), null,
Lists.newArrayList(this.configurator.getOldFiles()), null,
OperationType.rewrite_files, SchemaSource.NONE);
}
+ State compactionState = helper.loadState(new
Path(result.getDstAbsoluteDir()));
+ //Set the prop to be true to indicate that gmce has been emitted
+ compactionState.setProp(GMCE_EMITTED_KEY, true);
+ helper.saveState(new Path(result.getDstAbsoluteDir()), compactionState);
//clear old files to release memory
this.configurator.getOldFiles().clear();
}
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseWithConfigurableCompleteAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseWithConfigurableCompleteAction.java
index c663421..fc282f1 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseWithConfigurableCompleteAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseWithConfigurableCompleteAction.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.gobblin.compaction.action.CompactionCompleteAction;
+import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.dataset.FileSystemDataset;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
@@ -54,7 +55,7 @@ public class
CompactionSuiteBaseWithConfigurableCompleteAction extends Compactio
try {
for (String s : state.getPropAsList(COMPACTION_COMPLETE_ACTIONS)) {
compactionCompleteActionsList.add((CompactionCompleteAction<FileSystemDataset>)
GobblinConstructorUtils.invokeLongestConstructor(
- Class.forName(s), state, getConfigurator()));
+ Class.forName(s), state, getConfigurator(), new
InputRecordCountHelper(state)));
}
} catch (ReflectiveOperationException e) {
throw new IOException(e);
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
index 5154afd..96f638e 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
@@ -17,22 +17,20 @@
package org.apache.gobblin.compaction.verify;
+import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.Map;
-
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.collect.Lists;
-
-import lombok.extern.slf4j.Slf4j;
-
+import org.apache.gobblin.compaction.action.CompactionGMCEPublishingAction;
import
org.apache.gobblin.compaction.conditions.RecompactionConditionBasedOnRatio;
import org.apache.gobblin.compaction.mapreduce.MRCompactor;
import org.apache.gobblin.compaction.parser.CompactionPathParser;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.hadoop.fs.Path;
/**
@@ -60,7 +58,8 @@ public class CompactionThresholdVerifier implements
CompactionVerifier<FileSyste
* dataset. To avoid scalability issue, we choose a stateless approach where
each dataset tracks
* record count by themselves and persist it in the file system)
*
- * @return true iff the difference exceeds the threshold or this is the
first time compaction
+ * @return true if the difference exceeds the threshold or this is the first
time compaction or
+ * GMCE is enabled but last run there is something wrong when emitting GMCE
*/
public Result verify(FileSystemDataset dataset) {
@@ -81,10 +80,14 @@ public class CompactionThresholdVerifier implements
CompactionVerifier<FileSyste
newRecords = helper.calculateRecordCount(Lists.newArrayList(new
Path(dataset.datasetURN())));
}
double oldRecords = helper.readRecordCount(new
Path(result.getDstAbsoluteDir()));
-
+ State datasetState = helper.loadState(new
Path(result.getDstAbsoluteDir()));
if (oldRecords == 0) {
return new Result(true, "");
}
+ if
(state.getPropAsBoolean(ConfigurationKeys.GOBBLIN_METADATA_CHANGE_EVENT_ENABLED,
false)
+ &&
!datasetState.getPropAsBoolean(CompactionGMCEPublishingAction.GMCE_EMITTED_KEY,
true)) {
+ return new Result(true, "GMCE has not sent, need re-compact");
+ }
if (newRecords < oldRecords) {
return new Result(false, "Illegal state: Current records count should
old be smaller.");
}
@@ -94,8 +97,8 @@ public class CompactionThresholdVerifier implements
CompactionVerifier<FileSyste
return new Result(true, "");
}
- return new Result(false, String
- .format("%s is failed for dataset %s. Prev=%f, Cur=%f, not reaching
to threshold %f", this.getName(),
+ return new Result(false,
+ String.format("%s is failed for dataset %s. Prev=%f, Cur=%f, not
reaching to threshold %f", this.getName(),
result.getDatasetName(), oldRecords, newRecords, threshold));
} catch (IOException e) {
return new Result(false, ExceptionUtils.getFullStackTrace(e));