yihua commented on code in PR #9275:
URL: https://github.com/apache/hudi/pull/9275#discussion_r1328255361
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java:
##########
@@ -294,31 +291,9 @@ protected void setCommitMetadata(HoodieWriteMetadata<HoodieData<WriteStatus>> re
}
@Override
-  protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
+ protected void commit(HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
     context.setJobStatus(this.getClass().getSimpleName(), "Commit write status collect: " + config.getTableName());
- String actionType = getCommitActionType();
-    LOG.info("Committing " + instantTime + ", action Type " + actionType + ", operation Type " + operationType);
- result.setCommitted(true);
- if (!result.getWriteStats().isPresent()) {
-      result.setWriteStats(result.getWriteStatuses().map(WriteStatus::getStat).collectAsList());
- }
- // Finalize write
- finalizeWrite(instantTime, result.getWriteStats().get(), result);
- try {
- HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
- HoodieCommitMetadata metadata = result.getCommitMetadata().get();
- writeTableMetadata(metadata, result.getWriteStatuses(), actionType);
- // cannot serialize maps with null values
-      metadata.getExtraMetadata().entrySet().removeIf(entry -> entry.getValue() == null);
- activeTimeline.saveAsComplete(
- new HoodieInstant(true, getCommitActionType(), instantTime),
- serializeCommitMetadata(metadata));
- LOG.info("Committed " + instantTime);
- result.setCommitMetadata(Option.of(metadata));
- } catch (IOException e) {
-      throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,
- e);
- }
+    commit(result.getWriteStatuses(), result, result.getWriteStatuses().map(WriteStatus::getStat).collectAsList());
Review Comment:
The last argument of `commit`, `result.getWriteStatuses().map(WriteStatus::getStat).collectAsList()`, should be the following, so that the behavior stays strictly the same as before:
```
result.getWriteStats().isPresent() ? result.getWriteStats().get() : result.getWriteStatuses().map(WriteStatus::getStat).collectAsList()
```
In Spark, the write statuses are held in an RDD, so re-mapping and collecting them unconditionally can trigger the DAG computation again.
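The guard can be illustrated with a small plain-Java sketch (hypothetical names, not Hudi code): the ternary only runs the expensive path, which stands in for re-collecting the RDD, when no cached value is present.

```java
import java.util.List;
import java.util.Optional;

public class GuardSketch {
  // Counts how many times the "expensive" path runs; in Spark this
  // would correspond to an action re-triggering the DAG.
  static int recomputations = 0;

  static List<String> recomputeStats() {
    recomputations++;
    return List.of("stat-a", "stat-b");
  }

  // Mirrors the suggested pattern:
  // writeStats.isPresent() ? writeStats.get() : writeStatuses.map(...).collectAsList()
  static List<String> statsFor(Optional<List<String>> cachedStats) {
    return cachedStats.isPresent() ? cachedStats.get() : recomputeStats();
  }

  public static void main(String[] args) {
    statsFor(Optional.of(List.of("cached-stat"))); // cached: expensive path skipped
    statsFor(Optional.empty());                    // no cache: expensive path runs once
    System.out.println(recomputations);            // prints 1
  }
}
```

With the guard, the already-materialized stats are reused and the recomputation happens only on the empty branch; without it, every call would pay the recomputation cost.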
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]