danny0405 commented on code in PR #13285:
URL: https://github.com/apache/hudi/pull/13285#discussion_r2086661912
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -501,44 +503,22 @@ private void handleWriteMetaEvent(WriteMetadataEvent
event) {
addEventToBuffer(event);
}
- /**
- * The coordinator reuses the instant if there is no data for this round of
checkpoint,
- * sends the commit ack events to unblock the flushing.
- */
- private void sendCommitAckEvents(long checkpointId) {
- CompletableFuture<?>[] futures =
Arrays.stream(this.gateways).filter(Objects::nonNull)
- .map(gw -> gw.sendEvent(CommitAckEvent.getInstance(checkpointId)))
- .toArray(CompletableFuture<?>[]::new);
- CompletableFuture.allOf(futures).whenComplete((resp, error) -> {
- if (!sendToFinishedTasks(error)) {
- throw new HoodieException("Error while waiting for the commit ack
events to finish sending", error);
- }
- });
- }
-
- /**
- * Decides whether the given exception is caused by sending events to
FINISHED tasks.
- *
- * <p>Ugly impl: the exception may change in the future.
- */
- private static boolean sendToFinishedTasks(Throwable throwable) {
- return throwable.getCause() instanceof TaskNotRunningException
- || throwable.getCause().getMessage().contains("running");
- }
-
/**
* Commits the instant.
*/
- private boolean commitInstant(String instant) {
- return commitInstant(instant, -1);
+ private boolean commitInstants(long checkpointId) {
+ // use < instead of <= because the write metadata event sends the last
known checkpoint id which is smaller than the current one.
+ List<Boolean> result = this.eventBuffers.entrySet().stream().filter(entry
-> entry.getKey() < checkpointId)
+ .map(entry -> commitInstant(entry.getKey(),
entry.getValue().getLeft(),
entry.getValue().getRight())).collect(Collectors.toList());
Review Comment:
See the cleanup of the bootstrap event and the strategy for committing an empty
instant.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]