guozhangwang commented on code in PR #12337:
URL: https://github.com/apache/kafka/pull/12337#discussion_r912542741


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -697,7 +693,7 @@ private Map<TopicPartition, Long> 
committedOffsetForChangelogs(final Map<TaskId,
                                                                    final 
Set<TopicPartition> partitions) {
         final Map<TopicPartition, Long> committedOffsets;
         try {
-            committedOffsets = fetchCommittedOffsets(partitions, mainConsumer);

Review Comment:
   Hi @cadonna I spent some time trying out the idea, but it did not work very 
elegantly. The main reason is that we have both `committedOffsetForChangelogs` 
and `endOffsetForChangelogs` which may throw `ExecutionException` / 
`TimeoutException` and `InterruptedException`. And if we want to let the 
`InterruptedException` to rethrow, like you said we need to let the caller to 
handle that, but the current logic is that we would swallow and record the task 
idleness, so to be compatible we'd still need to do so, which means that we 
would end up doing similar things for the stream thread.
   
   So I suggest we do what you suggested after we completed moving the store 
changelog reader to the restore thread, and then change the semantics as 
"record task idleness for retriable execution and timeout exception, but 
re-throw for interrupted exception". WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to