zhangyue19921010 commented on code in PR #13530:
URL: https://github.com/apache/hudi/pull/13530#discussion_r2194484996
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java:
##########
@@ -207,31 +207,33 @@ private void initCheckpointId(int attemptId, long restoredCheckpointId) throws E
     this.checkpointId = restoredCheckpointId;
   }

-  private void sendBootstrapEvent(int attemptId, boolean isRestored) throws Exception {
-    if (attemptId <= 0) {
-      if (isRestored) {
-        HoodieTimeline pendingTimeline = this.metaClient.getActiveTimeline().filterPendingExcludingCompaction();
-        // if the task is initially started, resend the pending event.
-        for (WriteMetadataEvent event : this.writeMetadataState.get()) {
-          // Must filter out the completed instants in case it is a partial failover,
-          // the write status should not be accumulated in such case.
-          if (pendingTimeline.containsInstant(event.getInstantTime())) {
-            // Reset taskID for event
-            event.setTaskID(taskID);
-            // The checkpoint succeed but the meta does not commit,
-            // re-commit the inflight instant
-            this.eventGateway.sendEventToCoordinator(event);
-            LOG.info("Send uncommitted write metadata event to coordinator, task[{}].", taskID);
-          }
-        }
-      }
-    } else {
-      // otherwise sends an empty bootstrap event instead.
+  private void sendBootstrapEvent(boolean isRestored) throws Exception {
+    if (!isRestored || !sendPendingCommitEvents()) {
       this.eventGateway.sendEventToCoordinator(WriteMetadataEvent.emptyBootstrap(taskID, checkpointId));
       LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID);
     }
   }

+  private boolean sendPendingCommitEvents() throws Exception {
+    boolean eventSent = false;
Review Comment:
Hi Danny,
Below is the specific data-loss scenario we encountered during long-running testing.
Background: HDFS was unstable for one hour, leading to frequent write timeouts.
Step 1: For Commit 1, after the Flink job completed the snapshot, the JM (JobManager) hit a timeout exception while committing the metadata via notifyCheckpointComplete.
Step 2: The task automatically restarted and began writing data for Commit 2. Since the attempt count was > 0, no recommit was performed for Commit 1.
Step 3: For Commit 2, after the Flink job completed the snapshot, the JM hung while committing via notifyCheckpointComplete (again due to the HDFS instability).
Step 4: At this point we intervened manually and restarted the job from the checkpoint.
In this case Commit 2 was successfully recommitted, but the data from Commit 1 was lost.
It seems there is no harm in resending the pending event once.
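To make the intent of the patch concrete, here is a minimal sketch of the bootstrap decision it introduces, using simplified stand-in types (`Event`, the `pendingInstants` set, and the returned list are hypothetical illustrations, not Hudi's actual `WriteMetadataEvent`/`HoodieTimeline` classes): on restore, resend every state event whose instant is still pending; if nothing was resent, or the job is a fresh start, fall back to an empty bootstrap event.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Simplified model of the new sendBootstrapEvent flow (stand-in types,
// not Hudi's real classes). Returns the list of "events" that would be
// sent to the coordinator, so the branch taken is easy to inspect.
public class BootstrapSketch {
  record Event(String instantTime) {}

  static List<String> sendBootstrapEvent(boolean isRestored,
                                         List<Event> restoredState,
                                         Set<String> pendingInstants) {
    List<String> sent = new ArrayList<>();
    if (isRestored) {
      for (Event e : restoredState) {
        // Filter out completed instants: only still-pending instants
        // may need to be re-committed by the coordinator.
        if (pendingInstants.contains(e.instantTime())) {
          sent.add(e.instantTime());
        }
      }
    }
    if (sent.isEmpty()) {
      // Fresh start, or nothing pending: send an empty bootstrap event.
      sent.add("EMPTY_BOOTSTRAP");
    }
    return sent;
  }

  public static void main(String[] args) {
    // Commit 1 never had its meta committed and is still pending;
    // Commit 2 completed, so only Commit 1 is resent on restore.
    List<Event> state = List.of(new Event("commit1"), new Event("commit2"));
    System.out.println(sendBootstrapEvent(true, state, Set.of("commit1")));
    System.out.println(sendBootstrapEvent(false, state, Set.of("commit1")));
  }
}
```

Under this model, the scenario above recovers Commit 1: the restored state still contains its event, the timeline still lists it as pending, so it is resent; resending an event whose instant has meanwhile completed is filtered out, which is why resending once appears harmless.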
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]