This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new ed1574766cd KAFKA-14727: Enable periodic offset commits for EOS source tasks (#13262)
ed1574766cd is described below

commit ed1574766cdf5658453b22189abea4b454a1ecbb
Author: Greg Harris <[email protected]>
AuthorDate: Thu Feb 16 15:51:34 2023 -0800

    KAFKA-14727: Enable periodic offset commits for EOS source tasks (#13262)
    
    Reviewers: Chris Egerton <[email protected]>
---
 .../connect/runtime/AbstractWorkerSourceTask.java  |  4 ++-
 .../runtime/ExactlyOnceWorkerSourceTask.java       | 20 +++++++++----
 .../kafka/connect/storage/OffsetStorageWriter.java |  7 -----
 .../runtime/ExactlyOnceWorkerSourceTaskTest.java   | 34 ++++++++++++----------
 4 files changed, 37 insertions(+), 28 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
index 0d02ba25204..d3ec60ac66d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
@@ -349,8 +349,10 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
                         recordPollReturned(toSend.size(), time.milliseconds() - start);
                     }
                 }
-                if (toSend == null)
+                if (toSend == null) {
+                    batchDispatched();
                     continue;
+                }
                 log.trace("{} About to send {} records to Kafka", this, toSend.size());
                 if (sendRecords()) {
                     batchDispatched();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
index 98318e8d376..972249d3f86 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
@@ -253,10 +253,6 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
 
         long started = time.milliseconds();
 
-        // We might have just aborted a transaction, in which case we'll have to begin a new one
-        // in order to commit offsets
-        maybeBeginTransaction();
-
         AtomicReference<Throwable> flushError = new AtomicReference<>();
         boolean shouldFlush = false;
         try {
@@ -267,6 +263,20 @@ class ExactlyOnceWorkerSourceTask extends 
AbstractWorkerSourceTask {
         } catch (Throwable e) {
             flushError.compareAndSet(null, e);
         }
+        if (flushError.get() == null && !transactionOpen && !shouldFlush) {
+            // There is no contents on the framework side to commit, so skip the offset flush and producer commit
+            long durationMillis = time.milliseconds() - started;
+            recordCommitSuccess(durationMillis);
+            log.debug("{} Finished commitOffsets successfully in {} ms", this, 
durationMillis);
+
+            commitSourceTask();
+            return;
+        }
+
+        // We might have just aborted a transaction, in which case we'll have 
to begin a new one
+        // in order to commit offsets
+        maybeBeginTransaction();
+
         if (shouldFlush) {
             // Now we can actually write the offsets to the internal topic.
             // No need to track the flush future here since it's guaranteed to 
complete by the time
@@ -391,7 +401,7 @@ class ExactlyOnceWorkerSourceTask extends 
AbstractWorkerSourceTask {
         }
 
         private void maybeCommitTransaction(boolean shouldCommit) {
-            if (shouldCommit && (transactionOpen || offsetWriter.willFlush())) {
+            if (shouldCommit) {
                 try (LoggingContext loggingContext = 
LoggingContext.forOffsets(id)) {
                     commitTransaction();
                 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
index 692669e7544..cb944034db1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
@@ -150,13 +150,6 @@ public class OffsetStorageWriter {
         }
     }
 
-    /**
-     * @return whether there's anything to flush right now.
-     */
-    public synchronized boolean willFlush() {
-        return !data.isEmpty();
-    }
-
     /**
      * Flush the current offsets and clear them from this writer. This is 
non-blocking: it
      * moves the current set of offsets out of the way, serializes the data, 
and asynchronously
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
index 981ad8c239d..23feadc25e5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
@@ -257,7 +257,8 @@ public class ExactlyOnceWorkerSourceTaskTest extends 
ThreadedTest {
         });
 
         // The task checks to see if there are offsets to commit before pausing
-        EasyMock.expect(offsetWriter.willFlush()).andReturn(false);
+        EasyMock.expect(offsetWriter.beginFlush()).andReturn(false);
+        expectCall(sourceTask::commit);
 
         expectClose();
 
@@ -288,7 +289,7 @@ public class ExactlyOnceWorkerSourceTaskTest extends 
ThreadedTest {
         AtomicInteger flushes = new AtomicInteger(0);
         pollLatch = new CountDownLatch(10);
         expectPolls(polls);
-        expectAnyFlushes(flushes);
+        expectFlush(true, flushes);
 
         expectTopicCreation(TOPIC);
 
@@ -430,7 +431,7 @@ public class ExactlyOnceWorkerSourceTaskTest extends 
ThreadedTest {
         AtomicInteger flushes = new AtomicInteger(0);
         pollLatch = new CountDownLatch(10);
         expectPolls(polls);
-        expectAnyFlushes(flushes);
+        expectFlush(true, flushes);
 
         expectTopicCreation(TOPIC);
 
@@ -576,8 +577,13 @@ public class ExactlyOnceWorkerSourceTaskTest extends 
ThreadedTest {
         expectStartup();
 
         final CountDownLatch pollLatch = expectEmptyPolls(1, new 
AtomicInteger());
-        EasyMock.expect(offsetWriter.willFlush()).andReturn(false).anyTimes();
+        EasyMock.expect(offsetWriter.beginFlush()).andReturn(false).anyTimes();
 
+        final AtomicInteger taskCommits = new AtomicInteger(0);
+        expectCall(sourceTask::commit).andAnswer(() -> {
+            taskCommits.incrementAndGet();
+            return null;
+        }).anyTimes();
         expectCall(sourceTask::stop);
         expectCall(() -> statusListener.onShutdown(taskId));
 
@@ -595,6 +601,11 @@ public class ExactlyOnceWorkerSourceTaskTest extends 
ThreadedTest {
         taskFuture.get();
         assertPollMetrics(0);
 
+        assertTrue(
+                "SourceTask::commit should have been invoked at least twice: 
once after polling an empty batch, and once during shutdown",
+                taskCommits.get() >= 2
+        );
+
         PowerMock.verifyAll();
     }
 
@@ -611,7 +622,7 @@ public class ExactlyOnceWorkerSourceTaskTest extends 
ThreadedTest {
         AtomicInteger polls = new AtomicInteger();
         AtomicInteger flushes = new AtomicInteger();
         expectPolls(polls);
-        expectAnyFlushes(flushes);
+        expectFlush(true, flushes);
 
         expectTopicCreation(TOPIC);
 
@@ -724,11 +735,9 @@ public class ExactlyOnceWorkerSourceTaskTest extends 
ThreadedTest {
 
         // Third flush: triggered by TransactionContext::abortTransaction 
(batch)
         expectCall(producer::abortTransaction);
-        EasyMock.expect(offsetWriter.willFlush()).andReturn(true);
         expectFlush(false, flushes);
 
         // Third flush: triggered by TransactionContext::abortTransaction 
(record)
-        EasyMock.expect(offsetWriter.willFlush()).andReturn(true);
         expectCall(producer::abortTransaction);
         expectFlush(false, flushes);
 
@@ -834,8 +843,7 @@ public class ExactlyOnceWorkerSourceTaskTest extends 
ThreadedTest {
 
         expectPolls();
 
-        EasyMock.expect(offsetWriter.willFlush()).andReturn(true).anyTimes();
-        EasyMock.expect(offsetWriter.beginFlush()).andReturn(true);
+        EasyMock.expect(offsetWriter.beginFlush()).andReturn(true).anyTimes();
         expectCall(offsetWriter::cancelFlush);
 
         expectTopicCreation(TOPIC);
@@ -950,8 +958,9 @@ public class ExactlyOnceWorkerSourceTaskTest extends 
ThreadedTest {
 
         expectCall(() -> statusListener.onStartup(taskId));
 
+        expectCall(sourceTask::commit);
         expectCall(sourceTask::stop);
-        EasyMock.expect(offsetWriter.willFlush()).andReturn(false);
+        EasyMock.expect(offsetWriter.beginFlush()).andReturn(false);
 
         expectCall(() -> statusListener.onShutdown(taskId));
 
@@ -1217,11 +1226,6 @@ public class ExactlyOnceWorkerSourceTaskTest extends 
ThreadedTest {
         return result;
     }
 
-    private CountDownLatch expectAnyFlushes(AtomicInteger flushCount) {
-        EasyMock.expect(offsetWriter.willFlush()).andReturn(true).anyTimes();
-        return expectFlush(true, flushCount);
-    }
-
     private void assertTransactionMetrics(int minimumMaxSizeExpected) {
         MetricGroup transactionGroup = 
workerTask.transactionMetricsGroup().metricGroup();
         double actualMin = 
metrics.currentMetricValueAsDouble(transactionGroup, "transaction-size-min");

Reply via email to