This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new cf8c608028a KAFKA-14799: Ignore source task requests to abort empty transactions (#13379)
cf8c608028a is described below

commit cf8c608028aa415d7b4868cc2ce224765310eb86
Author: Chris Egerton <[email protected]>
AuthorDate: Tue Mar 14 15:10:29 2023 -0400

    KAFKA-14799: Ignore source task requests to abort empty transactions (#13379)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/connect/source/TransactionContext.java   | 10 +++++++++
 .../connect/runtime/AbstractWorkerSourceTask.java  |  5 ++---
 .../runtime/ExactlyOnceWorkerSourceTask.java       | 10 ++++++---
 .../runtime/ExactlyOnceWorkerSourceTaskTest.java   | 24 ++++++++++------------
 4 files changed, 30 insertions(+), 19 deletions(-)

diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java
 
b/connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java
index f90d75baf47..4c3c00528ac 100644
--- 
a/connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java
+++ 
b/connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java
@@ -31,6 +31,11 @@ public interface TransactionContext {
     /**
      * Request a transaction commit after a source record is processed. The 
source record will be the
      * last record in the committed transaction.
+     * <p>
+     * If a task requests that the last record in a batch that it returns from 
{@link SourceTask#poll()}
+     * be committed by invoking this method, and also requests that that same 
batch be aborted by
+     * invoking {@link #abortTransaction()}, the record-based operation (in 
this case, committing
+     * the transaction) will take precedence.
      * @param record the record to commit the transaction after; may not be 
null.
      */
     void commitTransaction(SourceRecord record);
@@ -50,6 +55,11 @@ public interface TransactionContext {
      * and will not appear in a committed transaction. However, offsets for 
that transaction will still
      * be committed so that the records in that transaction are not 
reprocessed. If the data should be
      * reprocessed, the task should not invoke this method and should instead 
throw an exception.
+     * <p>
+     * If a task requests that the last record in a batch that it returns from 
{@link SourceTask#poll()}
+     * be aborted by invoking this method, and also requests that that same 
batch be committed by
+     * invoking {@link #commitTransaction()}, the record-based operation (in 
this case, aborting
+     * the transaction) will take precedence.
      * @param record the record to abort the transaction after; may not be 
null.
      */
     void abortTransaction(SourceRecord record);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
index 51836a2d6a3..09d8d739eed 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
@@ -354,9 +354,7 @@ public abstract class AbstractWorkerSourceTask extends 
WorkerTask {
                     continue;
                 }
                 log.trace("{} About to send {} records to Kafka", this, 
toSend.size());
-                if (sendRecords()) {
-                    batchDispatched();
-                } else {
+                if (!sendRecords()) {
                     stopRequestedLatch.await(SEND_FAILED_BACKOFF_MS, 
TimeUnit.MILLISECONDS);
                 }
             }
@@ -451,6 +449,7 @@ public abstract class AbstractWorkerSourceTask extends 
WorkerTask {
             recordDispatched(preTransformRecord);
         }
         toSend = null;
+        batchDispatched();
         return true;
     }
 
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
index 972249d3f86..58efaadc47c 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
@@ -468,7 +468,7 @@ class ExactlyOnceWorkerSourceTask extends 
AbstractWorkerSourceTask {
                     protected boolean shouldCommitTransactionForBatch(long 
currentTimeMs) {
                         if (transactionContext.shouldAbortBatch()) {
                             log.info("Aborting transaction for batch as 
requested by connector");
-                            abortTransaction();
+                            maybeAbortTransaction();
                             // We abort the transaction, which causes all the 
records up to this point to be dropped, but we still want to
                             // commit offsets so that the task doesn't see the 
same records all over again
                             return true;
@@ -481,7 +481,7 @@ class ExactlyOnceWorkerSourceTask extends 
AbstractWorkerSourceTask {
                         if (transactionContext.shouldAbortOn(record)) {
                             log.info("Aborting transaction for record on topic 
{} as requested by connector", record.topic());
                             log.trace("Last record in aborted transaction: 
{}", record);
-                            abortTransaction();
+                            maybeAbortTransaction();
                             // We abort the transaction, which causes all the 
records up to this point to be dropped, but we still want to
                             // commit offsets so that the task doesn't see the 
same records all over again
                             return true;
@@ -489,7 +489,11 @@ class ExactlyOnceWorkerSourceTask extends 
AbstractWorkerSourceTask {
                         return transactionContext.shouldCommitOn(record);
                     }
 
-                    private void abortTransaction() {
+                    private void maybeAbortTransaction() {
+                        if (!transactionOpen) {
+                            log.warn("Ignoring request by task to abort 
transaction as the current transaction is empty");
+                            return;
+                        }
                         producer.abortTransaction();
                         transactionMetrics.abortTransaction();
                         transactionOpen = false;
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
index 23feadc25e5..35499a8c55c 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
@@ -17,8 +17,8 @@
 package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
@@ -138,7 +138,7 @@ public class ExactlyOnceWorkerSourceTaskTest extends 
ThreadedTest {
     @Mock private Converter valueConverter;
     @Mock private HeaderConverter headerConverter;
     @Mock private TransformationChain<SourceRecord> transformationChain;
-    @Mock private KafkaProducer<byte[], byte[]> producer;
+    @Mock private Producer<byte[], byte[]> producer;
     @Mock private TopicAdmin admin;
     @Mock private CloseableOffsetStorageReader offsetReader;
     @Mock private OffsetStorageWriter offsetWriter;
@@ -885,6 +885,7 @@ public class ExactlyOnceWorkerSourceTaskTest extends 
ThreadedTest {
         SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, "topic", 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
         SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, "topic", 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
         SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, "topic", 3, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        List<SourceRecord> records = Arrays.asList(record1, record2, record3);
 
         expectTopicCreation(TOPIC);
 
@@ -898,10 +899,13 @@ public class ExactlyOnceWorkerSourceTaskTest extends 
ThreadedTest {
         expectSendRecordOnce(true);
         expectSendRecordOnce(false);
 
+        // We commit the transaction after the batch has been successfully 
dispatched to the producer
+        expectFlush(false, new AtomicInteger(), records.size());
+
         PowerMock.replayAll();
 
         // Try to send 3, make first pass, second fail. Should save last two
-        workerTask.toSend = Arrays.asList(record1, record2, record3);
+        workerTask.toSend = records;
         workerTask.sendRecords();
         assertEquals(Arrays.asList(record2, record3), workerTask.toSend);
 
@@ -1187,20 +1191,15 @@ public class ExactlyOnceWorkerSourceTaskTest extends 
ThreadedTest {
         return false;
     }
 
-    private enum FlushOutcome {
-        SUCCEED,
-        SUCCEED_ANY_TIMES,
-        FAIL_FLUSH_CALLBACK,
-        FAIL_TRANSACTION_COMMIT
+    private void expectFlush(boolean anyTimes, AtomicInteger flushCount) {
+        expectFlush(anyTimes, flushCount, RECORDS.size());
     }
 
-    private CountDownLatch expectFlush(boolean anyTimes, AtomicInteger 
flushCount) {
-        CountDownLatch result = new CountDownLatch(1);
+    private void expectFlush(boolean anyTimes, AtomicInteger flushCount, int 
batchSize) {
         org.easymock.IExpectationSetters<Boolean> flushBegin = EasyMock
                 .expect(offsetWriter.beginFlush())
                 .andAnswer(() -> {
                     flushCount.incrementAndGet();
-                    result.countDown();
                     return true;
                 });
         if (anyTimes) {
@@ -1220,10 +1219,9 @@ public class ExactlyOnceWorkerSourceTaskTest extends 
ThreadedTest {
             // The worker task doesn't actually use the returned future
             offsetFlush.andReturn(null);
             expectCall(producer::commitTransaction);
-            expectCall(() -> sourceTask.commitRecord(EasyMock.anyObject(), 
EasyMock.anyObject()));
+            expectCall(() -> sourceTask.commitRecord(EasyMock.anyObject(), 
EasyMock.anyObject())).times(batchSize);
             expectCall(sourceTask::commit);
         }
-        return result;
     }
 
     private void assertTransactionMetrics(int minimumMaxSizeExpected) {

Reply via email to