This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
     new 9b4397abeeb KAFKA-14799: Ignore source task requests to abort empty transactions (#13379)
9b4397abeeb is described below

commit 9b4397abeeb69c40f13b01cf999d3df1837ca5d2
Author: Chris Egerton <[email protected]>
AuthorDate: Tue Mar 14 15:10:29 2023 -0400

    KAFKA-14799: Ignore source task requests to abort empty transactions (#13379)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/connect/source/TransactionContext.java   |  10 ++
 .../connect/runtime/AbstractWorkerSourceTask.java  |   5 +-
 .../runtime/ExactlyOnceWorkerSourceTask.java       |  10 +-
 .../runtime/ExactlyOnceWorkerSourceTaskTest.java   | 144 ++++++++++++++++++++-
 4 files changed, 160 insertions(+), 9 deletions(-)

diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java b/connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java
index f90d75baf47..4c3c00528ac 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java
@@ -31,6 +31,11 @@ public interface TransactionContext {
     /**
      * Request a transaction commit after a source record is processed. The source record will be the
      * last record in the committed transaction.
+     * <p>
+     * If a task requests that the last record in a batch that it returns from {@link SourceTask#poll()}
+     * be committed by invoking this method, and also requests that that same batch be aborted by
+     * invoking {@link #abortTransaction()}, the record-based operation (in this case, committing
+     * the transaction) will take precedence.
      * @param record the record to commit the transaction after; may not be null.
      */
     void commitTransaction(SourceRecord record);
@@ -50,6 +55,11 @@ public interface TransactionContext {
      * and will not appear in a committed transaction. However, offsets for that transaction will still
      * be committed so that the records in that transaction are not reprocessed. If the data should be
      * reprocessed, the task should not invoke this method and should instead throw an exception.
+     * <p>
+     * If a task requests that the last record in a batch that it returns from {@link SourceTask#poll()}
+     * be aborted by invoking this method, and also requests that that same batch be committed by
+     * invoking {@link #commitTransaction()}, the record-based operation (in this case, aborting
+     * the transaction) will take precedence.
      * @param record the record to abort the transaction after; may not be null.
      */
     void abortTransaction(SourceRecord record);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
index 2021808286c..eff4d849326 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
@@ -358,9 +358,7 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
                     continue;
                 }
                 log.trace("{} About to send {} records to Kafka", this, toSend.size());
-                if (sendRecords()) {
-                    batchDispatched();
-                } else {
+                if (!sendRecords()) {
                     stopRequestedLatch.await(SEND_FAILED_BACKOFF_MS, TimeUnit.MILLISECONDS);
                 }
             }
@@ -455,6 +453,7 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
             recordDispatched(preTransformRecord);
         }
         toSend = null;
+        batchDispatched();
         return true;
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
index 21f6bd4f59d..e0fc987a3da 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
@@ -470,7 +470,7 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
                     protected boolean shouldCommitTransactionForBatch(long currentTimeMs) {
                         if (transactionContext.shouldAbortBatch()) {
                             log.info("Aborting transaction for batch as requested by connector");
-                            abortTransaction();
+                            maybeAbortTransaction();
                             // We abort the transaction, which causes all the records up to this point to be dropped, but we still want to
                             // commit offsets so that the task doesn't see the same records all over again
                             return true;
@@ -483,7 +483,7 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
                         if (transactionContext.shouldAbortOn(record)) {
                             log.info("Aborting transaction for record on topic {} as requested by connector", record.topic());
                             log.trace("Last record in aborted transaction: {}", record);
-                            abortTransaction();
+                            maybeAbortTransaction();
                             // We abort the transaction, which causes all the records up to this point to be dropped, but we still want to
                             // commit offsets so that the task doesn't see the same records all over again
                             return true;
@@ -491,7 +491,11 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
                         return transactionContext.shouldCommitOn(record);
                     }
 
-                    private void abortTransaction() {
+                    private void maybeAbortTransaction() {
+                        if (!transactionOpen) {
+                            log.warn("Ignoring request by task to abort transaction as the current transaction is empty");
+                            return;
+                        }
                         producer.abortTransaction();
                         transactionMetrics.abortTransaction();
                         transactionOpen = false;
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
index 617a644be1f..7882b862c84 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.connect.runtime;
 
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
@@ -104,6 +104,7 @@ import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.never;
@@ -143,7 +144,7 @@ public class ExactlyOnceWorkerSourceTaskTest {
     @Mock private Converter valueConverter;
     @Mock private HeaderConverter headerConverter;
     @Mock private TransformationChain<SourceRecord> transformationChain;
-    @Mock private KafkaProducer<byte[], byte[]> producer;
+    @Mock private Producer<byte[], byte[]> producer;
     @Mock private TopicAdmin admin;
     @Mock private CloseableOffsetStorageReader offsetReader;
     @Mock private OffsetStorageWriter offsetWriter;
@@ -196,9 +197,13 @@ public class ExactlyOnceWorkerSourceTaskTest {
         time = Time.SYSTEM;
         when(offsetStore.primaryOffsetsTopic()).thenReturn("offsets-topic");
         when(sourceTask.poll()).thenAnswer(invocation -> {
+            // Capture the records we'll return before decrementing the poll latch,
+            // since test cases may mutate the poll records field immediately
+            // after awaiting the latch
+            List<SourceRecord> result = pollRecords.get();
             pollLatch.get().countDown();
             Thread.sleep(10);
-            return pollRecords.get();
+            return result;
         });
     }
 
@@ -518,6 +523,9 @@ public class ExactlyOnceWorkerSourceTaskTest {
         // Test that the task handles an empty list of records
         createWorkerTask();
 
+        // Make sure the task returns empty batches from poll before we start polling it
+        pollRecords.set(Collections.emptyList());
+
         when(offsetWriter.beginFlush()).thenReturn(false);
 
         startTaskThread();
@@ -669,6 +677,132 @@ public class ExactlyOnceWorkerSourceTaskTest {
         assertTransactionMetrics(abort ? 0 : (3 * RECORDS.size()));
     }
 
+    @Test
+    public void testConnectorAbortsEmptyTransaction() throws Exception {
+        Map<String, String> connectorProps = sourceConnectorProps(SourceTask.TransactionBoundary.CONNECTOR);
+        sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation);
+        createWorkerTask();
+
+        expectPossibleTopicCreation();
+        expectTaskGetTopic();
+        expectApplyTransformationChain();
+        expectConvertHeadersAndKeyValue();
+        TransactionContext transactionContext = workerTask.sourceTaskContext.transactionContext();
+
+        startTaskThread();
+
+        // Ensure the task produces at least one non-empty batch
+        awaitPolls(1);
+        // Start returning empty batches from SourceTask::poll
+        awaitEmptyPolls(1);
+        // The task commits the active transaction, which should not be empty and should
+        // result in a single call to Producer::commitTransaction
+        transactionContext.commitTransaction();
+
+        // Ensure the task produces at least one empty batch after the transaction has been committed
+        awaitEmptyPolls(1);
+        // The task aborts the active transaction, which has no records in it and should not trigger a call to
+        // Producer::abortTransaction
+        transactionContext.abortTransaction();
+        // Make sure we go through at least one more poll, to give the framework a chance to handle the
+        // transaction abort request
+        awaitEmptyPolls(1);
+
+        awaitShutdown(true);
+
+        verify(producer, times(1)).beginTransaction();
+        verify(producer, times(1)).commitTransaction();
+        verify(producer, atLeast(RECORDS.size())).send(any(), any());
+        // We never abort transactions in this test!
+        verify(producer, never()).abortTransaction();
+
+        verifyPreflight();
+        verifyStartup();
+        verifyCleanShutdown();
+        verifyPossibleTopicCreation();
+    }
+
+    @Test
+    public void testMixedConnectorTransactionBoundaryCommitLastRecordAbortBatch() throws Exception {
+        // We fail tasks that try to abort and commit a transaction for the same record or same batch
+        // But we don't fail if they try to commit the last record of a batch and abort the entire batch
+        // Instead, we give precedence to the record-based operation
+
+        Map<String, String> connectorProps = sourceConnectorProps(SourceTask.TransactionBoundary.CONNECTOR);
+        sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation);
+        createWorkerTask();
+
+        expectPossibleTopicCreation();
+        expectTaskGetTopic();
+        expectApplyTransformationChain();
+        expectConvertHeadersAndKeyValue();
+        when(offsetWriter.beginFlush())
+                .thenReturn(true)
+                .thenReturn(false);
+
+        workerTask.initialize(TASK_CONFIG);
+
+        TransactionContext transactionContext = workerTask.sourceTaskContext.transactionContext();
+
+        // Request that the batch be aborted
+        transactionContext.abortTransaction();
+        // Request that the last record in the batch be committed
+        transactionContext.commitTransaction(RECORDS.get(RECORDS.size() - 1));
+
+        workerTask.toSend = RECORDS;
+        assertTrue(workerTask.sendRecords());
+
+        verifyTransactions(2, 1);
+        verifySends(RECORDS.size());
+        // We never abort transactions in this test!
+        verify(producer, never()).abortTransaction();
+
+        verifyPossibleTopicCreation();
+    }
+
+    @Test
+    public void testMixedConnectorTransactionBoundaryAbortLastRecordCommitBatch() throws Exception {
+        // We fail tasks that try to abort and commit a transaction for the same record or same batch
+        // But we don't fail if they try to abort the last record of a batch and commit the entire batch
+        // Instead, we give precedence to the record-based operation
+
+        Map<String, String> connectorProps = 
sourceConnectorProps(SourceTask.TransactionBoundary.CONNECTOR);
+        sourceConfig = new SourceConnectorConfig(plugins, connectorProps, 
enableTopicCreation);
+        createWorkerTask();
+
+        expectPossibleTopicCreation();
+        expectTaskGetTopic();
+        expectApplyTransformationChain();
+        expectConvertHeadersAndKeyValue();
+        when(offsetWriter.beginFlush())
+                .thenReturn(true)
+                .thenReturn(false);
+
+        workerTask.initialize(TASK_CONFIG);
+
+        TransactionContext transactionContext = workerTask.sourceTaskContext.transactionContext();
+
+        // Request that the last record in the batch be aborted
+        transactionContext.abortTransaction(RECORDS.get(RECORDS.size() - 1));
+        // Request that the batch be committed
+        transactionContext.commitTransaction();
+
+        workerTask.toSend = RECORDS;
+        assertTrue(workerTask.sendRecords());
+
+        verify(offsetWriter, times(2)).beginFlush();
+        verify(sourceTask, times(2)).commit();
+        // We open a transaction once for the aborted batch, and once to commit offsets for that batch
+        verify(producer, times(2)).beginTransaction();
+        verify(producer, times(1)).commitTransaction();
+        verify(offsetWriter, times(1)).doFlush(any());
+        verify(producer, times(1)).abortTransaction();
+
+        verifySends(RECORDS.size());
+
+        verifyPossibleTopicCreation();
+    }
+
     @Test
     public void testCommitFlushSyncCallbackFailure() throws Exception {
         Exception failure = new RecordTooLargeException();
@@ -768,12 +902,16 @@ public class ExactlyOnceWorkerSourceTaskTest {
         assertFalse(workerTask.sendRecords());
         assertEquals(Arrays.asList(record2, record3), workerTask.toSend);
         verify(producer).beginTransaction();
+        // When using poll-based transaction boundaries, we do not commit transactions while retrying delivery for a batch
+        verify(producer, never()).commitTransaction();
         verifySends(2);
         verifyPossibleTopicCreation();
 
         // Next they all succeed
         assertTrue(workerTask.sendRecords());
         assertNull(workerTask.toSend);
+        // After the entire batch is successfully dispatched to the producer, we can finally commit the transaction
+        verify(producer, times(1)).commitTransaction();
         verifySends(4);
 
         verify(offsetWriter).offset(PARTITION, offset(1));

Reply via email to