jihoonson opened a new issue #7575: Checkpointing failure after taskDuration in Kafka/Kinesis indexing service
URL: https://github.com/apache/incubator-druid/issues/7575
 
 
   ### Affected Version
   
   All versions since incremental handoff was introduced
   
   ### Description
   
   Checkpointing can be initiated by both the supervisor and tasks. A task can initiate checkpointing whenever it wants to publish segments. The supervisor initiates checkpointing when a task's run time has reached `taskDuration`. When the supervisor initiates checkpointing, the task changes its status to `publishing` and stops once it has published all of its segments.
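   The two checkpoint paths can be illustrated with a minimal, self-contained model. It is a sketch under stated assumptions, not Druid's task runner: the names `TaskCheckpointModel`, `selfCheckpoint` and `supervisorCheckpoint` are hypothetical, only two statuses are modeled, and it assumes a task keeps reading after a task-initiated checkpoint and rejects any checkpoint request once it is publishing.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class TaskCheckpointModel
   {
     // Hypothetical statuses; only the two that matter for this issue are modeled.
     enum Status { READING, PUBLISHING }

     static class Task
     {
       Status status = Status.READING;
       final List<String> checkpoints = new ArrayList<>();

       // Task-initiated checkpoint (e.g. to publish segments); the task keeps reading.
       void selfCheckpoint()
       {
         requireReading();
         checkpoints.add("task-initiated");
       }

       // Supervisor-initiated checkpoint after taskDuration: the task stops reading
       // and switches to PUBLISHING until all of its segments are published.
       void supervisorCheckpoint()
       {
         requireReading();
         checkpoints.add("supervisor-initiated");
         status = Status.PUBLISHING;
       }

       // Assumption: a checkpoint request is only valid while the task is reading.
       private void requireReading()
       {
         if (status != Status.READING) {
           throw new IllegalStateException("cannot checkpoint a task in status " + status);
         }
       }
     }

     public static void main(String[] args)
     {
       Task task = new Task();
       task.selfCheckpoint();        // still READING
       task.supervisorCheckpoint();  // now PUBLISHING
       System.out.println(task.status + " " + task.checkpoints);
       try {
         task.supervisorCheckpoint(); // a second supervisor checkpoint is rejected
       }
       catch (IllegalStateException e) {
         System.out.println("rejected: " + e.getMessage());
       }
     }
   }
   ```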
   
   The supervisor calls `checkTaskDuration()` to start checkpointing (https://github.com/apache/incubator-druid/blob/master/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java#L1912-L1969).
   
   Here is the relevant code snippet:
   
   ```java
     private void checkTaskDuration() throws ExecutionException, InterruptedException, TimeoutException
     {
       final List<ListenableFuture<Map<PartitionIdType, SequenceOffsetType>>> futures = new ArrayList<>();
   ...
       for (Entry<Integer, TaskGroup> entry : activelyReadingTaskGroups.entrySet()) {
   ...
         if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
           log.info("Task group [%d] has run for [%s]", groupId, ioConfig.getTaskDuration());
           futureGroupIds.add(groupId);
           // Ask the tasks in this group to checkpoint and move to publishing.
           futures.add(checkpointTaskGroup(group, true));
         }
       }

       // If this get() times out, a TimeoutException is thrown and the loop below,
       // which removes groups from activelyReadingTaskGroups, never runs.
       List<Map<PartitionIdType, SequenceOffsetType>> results =
           Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
       for (int j = 0; j < results.size(); j++) {
   ...
         activelyReadingTaskGroups.remove(groupId);
       }
     }
   ```
   
   The issue is that `checkpointTaskGroup(group, true)` can be called more than once for the same task group if `Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS)` fails with a timeout. When it times out, some futures may have failed or still be pending while others have already succeeded. However, `activelyReadingTaskGroups` is updated only when `get()` returns successfully. As a result, when `checkTaskDuration()` is called in the next `runNotice`, it can start a duplicate checkpoint for task groups that are still in `activelyReadingTaskGroups`. This fails all tasks in those groups, because the previous checkpoint already succeeded and the tasks are now in `publishing` status.
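   The failure sequence can be reproduced with a small simulation. This is a hypothetical sketch, not Druid code: it swaps Guava's `Futures.successfulAsList` for `CompletableFuture.allOf` (the two differ when a future fails, but both time out while a future is still pending), uses one task per group, assumes every group has already exceeded `taskDuration`, and models a duplicate checkpoint on a publishing group as marking it `FAILED`. `DuplicateCheckpointDemo`, `Group`, `checkpoint()` and the 100 ms timeout are illustrative assumptions.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;

   public class DuplicateCheckpointDemo
   {
     enum Status { READING, PUBLISHING, FAILED }

     // Hypothetical stand-in for a task group (one task per group for brevity).
     static class Group
     {
       Status status = Status.READING;
       // A slow group never acknowledges its checkpoint within the timeout.
       final boolean slow;

       Group(boolean slow)
       {
         this.slow = slow;
       }

       CompletableFuture<Void> checkpoint()
       {
         if (status != Status.READING) {
           // Assumption: checkpointing a group that is already publishing fails it.
           Status previous = status;
           status = Status.FAILED;
           CompletableFuture<Void> failed = new CompletableFuture<>();
           failed.completeExceptionally(new IllegalStateException("duplicate checkpoint while " + previous));
           return failed;
         }
         if (slow) {
           // Not acknowledged yet: the future stays pending.
           return new CompletableFuture<>();
         }
         status = Status.PUBLISHING;
         return CompletableFuture.completedFuture(null);
       }
     }

     final Map<Integer, Group> activelyReadingTaskGroups = new HashMap<>();

     void checkTaskDuration() throws InterruptedException, ExecutionException, TimeoutException
     {
       List<Integer> futureGroupIds = new ArrayList<>();
       List<CompletableFuture<Void>> futures = new ArrayList<>();
       for (Map.Entry<Integer, Group> entry : activelyReadingTaskGroups.entrySet()) {
         // Assumption: every group has already run past taskDuration.
         futureGroupIds.add(entry.getKey());
         futures.add(entry.getValue().checkpoint());
       }
       // Throws TimeoutException if any checkpoint is still pending,
       // in which case the cleanup loop below never runs.
       CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(100, TimeUnit.MILLISECONDS);
       for (Integer groupId : futureGroupIds) {
         activelyReadingTaskGroups.remove(groupId);
       }
     }

     public static void main(String[] args) throws Exception
     {
       DuplicateCheckpointDemo supervisor = new DuplicateCheckpointDemo();
       Group fast = new Group(false);
       Group slow = new Group(true);
       supervisor.activelyReadingTaskGroups.put(1, fast);
       supervisor.activelyReadingTaskGroups.put(2, slow);

       // Two consecutive runNotices.
       for (int run = 1; run <= 2; run++) {
         try {
           supervisor.checkTaskDuration();
         }
         catch (TimeoutException e) {
           System.out.println("run " + run + ": timed out, fast group is " + fast.status);
         }
       }
     }
   }
   ```

   In the first run the fast group switches to `PUBLISHING`, but the timeout skips the cleanup loop, so it stays in `activelyReadingTaskGroups`. The second run checkpoints it again and it ends up `FAILED`.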
