This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new ed1574766cd KAFKA-14727: Enable periodic offset commits for EOS source tasks (#13262)
ed1574766cd is described below
commit ed1574766cdf5658453b22189abea4b454a1ecbb
Author: Greg Harris <[email protected]>
AuthorDate: Thu Feb 16 15:51:34 2023 -0800
KAFKA-14727: Enable periodic offset commits for EOS source tasks (#13262)
Reviewers: Chris Egerton <[email protected]>
---
.../connect/runtime/AbstractWorkerSourceTask.java | 4 ++-
.../runtime/ExactlyOnceWorkerSourceTask.java | 20 +++++++++----
.../kafka/connect/storage/OffsetStorageWriter.java | 7 -----
.../runtime/ExactlyOnceWorkerSourceTaskTest.java | 34 ++++++++++++----------
4 files changed, 37 insertions(+), 28 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
index 0d02ba25204..d3ec60ac66d 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
@@ -349,8 +349,10 @@ public abstract class AbstractWorkerSourceTask extends
WorkerTask {
recordPollReturned(toSend.size(), time.milliseconds()
- start);
}
}
- if (toSend == null)
+ if (toSend == null) {
+ batchDispatched();
continue;
+ }
log.trace("{} About to send {} records to Kafka", this,
toSend.size());
if (sendRecords()) {
batchDispatched();
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
index 98318e8d376..972249d3f86 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
@@ -253,10 +253,6 @@ class ExactlyOnceWorkerSourceTask extends
AbstractWorkerSourceTask {
long started = time.milliseconds();
- // We might have just aborted a transaction, in which case we'll have
to begin a new one
- // in order to commit offsets
- maybeBeginTransaction();
-
AtomicReference<Throwable> flushError = new AtomicReference<>();
boolean shouldFlush = false;
try {
@@ -267,6 +263,20 @@ class ExactlyOnceWorkerSourceTask extends
AbstractWorkerSourceTask {
} catch (Throwable e) {
flushError.compareAndSet(null, e);
}
+ if (flushError.get() == null && !transactionOpen && !shouldFlush) {
+ // There is no contents on the framework side to commit, so skip
the offset flush and producer commit
+ long durationMillis = time.milliseconds() - started;
+ recordCommitSuccess(durationMillis);
+ log.debug("{} Finished commitOffsets successfully in {} ms", this,
durationMillis);
+
+ commitSourceTask();
+ return;
+ }
+
+ // We might have just aborted a transaction, in which case we'll have
to begin a new one
+ // in order to commit offsets
+ maybeBeginTransaction();
+
if (shouldFlush) {
// Now we can actually write the offsets to the internal topic.
// No need to track the flush future here since it's guaranteed to
complete by the time
@@ -391,7 +401,7 @@ class ExactlyOnceWorkerSourceTask extends
AbstractWorkerSourceTask {
}
private void maybeCommitTransaction(boolean shouldCommit) {
- if (shouldCommit && (transactionOpen || offsetWriter.willFlush()))
{
+ if (shouldCommit) {
try (LoggingContext loggingContext =
LoggingContext.forOffsets(id)) {
commitTransaction();
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
index 692669e7544..cb944034db1 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
@@ -150,13 +150,6 @@ public class OffsetStorageWriter {
}
}
- /**
- * @return whether there's anything to flush right now.
- */
- public synchronized boolean willFlush() {
- return !data.isEmpty();
- }
-
/**
* Flush the current offsets and clear them from this writer. This is
non-blocking: it
* moves the current set of offsets out of the way, serializes the data,
and asynchronously
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
index 981ad8c239d..23feadc25e5 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
@@ -257,7 +257,8 @@ public class ExactlyOnceWorkerSourceTaskTest extends
ThreadedTest {
});
// The task checks to see if there are offsets to commit before pausing
- EasyMock.expect(offsetWriter.willFlush()).andReturn(false);
+ EasyMock.expect(offsetWriter.beginFlush()).andReturn(false);
+ expectCall(sourceTask::commit);
expectClose();
@@ -288,7 +289,7 @@ public class ExactlyOnceWorkerSourceTaskTest extends
ThreadedTest {
AtomicInteger flushes = new AtomicInteger(0);
pollLatch = new CountDownLatch(10);
expectPolls(polls);
- expectAnyFlushes(flushes);
+ expectFlush(true, flushes);
expectTopicCreation(TOPIC);
@@ -430,7 +431,7 @@ public class ExactlyOnceWorkerSourceTaskTest extends
ThreadedTest {
AtomicInteger flushes = new AtomicInteger(0);
pollLatch = new CountDownLatch(10);
expectPolls(polls);
- expectAnyFlushes(flushes);
+ expectFlush(true, flushes);
expectTopicCreation(TOPIC);
@@ -576,8 +577,13 @@ public class ExactlyOnceWorkerSourceTaskTest extends
ThreadedTest {
expectStartup();
final CountDownLatch pollLatch = expectEmptyPolls(1, new
AtomicInteger());
- EasyMock.expect(offsetWriter.willFlush()).andReturn(false).anyTimes();
+ EasyMock.expect(offsetWriter.beginFlush()).andReturn(false).anyTimes();
+ final AtomicInteger taskCommits = new AtomicInteger(0);
+ expectCall(sourceTask::commit).andAnswer(() -> {
+ taskCommits.incrementAndGet();
+ return null;
+ }).anyTimes();
expectCall(sourceTask::stop);
expectCall(() -> statusListener.onShutdown(taskId));
@@ -595,6 +601,11 @@ public class ExactlyOnceWorkerSourceTaskTest extends
ThreadedTest {
taskFuture.get();
assertPollMetrics(0);
+ assertTrue(
+ "SourceTask::commit should have been invoked at least twice:
once after polling an empty batch, and once during shutdown",
+ taskCommits.get() >= 2
+ );
+
PowerMock.verifyAll();
}
@@ -611,7 +622,7 @@ public class ExactlyOnceWorkerSourceTaskTest extends
ThreadedTest {
AtomicInteger polls = new AtomicInteger();
AtomicInteger flushes = new AtomicInteger();
expectPolls(polls);
- expectAnyFlushes(flushes);
+ expectFlush(true, flushes);
expectTopicCreation(TOPIC);
@@ -724,11 +735,9 @@ public class ExactlyOnceWorkerSourceTaskTest extends
ThreadedTest {
// Third flush: triggered by TransactionContext::abortTransaction
(batch)
expectCall(producer::abortTransaction);
- EasyMock.expect(offsetWriter.willFlush()).andReturn(true);
expectFlush(false, flushes);
// Third flush: triggered by TransactionContext::abortTransaction
(record)
- EasyMock.expect(offsetWriter.willFlush()).andReturn(true);
expectCall(producer::abortTransaction);
expectFlush(false, flushes);
@@ -834,8 +843,7 @@ public class ExactlyOnceWorkerSourceTaskTest extends
ThreadedTest {
expectPolls();
- EasyMock.expect(offsetWriter.willFlush()).andReturn(true).anyTimes();
- EasyMock.expect(offsetWriter.beginFlush()).andReturn(true);
+ EasyMock.expect(offsetWriter.beginFlush()).andReturn(true).anyTimes();
expectCall(offsetWriter::cancelFlush);
expectTopicCreation(TOPIC);
@@ -950,8 +958,9 @@ public class ExactlyOnceWorkerSourceTaskTest extends
ThreadedTest {
expectCall(() -> statusListener.onStartup(taskId));
+ expectCall(sourceTask::commit);
expectCall(sourceTask::stop);
- EasyMock.expect(offsetWriter.willFlush()).andReturn(false);
+ EasyMock.expect(offsetWriter.beginFlush()).andReturn(false);
expectCall(() -> statusListener.onShutdown(taskId));
@@ -1217,11 +1226,6 @@ public class ExactlyOnceWorkerSourceTaskTest extends
ThreadedTest {
return result;
}
- private CountDownLatch expectAnyFlushes(AtomicInteger flushCount) {
- EasyMock.expect(offsetWriter.willFlush()).andReturn(true).anyTimes();
- return expectFlush(true, flushCount);
- }
-
private void assertTransactionMetrics(int minimumMaxSizeExpected) {
MetricGroup transactionGroup =
workerTask.transactionMetricsGroup().metricGroup();
double actualMin =
metrics.currentMetricValueAsDouble(transactionGroup, "transaction-size-min");