C0urante commented on code in PR #12429:
URL: https://github.com/apache/kafka/pull/12429#discussion_r929381491


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##########
@@ -217,6 +217,9 @@ protected void finalOffsetCommit(boolean failed) {
         if (failed) {
             log.debug("Skipping final offset commit as task has failed");
             return;
+        } else if (isCancelled()) {
+            log.debug("Skipping final offset commit as task has been cancelled 
and its producer has already been closed");

Review Comment:
   That's fair; technically, it may not be closed if we hit this part right 
after `cancelled` is flipped to `true` but not before the producer is actually 
closed. I'd personally err on the side of skipping the offset commit if the 
task is cancelled since it's scary to see offset commit failure messages and 
the odds of an offset commit being viable once we know the task has been 
cancelled are low, but I'm not too attached to this (especially on a flaky test 
PR). What are your thoughts?
   
   FWIW, I've also updated the log message to remove the explanation of why 
we're skipping the commit; the detail about the producer being closed doesn't 
seem strictly necessary.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##########
@@ -768,27 +767,32 @@ public void testSeparateOffsetsTopic() throws Exception {
                     recordNum >= recordsProduced);
 
             // also consume from the connector's dedicated offsets topic; just 
need to read one offset record

Review Comment:
   Nope, thanks for the catch!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to