This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new ca674d9e17 KAFKA-14079 - Ack failed records in WorkerSourceTask when 
error tolerance is ALL (#12412)
ca674d9e17 is described below

commit ca674d9e17e249ddc422b54449f847332dc03e97
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Mon Jul 18 18:06:45 2022 -0400

    KAFKA-14079 - Ack failed records in WorkerSourceTask when error tolerance 
is ALL (#12412)
    
    Make sure to ack all records where produce failed, when a connector's 
`errors.tolerance` config property is set to `all`. Acking is essential so that 
the task will continue to commit future record offsets properly and remove the 
records from internal tracking, preventing a memory leak.
    
    Reviewers: Chris Egerton <[email protected]>, Randall Hauch 
<[email protected]>
---
 .../kafka/connect/runtime/WorkerSourceTask.java     |  4 ++++
 .../kafka/connect/runtime/WorkerSourceTaskTest.java | 21 ++++++++++++++++++---
 2 files changed, 22 insertions(+), 3 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 9ce2b8dbb8..a3d9b036c2 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -375,6 +375,10 @@ class WorkerSourceTask extends WorkerTask {
                                 // executeFailed here allows the use of 
existing logging infrastructure/configuration
                                 
retryWithToleranceOperator.executeFailed(Stage.KAFKA_PRODUCE, 
WorkerSourceTask.class,
                                         preTransformRecord, e);
+
+                                //Ack the record so it will be skipped and 
offsets are committed
+                                submittedRecord.ack();
+                                counter.skipRecord();
                                 commitTaskRecord(preTransformRecord, null);
                             } else {
                                 log.error("{} failed to send record to {}: ", 
WorkerSourceTask.this, topic, e);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 78db83c7ee..41df088c2a 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -835,25 +835,40 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         createWorkerTaskWithErrorToleration();
         expectTopicCreation(TOPIC);
 
+        //Use different offsets for each record so we can verify all were 
committed
+        final Map<String, Object> offset2 = Collections.singletonMap("key", 
13);
+
         // send two records
         // record 1 will succeed
         // record 2 will invoke the producer's failure callback, but ignore 
the exception via retryOperator
         // and no ConnectException will be thrown
         SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
-        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
-
+        SourceRecord record2 = new SourceRecord(PARTITION, offset2, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
+        expectOffsetFlush(true);
         expectSendRecordOnce();
         expectSendRecordProducerCallbackFail();
         sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class), 
EasyMock.isNull());
-        EasyMock.expectLastCall();
+
+        //As of KAFKA-14079 all offsets should be committed, even for failed 
records (if ignored)
+        //Only the last offset will be passed to the method as everything up 
to that point is committed
+        //Before KAFKA-14079 offset 12 would have been passed and not 13 as it 
would have been unacked
+        offsetWriter.offset(PARTITION, offset2);
+        PowerMock.expectLastCall();
 
         PowerMock.replayAll();
 
+        //Send records and then commit offsets and verify both were committed 
and no exception
         Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, 
record2));
         Whitebox.invokeMethod(workerTask, "sendRecords");
+        Whitebox.invokeMethod(workerTask, "updateCommittableOffsets");
+        workerTask.commitOffsets();
 
         PowerMock.verifyAll();
+
+        //Double check to make sure all submitted records were cleared
+        assertEquals(0, ((SubmittedRecords) 
Whitebox.getInternalState(workerTask,
+                "submittedRecords")).records.size());
     }
 
     @Test

Reply via email to