guozhangwang commented on code in PR #12337: URL: https://github.com/apache/kafka/pull/12337#discussion_r912542741
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ########## @@ -697,7 +693,7 @@ private Map<TopicPartition, Long> committedOffsetForChangelogs(final Map<TaskId, final Set<TopicPartition> partitions) { final Map<TopicPartition, Long> committedOffsets; try { - committedOffsets = fetchCommittedOffsets(partitions, mainConsumer); Review Comment: Hi @cadonna I spent some time trying out the idea, but it did not work very elegantly. The main reason is that we have both `committedOffsetForChangelogs` and `endOffsetForChangelogs` which may throw `ExecutionException` / `TimeoutException` and `InterruptedException`. And if we want to let the `InterruptedException` to rethrow, like you said we need to let the caller to handle that, but the current logic is that we would swallow and record the task idleness, so to be compatible we'd still need to do so, which means that we would end up doing similar things for the stream thread. So I suggest we do what you suggested after we completed moving the store changelog reader to the restore thread, and then change the semantics as "record task idleness for retriable execution and timeout exception, but re-throw for interrupted exception". WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org