[
https://issues.apache.org/jira/browse/GOBBLIN-1419?focusedWorklogId=582700&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-582700
]
ASF GitHub Bot logged work on GOBBLIN-1419:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 14/Apr/21 16:54
Start Date: 14/Apr/21 16:54
Worklog Time Spent: 10m
Work Description: autumnust commented on a change in pull request #3255:
URL: https://github.com/apache/gobblin/pull/3255#discussion_r613395422
##########
File path:
gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
##########
@@ -81,10 +84,15 @@ public Result verify(FileSystemDataset dataset) {
newRecords = helper.calculateRecordCount(Lists.newArrayList(new
Path(dataset.datasetURN())));
}
double oldRecords = helper.readRecordCount(new
Path(result.getDstAbsoluteDir()));
-
+ State datasetState = helper.loadState(new
Path(result.getDstAbsoluteDir()));
if (oldRecords == 0) {
return new Result(true, "");
}
+
if(state.getPropAsBoolean(ConfigurationKeys.GOBBLIN_METADATA_CHANGE_EVENT_ENABLED,
false)) {
Review comment:
These two branches could be merged.
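A minimal sketch of what merging the two branches might look like. It is a standalone illustration, not the code in the pull request: the body of the second branch is not visible in this hunk, so the sketch assumes that branch also returns a passing result when no GMCE was emitted for the previous compaction. `MergedBranchSketch`, `shouldRecompact`, and its parameters are hypothetical names.

```java
public class MergedBranchSketch {

  /**
   * Assumption: "first-time compaction" and "GMCE enabled but never emitted"
   * lead to the same outcome, so both collapse into a single condition.
   */
  static boolean shouldRecompact(double oldRecords, boolean gmceEnabled, boolean gmceEmitted) {
    return oldRecords == 0 || (gmceEnabled && !gmceEmitted);
  }

  public static void main(String[] args) {
    // First-time compaction: no previous record count was persisted.
    System.out.println(shouldRecompact(0, false, false));
    // Previous compaction published files but the GMCE was never emitted.
    System.out.println(shouldRecompact(100, true, false));
    // Previous compaction completed, including GMCE emission.
    System.out.println(shouldRecompact(100, true, true));
  }
}
```

If the two branches do not in fact return the same result, they cannot be merged this way and would need to stay separate.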
##########
File path:
gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
##########
@@ -60,7 +62,8 @@ public CompactionThresholdVerifier(State state) {
* dataset. To avoid scalability issue, we choose a stateless approach where
each dataset tracks
* record count by themselves and persist it in the file system)
*
- * @return true iff the difference exceeds the threshold or this is the
first time compaction
+ * @return true if the difference exceeds the threshold or this is the first
time compaction or
Review comment:
I am not sure it is a good idea to package the GMCE-emission check inside the
`thresholder` verifier. What would be the cost of having a separate verifier
implementation?
##########
File path:
gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionGMCEPublishingAction.java
##########
@@ -57,9 +58,11 @@
public static final String ICEBERG_ID_ATTRIBUTE = "iceberg.id";
public static final String ICEBERG_REQUIRED_ATTRIBUTE = "iceberg.required";
+ public final static String GMCE_EMITTED_KEY = "GMCE.emitted";
private final State state;
private final CompactionJobConfigurator configurator;
private final Configuration conf;
+ private InputRecordCountHelper helper;
Review comment:
This class may deserve a better name, given that it is no longer concerned
only with the record count.
##########
File path:
gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionGMCEPublishingAction.java
##########
@@ -57,9 +58,11 @@
public static final String ICEBERG_ID_ATTRIBUTE = "iceberg.id";
public static final String ICEBERG_REQUIRED_ATTRIBUTE = "iceberg.required";
+ public final static String GMCE_EMITTED_KEY = "GMCE.emitted";
private final State state;
private final CompactionJobConfigurator configurator;
private final Configuration conf;
+ private InputRecordCountHelper helper;
Review comment:
Second comment on this: considering how widely `InputRecordCountHelper` is
used across the different implementations of CompactionAction, does it make
sense to share one instance rather than instantiating the same object in each
implementation?
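A small sketch of the sharing idea, using plain constructor injection. The classes below are simplified, hypothetical stand-ins and do not reproduce Gobblin's `InputRecordCountHelper` or its action classes; how the real actions are constructed is not shown in this thread.

```java
public class SharedHelperSketch {

  /** Hypothetical stand-in for the helper; built once from the job state. */
  static final class StateHelper {
    private final String jobName;

    StateHelper(String jobName) {
      this.jobName = jobName;
    }

    String getJobName() {
      return jobName;
    }
  }

  /** Hypothetical action that receives the helper instead of creating it. */
  static final class FileOperationAction {
    final StateHelper helper;

    FileOperationAction(StateHelper helper) {
      this.helper = helper;
    }
  }

  /** A second hypothetical action that reuses the same helper instance. */
  static final class GmcePublishingAction {
    final StateHelper helper;

    GmcePublishingAction(StateHelper helper) {
      this.helper = helper;
    }
  }

  public static void main(String[] args) {
    StateHelper shared = new StateHelper("compaction-job");
    FileOperationAction fileAction = new FileOperationAction(shared);
    GmcePublishingAction gmceAction = new GmcePublishingAction(shared);
    // Both actions hold the same object, so this prints true.
    System.out.println(fileAction.helper == gmceAction.helper);
  }
}
```

Sharing is only safe if the helper is effectively immutable or thread-safe, which would need to be checked against the real class.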
##########
File path:
gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
##########
@@ -167,6 +167,10 @@ public void onCompactionJobComplete(FileSystemDataset
dataset) throws IOExceptio
compactionState.setProp(DUPLICATE_COUNT_TOTAL +
Long.toString(executionCount),
compactionState.getProp(DUPLICATE_COUNT_TOTAL, "null"));
}
+
if(state.getPropAsBoolean(ConfigurationKeys.GOBBLIN_METADATA_CHANGE_EVENT_ENABLED,
false)) {
Review comment:
Please consider using auto-formatting to clean up the spacing around
reserved keywords.
Issue Time Tracking
-------------------
Worklog Id: (was: 582700)
Time Spent: 0.5h (was: 20m)
> Error handling for compaction pipeline on GMCE emitted error
> ------------------------------------------------------------
>
> Key: GOBBLIN-1419
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1419
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Zihan Li
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> Now, compaction publishes files and emits the GMCE as separate steps. If the
> file rename succeeds but the GMCE emission fails, no GMCE is ever emitted for
> that compaction. We need an error-handling strategy for compaction so that,
> when GMCE emission fails, the next job treats the previous compaction as
> failed and re-compacts the data.