This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.4 by this push:
new 9b4397abeeb KAFKA-14799: Ignore source task requests to abort empty transactions (#13379)
9b4397abeeb is described below
commit 9b4397abeeb69c40f13b01cf999d3df1837ca5d2
Author: Chris Egerton <[email protected]>
AuthorDate: Tue Mar 14 15:10:29 2023 -0400
KAFKA-14799: Ignore source task requests to abort empty transactions (#13379)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/connect/source/TransactionContext.java | 10 ++
.../connect/runtime/AbstractWorkerSourceTask.java | 5 +-
.../runtime/ExactlyOnceWorkerSourceTask.java | 10 +-
.../runtime/ExactlyOnceWorkerSourceTaskTest.java | 144 ++++++++++++++++++++-
4 files changed, 160 insertions(+), 9 deletions(-)
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java b/connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java
index f90d75baf47..4c3c00528ac 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java
@@ -31,6 +31,11 @@ public interface TransactionContext {
/**
* Request a transaction commit after a source record is processed. The source record will be the
* last record in the committed transaction.
+ * <p>
+ * If a task requests that the last record in a batch that it returns from {@link SourceTask#poll()}
+ * be committed by invoking this method, and also requests that that same batch be aborted by
+ * invoking {@link #abortTransaction()}, the record-based operation (in this case, committing
+ * the transaction) will take precedence.
* @param record the record to commit the transaction after; may not be null.
*/
void commitTransaction(SourceRecord record);
@@ -50,6 +55,11 @@ public interface TransactionContext {
* and will not appear in a committed transaction. However, offsets for that transaction will still
* be committed so that the records in that transaction are not reprocessed. If the data should be
* reprocessed, the task should not invoke this method and should instead throw an exception.
+ * <p>
+ * If a task requests that the last record in a batch that it returns from {@link SourceTask#poll()}
+ * be aborted by invoking this method, and also requests that that same batch be committed by
+ * invoking {@link #commitTransaction()}, the record-based operation (in this case, aborting
+ * the transaction) will take precedence.
* @param record the record to abort the transaction after; may not be null.
*/
void abortTransaction(SourceRecord record);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
index 2021808286c..eff4d849326 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
@@ -358,9 +358,7 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
continue;
}
log.trace("{} About to send {} records to Kafka", this,
toSend.size());
- if (sendRecords()) {
- batchDispatched();
- } else {
+ if (!sendRecords()) {
stopRequestedLatch.await(SEND_FAILED_BACKOFF_MS,
TimeUnit.MILLISECONDS);
}
}
@@ -455,6 +453,7 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
recordDispatched(preTransformRecord);
}
toSend = null;
+ batchDispatched();
return true;
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
index 21f6bd4f59d..e0fc987a3da 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
@@ -470,7 +470,7 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
protected boolean shouldCommitTransactionForBatch(long
currentTimeMs) {
if (transactionContext.shouldAbortBatch()) {
log.info("Aborting transaction for batch as
requested by connector");
- abortTransaction();
+ maybeAbortTransaction();
// We abort the transaction, which causes all the
records up to this point to be dropped, but we still want to
// commit offsets so that the task doesn't see the
same records all over again
return true;
@@ -483,7 +483,7 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
if (transactionContext.shouldAbortOn(record)) {
log.info("Aborting transaction for record on topic
{} as requested by connector", record.topic());
log.trace("Last record in aborted transaction:
{}", record);
- abortTransaction();
+ maybeAbortTransaction();
// We abort the transaction, which causes all the
records up to this point to be dropped, but we still want to
// commit offsets so that the task doesn't see the
same records all over again
return true;
@@ -491,7 +491,11 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
return transactionContext.shouldCommitOn(record);
}
- private void abortTransaction() {
+ private void maybeAbortTransaction() {
+ if (!transactionOpen) {
+ log.warn("Ignoring request by task to abort
transaction as the current transaction is empty");
+ return;
+ }
producer.abortTransaction();
transactionMetrics.abortTransaction();
transactionOpen = false;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
index 617a644be1f..7882b862c84 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
@@ -16,7 +16,7 @@
*/
package org.apache.kafka.connect.runtime;
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
@@ -104,6 +104,7 @@ import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
@@ -143,7 +144,7 @@ public class ExactlyOnceWorkerSourceTaskTest {
@Mock private Converter valueConverter;
@Mock private HeaderConverter headerConverter;
@Mock private TransformationChain<SourceRecord> transformationChain;
- @Mock private KafkaProducer<byte[], byte[]> producer;
+ @Mock private Producer<byte[], byte[]> producer;
@Mock private TopicAdmin admin;
@Mock private CloseableOffsetStorageReader offsetReader;
@Mock private OffsetStorageWriter offsetWriter;
@@ -196,9 +197,13 @@ public class ExactlyOnceWorkerSourceTaskTest {
time = Time.SYSTEM;
when(offsetStore.primaryOffsetsTopic()).thenReturn("offsets-topic");
when(sourceTask.poll()).thenAnswer(invocation -> {
+ // Capture the records we'll return before decrementing the poll
latch,
+ // since test cases may mutate the poll records field immediately
+ // after awaiting the latch
+ List<SourceRecord> result = pollRecords.get();
pollLatch.get().countDown();
Thread.sleep(10);
- return pollRecords.get();
+ return result;
});
}
@@ -518,6 +523,9 @@ public class ExactlyOnceWorkerSourceTaskTest {
// Test that the task handles an empty list of records
createWorkerTask();
+ // Make sure the task returns empty batches from poll before we start
polling it
+ pollRecords.set(Collections.emptyList());
+
when(offsetWriter.beginFlush()).thenReturn(false);
startTaskThread();
@@ -669,6 +677,132 @@ public class ExactlyOnceWorkerSourceTaskTest {
assertTransactionMetrics(abort ? 0 : (3 * RECORDS.size()));
}
+ @Test
+ public void testConnectorAbortsEmptyTransaction() throws Exception {
+ Map<String, String> connectorProps =
sourceConnectorProps(SourceTask.TransactionBoundary.CONNECTOR);
+ sourceConfig = new SourceConnectorConfig(plugins, connectorProps,
enableTopicCreation);
+ createWorkerTask();
+
+ expectPossibleTopicCreation();
+ expectTaskGetTopic();
+ expectApplyTransformationChain();
+ expectConvertHeadersAndKeyValue();
+ TransactionContext transactionContext =
workerTask.sourceTaskContext.transactionContext();
+
+ startTaskThread();
+
+ // Ensure the task produces at least one non-empty batch
+ awaitPolls(1);
+ // Start returning empty batches from SourceTask::poll
+ awaitEmptyPolls(1);
+ // The task commits the active transaction, which should not be empty
and should
+ // result in a single call to Producer::commitTransaction
+ transactionContext.commitTransaction();
+
+ // Ensure the task produces at least one empty batch after the
transaction has been committed
+ awaitEmptyPolls(1);
+ // The task aborts the active transaction, which has no records in it
and should not trigger a call to
+ // Producer::abortTransaction
+ transactionContext.abortTransaction();
+ // Make sure we go through at least one more poll, to give the
framework a chance to handle the
+ // transaction abort request
+ awaitEmptyPolls(1);
+
+ awaitShutdown(true);
+
+ verify(producer, times(1)).beginTransaction();
+ verify(producer, times(1)).commitTransaction();
+ verify(producer, atLeast(RECORDS.size())).send(any(), any());
+ // We never abort transactions in this test!
+ verify(producer, never()).abortTransaction();
+
+ verifyPreflight();
+ verifyStartup();
+ verifyCleanShutdown();
+ verifyPossibleTopicCreation();
+ }
+
+ @Test
+ public void
testMixedConnectorTransactionBoundaryCommitLastRecordAbortBatch() throws
Exception {
+ // We fail tasks that try to abort and commit a transaction for the
same record or same batch
+ // But we don't fail if they try to commit the last record of a batch
and abort the entire batch
+ // Instead, we give precedence to the record-based operation
+
+ Map<String, String> connectorProps =
sourceConnectorProps(SourceTask.TransactionBoundary.CONNECTOR);
+ sourceConfig = new SourceConnectorConfig(plugins, connectorProps,
enableTopicCreation);
+ createWorkerTask();
+
+ expectPossibleTopicCreation();
+ expectTaskGetTopic();
+ expectApplyTransformationChain();
+ expectConvertHeadersAndKeyValue();
+ when(offsetWriter.beginFlush())
+ .thenReturn(true)
+ .thenReturn(false);
+
+ workerTask.initialize(TASK_CONFIG);
+
+ TransactionContext transactionContext =
workerTask.sourceTaskContext.transactionContext();
+
+ // Request that the batch be aborted
+ transactionContext.abortTransaction();
+ // Request that the last record in the batch be committed
+ transactionContext.commitTransaction(RECORDS.get(RECORDS.size() - 1));
+
+ workerTask.toSend = RECORDS;
+ assertTrue(workerTask.sendRecords());
+
+ verifyTransactions(2, 1);
+ verifySends(RECORDS.size());
+ // We never abort transactions in this test!
+ verify(producer, never()).abortTransaction();
+
+ verifyPossibleTopicCreation();
+ }
+
+ @Test
+ public void
testMixedConnectorTransactionBoundaryAbortLastRecordCommitBatch() throws
Exception {
+ // We fail tasks that try to abort and commit a transaction for the
same record or same batch
+ // But we don't fail if they try to abort the last record of a batch
and commit the entire batch
+ // Instead, we give precedence to the record-based operation
+
+ Map<String, String> connectorProps =
sourceConnectorProps(SourceTask.TransactionBoundary.CONNECTOR);
+ sourceConfig = new SourceConnectorConfig(plugins, connectorProps,
enableTopicCreation);
+ createWorkerTask();
+
+ expectPossibleTopicCreation();
+ expectTaskGetTopic();
+ expectApplyTransformationChain();
+ expectConvertHeadersAndKeyValue();
+ when(offsetWriter.beginFlush())
+ .thenReturn(true)
+ .thenReturn(false);
+
+ workerTask.initialize(TASK_CONFIG);
+
+ TransactionContext transactionContext =
workerTask.sourceTaskContext.transactionContext();
+
+ // Request that the last record in the batch be aborted
+ transactionContext.abortTransaction(RECORDS.get(RECORDS.size() - 1));
+ // Request that the batch be committed
+ transactionContext.commitTransaction();
+
+ workerTask.toSend = RECORDS;
+ assertTrue(workerTask.sendRecords());
+
+ verify(offsetWriter, times(2)).beginFlush();
+ verify(sourceTask, times(2)).commit();
+ // We open a transaction once for the aborted batch, and once to
commit offsets for that batch
+ verify(producer, times(2)).beginTransaction();
+ verify(producer, times(1)).commitTransaction();
+ verify(offsetWriter, times(1)).doFlush(any());
+ verify(producer, times(1)).abortTransaction();
+
+ verifySends(RECORDS.size());
+
+ verifyPossibleTopicCreation();
+ }
+
@Test
public void testCommitFlushSyncCallbackFailure() throws Exception {
Exception failure = new RecordTooLargeException();
@@ -768,12 +902,16 @@ public class ExactlyOnceWorkerSourceTaskTest {
assertFalse(workerTask.sendRecords());
assertEquals(Arrays.asList(record2, record3), workerTask.toSend);
verify(producer).beginTransaction();
+ // When using poll-based transaction boundaries, we do not commit
transactions while retrying delivery for a batch
+ verify(producer, never()).commitTransaction();
verifySends(2);
verifyPossibleTopicCreation();
// Next they all succeed
assertTrue(workerTask.sendRecords());
assertNull(workerTask.toSend);
+ // After the entire batch is successfully dispatched to the producer,
we can finally commit the transaction
+ verify(producer, times(1)).commitTransaction();
verifySends(4);
verify(offsetWriter).offset(PARTITION, offset(1));