This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new cf8c608028a KAFKA-14799: Ignore source task requests to abort empty transactions (#13379)
cf8c608028a is described below
commit cf8c608028aa415d7b4868cc2ce224765310eb86
Author: Chris Egerton <[email protected]>
AuthorDate: Tue Mar 14 15:10:29 2023 -0400
KAFKA-14799: Ignore source task requests to abort empty transactions (#13379)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/connect/source/TransactionContext.java | 10 +++++++++
.../connect/runtime/AbstractWorkerSourceTask.java | 5 ++---
.../runtime/ExactlyOnceWorkerSourceTask.java | 10 ++++++---
.../runtime/ExactlyOnceWorkerSourceTaskTest.java | 24 ++++++++++------------
4 files changed, 30 insertions(+), 19 deletions(-)
diff --git
a/connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java
b/connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java
index f90d75baf47..4c3c00528ac 100644
---
a/connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java
+++
b/connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java
@@ -31,6 +31,11 @@ public interface TransactionContext {
/**
* Request a transaction commit after a source record is processed. The
source record will be the
* last record in the committed transaction.
+ * <p>
+ * If a task requests that the last record in a batch that it returns from
{@link SourceTask#poll()}
+ * be committed by invoking this method, and also requests that that same
batch be aborted by
+ * invoking {@link #abortTransaction()}, the record-based operation (in
this case, committing
+ * the transaction) will take precedence.
* @param record the record to commit the transaction after; may not be
null.
*/
void commitTransaction(SourceRecord record);
@@ -50,6 +55,11 @@ public interface TransactionContext {
* and will not appear in a committed transaction. However, offsets for
that transaction will still
* be committed so that the records in that transaction are not
reprocessed. If the data should be
* reprocessed, the task should not invoke this method and should instead
throw an exception.
+ * <p>
+ * If a task requests that the last record in a batch that it returns from
{@link SourceTask#poll()}
+ * be aborted by invoking this method, and also requests that that same
batch be committed by
+ * invoking {@link #commitTransaction()}, the record-based operation (in
this case, aborting
+ * the transaction) will take precedence.
* @param record the record to abort the transaction after; may not be
null.
*/
void abortTransaction(SourceRecord record);
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
index 51836a2d6a3..09d8d739eed 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
@@ -354,9 +354,7 @@ public abstract class AbstractWorkerSourceTask extends
WorkerTask {
continue;
}
log.trace("{} About to send {} records to Kafka", this,
toSend.size());
- if (sendRecords()) {
- batchDispatched();
- } else {
+ if (!sendRecords()) {
stopRequestedLatch.await(SEND_FAILED_BACKOFF_MS,
TimeUnit.MILLISECONDS);
}
}
@@ -451,6 +449,7 @@ public abstract class AbstractWorkerSourceTask extends
WorkerTask {
recordDispatched(preTransformRecord);
}
toSend = null;
+ batchDispatched();
return true;
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
index 972249d3f86..58efaadc47c 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
@@ -468,7 +468,7 @@ class ExactlyOnceWorkerSourceTask extends
AbstractWorkerSourceTask {
protected boolean shouldCommitTransactionForBatch(long
currentTimeMs) {
if (transactionContext.shouldAbortBatch()) {
log.info("Aborting transaction for batch as
requested by connector");
- abortTransaction();
+ maybeAbortTransaction();
// We abort the transaction, which causes all the
records up to this point to be dropped, but we still want to
// commit offsets so that the task doesn't see the
same records all over again
return true;
@@ -481,7 +481,7 @@ class ExactlyOnceWorkerSourceTask extends
AbstractWorkerSourceTask {
if (transactionContext.shouldAbortOn(record)) {
log.info("Aborting transaction for record on topic
{} as requested by connector", record.topic());
log.trace("Last record in aborted transaction:
{}", record);
- abortTransaction();
+ maybeAbortTransaction();
// We abort the transaction, which causes all the
records up to this point to be dropped, but we still want to
// commit offsets so that the task doesn't see the
same records all over again
return true;
@@ -489,7 +489,11 @@ class ExactlyOnceWorkerSourceTask extends
AbstractWorkerSourceTask {
return transactionContext.shouldCommitOn(record);
}
- private void abortTransaction() {
+ private void maybeAbortTransaction() {
+ if (!transactionOpen) {
+ log.warn("Ignoring request by task to abort
transaction as the current transaction is empty");
+ return;
+ }
producer.abortTransaction();
transactionMetrics.abortTransaction();
transactionOpen = false;
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
index 23feadc25e5..35499a8c55c 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
@@ -17,8 +17,8 @@
package org.apache.kafka.connect.runtime;
import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
@@ -138,7 +138,7 @@ public class ExactlyOnceWorkerSourceTaskTest extends
ThreadedTest {
@Mock private Converter valueConverter;
@Mock private HeaderConverter headerConverter;
@Mock private TransformationChain<SourceRecord> transformationChain;
- @Mock private KafkaProducer<byte[], byte[]> producer;
+ @Mock private Producer<byte[], byte[]> producer;
@Mock private TopicAdmin admin;
@Mock private CloseableOffsetStorageReader offsetReader;
@Mock private OffsetStorageWriter offsetWriter;
@@ -885,6 +885,7 @@ public class ExactlyOnceWorkerSourceTaskTest extends
ThreadedTest {
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, "topic", 1,
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, "topic", 2,
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, "topic", 3,
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+ List<SourceRecord> records = Arrays.asList(record1, record2, record3);
expectTopicCreation(TOPIC);
@@ -898,10 +899,13 @@ public class ExactlyOnceWorkerSourceTaskTest extends
ThreadedTest {
expectSendRecordOnce(true);
expectSendRecordOnce(false);
+ // We commit the transaction after the batch has been successfully
dispatched to the producer
+ expectFlush(false, new AtomicInteger(), records.size());
+
PowerMock.replayAll();
// Try to send 3, make first pass, second fail. Should save last two
- workerTask.toSend = Arrays.asList(record1, record2, record3);
+ workerTask.toSend = records;
workerTask.sendRecords();
assertEquals(Arrays.asList(record2, record3), workerTask.toSend);
@@ -1187,20 +1191,15 @@ public class ExactlyOnceWorkerSourceTaskTest extends
ThreadedTest {
return false;
}
- private enum FlushOutcome {
- SUCCEED,
- SUCCEED_ANY_TIMES,
- FAIL_FLUSH_CALLBACK,
- FAIL_TRANSACTION_COMMIT
+ private void expectFlush(boolean anyTimes, AtomicInteger flushCount) {
+ expectFlush(anyTimes, flushCount, RECORDS.size());
}
- private CountDownLatch expectFlush(boolean anyTimes, AtomicInteger
flushCount) {
- CountDownLatch result = new CountDownLatch(1);
+ private void expectFlush(boolean anyTimes, AtomicInteger flushCount, int
batchSize) {
org.easymock.IExpectationSetters<Boolean> flushBegin = EasyMock
.expect(offsetWriter.beginFlush())
.andAnswer(() -> {
flushCount.incrementAndGet();
- result.countDown();
return true;
});
if (anyTimes) {
@@ -1220,10 +1219,9 @@ public class ExactlyOnceWorkerSourceTaskTest extends
ThreadedTest {
// The worker task doesn't actually use the returned future
offsetFlush.andReturn(null);
expectCall(producer::commitTransaction);
- expectCall(() -> sourceTask.commitRecord(EasyMock.anyObject(),
EasyMock.anyObject()));
+ expectCall(() -> sourceTask.commitRecord(EasyMock.anyObject(),
EasyMock.anyObject())).times(batchSize);
expectCall(sourceTask::commit);
}
- return result;
}
private void assertTransactionMetrics(int minimumMaxSizeExpected) {