This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8142822633 KAFKA-14079 - Ack failed records in WorkerSourceTask when
error tolerance is ALL (#12415)
8142822633 is described below
commit 81428226332005c27870aacfccc813950c84386c
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Mon Jul 18 18:07:20 2022 -0400
KAFKA-14079 - Ack failed records in WorkerSourceTask when error tolerance
is ALL (#12415)
Make sure to ack all records whose produce request failed when a connector's
`errors.tolerance` config property is set to `all`. Acking is essential so that
the task continues to commit future record offsets properly and removes the
failed records from internal tracking, preventing a memory leak.
(cherry picked and slightly modified from commit
63e06aafd0cf37f8488c3830946051b3a30db2a0)
Reviewers: Chris Egerton <[email protected]>, Randall Hauch
<[email protected]>
---
.../connect/runtime/AbstractWorkerSourceTask.java | 5 +++++
.../kafka/connect/runtime/WorkerSourceTaskTest.java | 21 ++++++++++++++++++---
2 files changed, 23 insertions(+), 3 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
index d89f577688..407f5fd828 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
@@ -38,6 +38,7 @@ import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.Stage;
+import org.apache.kafka.connect.runtime.errors.ToleranceType;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
@@ -406,6 +407,10 @@ public abstract class AbstractWorkerSourceTask extends
WorkerTask {
}
log.trace("{} Failed record: {}",
AbstractWorkerSourceTask.this, preTransformRecord);
producerSendFailed(false, producerRecord,
preTransformRecord, e);
+ if
(retryWithToleranceOperator.getErrorToleranceType() == ToleranceType.ALL) {
+ counter.skipRecord();
+
submittedRecord.ifPresent(SubmittedRecords.SubmittedRecord::ack);
+ }
} else {
counter.completeRecord();
log.trace("{} Wrote record successfully: topic {}
partition {} offset {}",
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 5ce0e44f3e..2d2cd00cf5 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -696,24 +696,39 @@ public class WorkerSourceTaskTest extends ThreadedTest {
createWorkerTaskWithErrorToleration();
expectTopicCreation(TOPIC);
+ //Use different offsets for each record so we can verify all were
committed
+ final Map<String, Object> offset2 = Collections.singletonMap("key",
13);
+
// send two records
// record 1 will succeed
// record 2 will invoke the producer's failure callback, but ignore
the exception via retryOperator
// and no ConnectException will be thrown
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1,
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
- SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2,
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
-
+ SourceRecord record2 = new SourceRecord(PARTITION, offset2, TOPIC, 2,
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+ expectOffsetFlush(true);
expectSendRecordOnce();
expectSendRecordProducerCallbackFail();
sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class),
EasyMock.isNull());
- EasyMock.expectLastCall();
+
+ //As of KAFKA-14079 all offsets should be committed, even for failed
records (if ignored)
+ //Only the last offset will be passed to the method as everything up
to that point is committed
+ //Before KAFKA-14079 offset 12 would have been passed and not 13 as it
would have been unacked
+ offsetWriter.offset(PARTITION, offset2);
+ PowerMock.expectLastCall();
PowerMock.replayAll();
+ //Send records and then commit offsets and verify both were committed
and no exception
Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1,
record2));
Whitebox.invokeMethod(workerTask, "sendRecords");
+ Whitebox.invokeMethod(workerTask, "updateCommittableOffsets");
+ workerTask.commitOffsets();
PowerMock.verifyAll();
+
+ //Double check to make sure all submitted records were cleared
+ assertEquals(0, ((SubmittedRecords)
Whitebox.getInternalState(workerTask,
+ "submittedRecords")).records.size());
}
@Test