C0urante commented on code in PR #12920:
URL: https://github.com/apache/kafka/pull/12920#discussion_r1035151360


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##########
@@ -282,16 +282,18 @@ private void commitTransaction() {
 
         // Commit the transaction
         // Blocks until all outstanding records have been sent and ack'd
-        try {
-            producer.commitTransaction();
-        } catch (Throwable t) {
-            log.error("{} Failed to commit producer transaction", ExactlyOnceWorkerSourceTask.this, t);
-            flushError.compareAndSet(null, t);
+        Throwable error = flushError.get();
+        if (error == null) {
+            try {
+                producer.commitTransaction();
+            } catch (Throwable t) {
+                log.error("{} Failed to commit producer transaction", ExactlyOnceWorkerSourceTask.this, t);
+                flushError.compareAndSet(null, t);
+            }
         }
-
         transactionOpen = false;

Review Comment:
   Nit: can/should we move `transactionOpen = false` into the
`if (error == null)` block above, after the call to
`producer.commitTransaction()`? It shouldn't have much effect right now, but
that may change if we decide to add error tolerance for offset commits with
exactly-once support enabled.
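
To make the nit concrete, here is a minimal self-contained sketch of one reading of it, with the flag reset placed inside the guard, after the try/catch. `CommitSketch` and `TxProducer` are hypothetical stand-ins for illustration, not the actual Connect classes, and logging is omitted.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, stripped-down model of the commit path discussed above.
class CommitSketch {
    // Stand-in for the single producer method this sketch needs (assumption).
    interface TxProducer {
        void commitTransaction();
    }

    final AtomicReference<Throwable> flushError = new AtomicReference<>();
    boolean transactionOpen = true;
    private final TxProducer producer;

    CommitSketch(TxProducer producer) {
        this.producer = producer;
    }

    void commitTransaction() {
        Throwable error = flushError.get();
        if (error == null) {
            try {
                producer.commitTransaction();
            } catch (Throwable t) {
                // Record the first failure only, as in the diff above
                flushError.compareAndSet(null, t);
            }
            // Reset the flag only when a commit was actually attempted
            transactionOpen = false;
        }
    }
}
```

With this shape the flag stays set whenever the commit is skipped because of an earlier flush error. Whether the reset belongs inside the `try` instead, so that it is also skipped when the commit itself throws, is left open here.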



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##########
@@ -282,16 +282,18 @@ private void commitTransaction() {
 
         // Commit the transaction
         // Blocks until all outstanding records have been sent and ack'd

Review Comment:
   This comment should probably be moved down so that it sits directly above the call to `producer::commitTransaction`.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java:
##########
@@ -687,7 +687,26 @@ public void testCommitFlushCallbackFailure() throws Exception {
             callback.onCompletion(failure, null);
             return null;
         });
-        testCommitFailure(failure);
+        testCommitFailure(failure, false);
+    }
+
+    @Test
+    public void testCommitFlushAsyncCallbackFailure() throws Exception {

Review Comment:
   Should we rename `testCommitFlushCallbackFailure` to 
`testCommitFlushSyncCallbackFailure` to match this?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java:
##########
@@ -687,7 +687,26 @@ public void testCommitFlushCallbackFailure() throws Exception {
             callback.onCompletion(failure, null);
             return null;
         });
-        testCommitFailure(failure);
+        testCommitFailure(failure, false);
+    }
+
+    @Test
+    public void testCommitFlushAsyncCallbackFailure() throws Exception {
+        Exception failure = new RecordTooLargeException();
+        when(offsetWriter.willFlush()).thenReturn(true);
+        when(offsetWriter.beginFlush()).thenReturn(true);
+        // doFlush delegates it's callback to the producer,
+        // which delays completing the callback until commitTransaction
+        AtomicReference<Callback<Void>> callback = new AtomicReference<>();
+        when(offsetWriter.doFlush(any())).thenAnswer(invocation -> {
+            callback.set(invocation.getArgument(0));
+            return null;
+        });
+        doAnswer(invocation -> {
+            callback.get().onCompletion(failure, null);
+            throw failure;

Review Comment:
   We already have test coverage for the case where 
`producer::commitTransaction` throws an exception. Just to be sure that we're 
testing the offset flush callback instead of the try/catch around the 
transaction commit, we shouldn't throw anything from here.
   
   ```suggestion
               return null;
   ```
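
As a rough illustration of why the stub should not throw, the sketch below models the two error paths without Mockito. `AsyncFlushFailureSketch`, `TxProducer`, and the `caughtFromCommit` flag are hypothetical names introduced here only to make the paths observable; they are not part of the PR.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical model: an error can reach flushError either through the
// offset flush callback or through the catch block around the commit.
class AsyncFlushFailureSketch {
    // Stand-in for the single producer method this sketch needs (assumption).
    interface TxProducer {
        void commitTransaction();
    }

    final AtomicReference<Throwable> flushError = new AtomicReference<>();
    // Set only when the commit call itself throws
    boolean caughtFromCommit = false;

    // Callback handed to the flush; completed later, during the commit
    Consumer<Throwable> flushCallback() {
        return error -> {
            if (error != null) {
                flushError.compareAndSet(null, error);
            }
        };
    }

    void commit(TxProducer producer) {
        try {
            producer.commitTransaction();
        } catch (Throwable t) {
            caughtFromCommit = true;
            flushError.compareAndSet(null, t);
        }
    }
}
```

If the stubbed commit completes the callback with the failure and then returns normally, only the callback path can have recorded the error. If it also throws, the catch block runs as well, and the test can no longer tell which path it exercised.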



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java:
##########
@@ -687,7 +687,26 @@ public void testCommitFlushCallbackFailure() throws Exception {
             callback.onCompletion(failure, null);
             return null;
         });
-        testCommitFailure(failure);
+        testCommitFailure(failure, false);
+    }
+
+    @Test
+    public void testCommitFlushAsyncCallbackFailure() throws Exception {
+        Exception failure = new RecordTooLargeException();
+        when(offsetWriter.willFlush()).thenReturn(true);
+        when(offsetWriter.beginFlush()).thenReturn(true);
+        // doFlush delegates it's callback to the producer,

Review Comment:
   Nit:
   ```suggestion
           // doFlush delegates its callback to the producer,
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##########
@@ -282,16 +282,18 @@ private void commitTransaction() {
 
         // Commit the transaction
         // Blocks until all outstanding records have been sent and ack'd
-        try {
-            producer.commitTransaction();
-        } catch (Throwable t) {
-            log.error("{} Failed to commit producer transaction", ExactlyOnceWorkerSourceTask.this, t);
-            flushError.compareAndSet(null, t);
+        Throwable error = flushError.get();

Review Comment:
   ```suggestion
           Throwable error = flushError.get();
           // Only commit the transaction if we were able to serialize the offsets; otherwise, we may commit source records without committing their offsets
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
