C0urante commented on code in PR #13208:
URL: https://github.com/apache/kafka/pull/13208#discussion_r1106250177
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java:
##########
@@ -98,6 +104,24 @@ private boolean flushing() {
         return toFlush != null;
     }

+    public boolean waitForBeginFlush(Supplier<Long> timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {

Review Comment:
   > I don't think that this PR makes double commits possible where they weren't before.

   So the issue with double commits in non-EOS mode is that, right now, we may throw an exception because of the bug that we're addressing here. But if we fix that exception, then double commits become possible. And if the first commit takes a while, then we might end up lagging too much and performing our second commit after a new instance of the same source task has been brought up.

   > WDYT about adding the EOS-style cancellation semantics to the final commit, or closing the OffsetBackingStore in cancel() to address these cases? Do you think that we can explore those changes in a follow-up PR?

   I think adding the EOS-style cancellation semantics would be okay for now, though they aren't as effective for this kind of task since we don't have a way of fencing out producers. We can do that part in this PR, and then file a Jira ticket to improve the cancellation logic for source task offset commits, which we can explore at a later point.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
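The "EOS-style cancellation semantics" discussed above can be sketched as follows. This is a hypothetical, simplified illustration (the class and method names below are invented, not the actual Kafka Connect `OffsetStorageWriter` API): `cancel()` flips a flag so that a slow in-flight final commit is abandoned, or at least reported as unsafe, instead of completing late and racing with a freshly restarted instance of the same source task.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of cancellation semantics for a final offset commit.
// Without producer fencing (as the comment notes), this cannot fully rule
// out a double commit; it only narrows the window.
class CancellableOffsetCommitter {
    private final AtomicBoolean cancelled = new AtomicBoolean(false);

    /** Called when the task is abandoned; pending commits become no-ops. */
    void cancel() {
        cancelled.set(true);
    }

    /**
     * Attempts the final offset commit. Returns true if the commit ran and
     * no cancellation was observed; false if it was skipped or if a
     * cancellation raced with the (possibly slow) flush.
     */
    boolean commitOffsets(Runnable doFlush) {
        if (cancelled.get()) {
            return false; // task already cancelled: skip the commit entirely
        }
        doFlush.run();
        // Re-check after the flush: a cancellation during the flush means a
        // new task instance may already own these offsets, so the commit is
        // not reported as safely completed.
        return !cancelled.get();
    }
}
```

A second, stronger option mentioned in the comment is closing the underlying `OffsetBackingStore` in `cancel()`, which would make a late flush fail outright rather than merely be flagged.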