gharris1727 commented on code in PR #13208:
URL: https://github.com/apache/kafka/pull/13208#discussion_r1103447933


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java:
##########
@@ -98,6 +104,24 @@ private boolean flushing() {
         return toFlush != null;
     }
 
+    public boolean waitForBeginFlush(Supplier<Long> timeout, TimeUnit 
timeUnit) throws InterruptedException, TimeoutException {

Review Comment:
   > Given that, do we really need a separate method here, or can we relax the 
constraints in beginFlush to wait for in-progress flushes to conclude instead 
of throwing an exception if there are any?
   
   The reason I implemented it as two separate methods was to minimize the 
disturbance to other call-sites, specifically the ExactlyOnceSourceTask. 
Because this introduces a new way to fail due to a timeout, and the 
ExactlyOnceSourceTask doesn't respect the flush timeout, I thought that it 
might not be a desirable change where the assertion does no harm.
   
   Upon reflection, I think diverging the usage patterns of the 
OffsetStorageWriter in these two different contexts is more error-prone than 
migrating both of them to the new semantics, so I'll update the PR to 
incorporate your feedback.
   
   > Would a Semaphore or CountDownLatch be more suited?
   
   Thanks, that makes a lot more sense. I was originally trying to re-use the 
doFlush future, but since `flushing()` starts in `beginFlush`, i needed a 
synchronizer that i created in beginFlush.
   
   > Finally--since this change may lead to us performing double offset commits 
when a task is being shut down, do you think it might also make sense to add a 
close method to the offset writer that throws an exception for any further 
attempts to flush, and possibly forcibly terminates any in-progress flushes? We 
can invoke that in AbstractWorkerTask::cancel (or possibly 
WorkerSourceTask::cancel if a different approach is necessary to preserve 
exactly-once semantics) to help tasks complete shutdown within the timeout 
allotted to them.
   
   In EOS mode, if the task is cancelled before it comes time to perform the 
final offset commit, the final offset commit is skipped. This does not change 
in this PR, because the ExactlyOnceSourceTask should never leave a flush open 
and have to wait for a previous flush to finish.
   
   In non-EOS mode, the check of the cancelled flag isn't present. It appears 
that there are already some wait conditions (in 
WorkerSourceTask::finalOffsetCommit) to maximize the number of records included 
in the flush, and that wait condition may cause the thread to commit offsets 
after cancellation. I don't think that this PR makes double commits possible 
where they weren't before.
   
   WDYT about adding the EOS-style cancellation semantics to the final commit, 
or closing the OffsetBackingStore in cancel() to address these cases? Do you 
think that we can explore those changes in a follow-up PR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to