wxplovecc commented on a change in pull request #4654:
URL: https://github.com/apache/hudi/pull/4654#discussion_r789373079



##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
##########
@@ -151,11 +151,12 @@ protected void preLoadIndexRecords() throws Exception {
    */
   private void waitForBootstrapReady(int taskID) {
     int taskNum = getRuntimeContext().getNumberOfParallelSubtasks();
+    int attemptNum = getRuntimeContext().getAttemptNumber();
     int readyTaskNum = 1;
     while (taskNum != readyTaskNum) {
       try {
-        readyTaskNum = aggregateManager.updateGlobalAggregate(BootstrapAggFunction.NAME, taskID, new BootstrapAggFunction());
-        LOG.info("Waiting for other bootstrap tasks to complete, taskId = {}.", taskID);
+        readyTaskNum = aggregateManager.updateGlobalAggregate(BootstrapAggFunction.NAME + "_" + attemptNum, taskID, new BootstrapAggFunction());
+        LOG.info("Waiting for other bootstrap tasks to complete, taskId = {}, attemptNum = {}.", taskID, attemptNum);

Review comment:
   OK. Once a Flink job with index.bootstrap=true fails (for example, when a
   TaskManager is lost) and restarts with the same GlobalAggregate name, it
   reuses the `accumulators` held in the JobMaster. As a result, the
   BootstrapOperator subtasks that run faster than the others send records
   downstream without waiting for all bootstrap tasks to finish.
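
   To make the failure mode concrete, here is a minimal, self-contained sketch. It does not use Flink: `FakeAggregateManager` is a hypothetical stand-in for the named accumulators kept in the JobMaster, and it assumes the aggregate tracks the set of task IDs that have reported and returns its size. The method names and the `"bootstrap"` base name are illustrative only.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class BootstrapAggregateSketch {

    /** Hypothetical stand-in for the JobMaster's named accumulators (not a Flink class). */
    static final class FakeAggregateManager {
        // Survives task restarts, like state held on the JobMaster side.
        private final Map<String, Set<Integer>> accumulators = new HashMap<>();

        /** Records taskId under the aggregate name and returns how many tasks have reported. */
        int updateGlobalAggregate(String name, int taskId) {
            Set<Integer> ready = accumulators.computeIfAbsent(name, k -> new HashSet<>());
            ready.add(taskId);
            return ready.size();
        }
    }

    /**
     * Simulates attempt 0 completing on all tasks, then a restart, and returns the
     * ready count that the first (fastest) task of attempt 1 observes.
     */
    static int readyCountAfterRestart(boolean suffixWithAttempt) {
        final int taskNum = 3;
        final String baseName = "bootstrap"; // assumed name, for illustration
        FakeAggregateManager manager = new FakeAggregateManager();

        // Attempt 0: every task reports that its bootstrap is done.
        String firstName = suffixWithAttempt ? baseName + "_" + 0 : baseName;
        for (int taskId = 0; taskId < taskNum; taskId++) {
            manager.updateGlobalAggregate(firstName, taskId);
        }

        // Attempt 1 (after a failure): only the fastest task has reported so far.
        String secondName = suffixWithAttempt ? baseName + "_" + 1 : baseName;
        return manager.updateGlobalAggregate(secondName, 0);
    }

    public static void main(String[] args) {
        // Same name: the stale accumulator already holds all 3 task IDs,
        // so the fast task sees taskNum == readyTaskNum and stops waiting.
        System.out.println(readyCountAfterRestart(false)); // prints 3

        // Name suffixed with the attempt number: a fresh accumulator,
        // so the fast task sees 1 and keeps waiting for the others.
        System.out.println(readyCountAfterRestart(true)); // prints 1
    }
}
```

   With the shared name, the restarted task leaves the wait loop immediately because the count already equals the parallelism. With the attempt number in the name, each attempt starts from an empty accumulator.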





