jihoonson opened a new issue #7575: Checkpointing failure after taskDuration in Kafka/Kinesis indexing service
URL: https://github.com/apache/incubator-druid/issues/7575

### Affected Version

All versions since incremental handoff was introduced.

### Description

Checkpointing can be initiated by both the supervisor and tasks. A task can initiate checkpointing whenever it wants to publish segments. The supervisor initiates checkpointing when a task's run time has reached `taskDuration`. When the supervisor initiates checkpointing, the task changes its status to `publishing` and stops once it has published all segments.

The supervisor calls `checkTaskDuration()` to start checkpointing (https://github.com/apache/incubator-druid/blob/master/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java#L1912-L1969). Here is a code snippet:

```java
private void checkTaskDuration() throws ExecutionException, InterruptedException, TimeoutException
{
  final List<ListenableFuture<Map<PartitionIdType, SequenceOffsetType>>> futures = new ArrayList<>();
  ...
  for (Entry<Integer, TaskGroup> entry : activelyReadingTaskGroups.entrySet()) {
    ...
    if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
      log.info("Task group [%d] has run for [%s]", groupId, ioConfig.getTaskDuration());
      futureGroupIds.add(groupId);
      futures.add(checkpointTaskGroup(group, true));
    }
  }

  List<Map<PartitionIdType, SequenceOffsetType>> results = Futures.successfulAsList(futures)
                                                                  .get(futureTimeoutInSeconds, TimeUnit.SECONDS);
  for (int j = 0; j < results.size(); j++) {
    ...
    activelyReadingTaskGroups.remove(groupId);
  }
}
```

The issue is that `checkpointTaskGroup(group, true)` can be called more than once for the same taskGroup if `Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS)` fails with a timeout. If it times out, some futures might fail, but others might succeed. However, `activelyReadingTaskGroups` is updated only when that `get()` call returns successfully. As a result, when `checkTaskDuration()` is called in the next runNotice, it can start a duplicate checkpoint for any taskGroup that is still in `activelyReadingTaskGroups`. This fails all tasks in those taskGroups, because the previous checkpoint succeeded and the tasks are already in `publishing` status.
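The failure mode can be reproduced outside Druid with a small simulation. The sketch below is not Druid code: the class `DuplicateCheckpointSketch`, its methods, and the `removePerFuture` flag are hypothetical, and it uses the JDK's `CompletableFuture.allOf` in place of Guava's `Futures.successfulAsList`, assuming only that both combined futures stay incomplete until every input future is done. Group 0's checkpoint has already succeeded and group 1's never finishes, so the combined wait times out. The flag illustrates one possible mitigation (removing each group whose own future succeeded), which is not necessarily the fix adopted by the project.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DuplicateCheckpointSketch {

    // One simulated supervisor pass: request a checkpoint for every active group,
    // then wait for all of the requests together with a timeout.
    static void runPass(Set<Integer> activeGroups,
                        Map<Integer, CompletableFuture<String>> checkpointResults,
                        Map<Integer, Integer> checkpointCalls,
                        boolean removePerFuture) {
        List<Integer> groupIds = new ArrayList<>(activeGroups);
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (Integer groupId : groupIds) {
            checkpointCalls.merge(groupId, 1, Integer::sum); // count the checkpoint request
            futures.add(checkpointResults.get(groupId));
        }
        try {
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                             .get(50, TimeUnit.MILLISECONDS);
            // All-or-nothing update: only reached when the combined wait succeeds.
            activeGroups.removeAll(groupIds);
        } catch (TimeoutException e) {
            if (removePerFuture) {
                // Hypothetical mitigation: drop each group whose own future already succeeded.
                for (int i = 0; i < futures.size(); i++) {
                    CompletableFuture<String> f = futures.get(i);
                    if (f.isDone() && !f.isCompletedExceptionally()) {
                        activeGroups.remove(groupIds.get(i));
                    }
                }
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    // Runs two consecutive passes and returns how many checkpoint requests each group received.
    static Map<Integer, Integer> simulate(boolean removePerFuture) {
        Set<Integer> activeGroups = new TreeSet<>(Arrays.asList(0, 1));
        Map<Integer, CompletableFuture<String>> results = new TreeMap<>();
        results.put(0, CompletableFuture.completedFuture("checkpointed")); // already succeeded
        results.put(1, new CompletableFuture<>());                          // never completes
        Map<Integer, Integer> calls = new TreeMap<>();
        runPass(activeGroups, results, calls, removePerFuture);
        runPass(activeGroups, results, calls, removePerFuture);
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(simulate(false)); // prints {0=2, 1=2}: group 0 is checkpointed twice
        System.out.println(simulate(true));  // prints {0=1, 1=2}: group 0 is checkpointed once
    }
}
```

With the all-or-nothing update, group 0 receives a second checkpoint request even though its first one succeeded, which corresponds to the duplicate `checkpointTaskGroup(group, true)` call described above.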
