This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.4 by this push:
new de9e11b5ecf KAFKA-5756: Wait for concurrent source task offset flush to complete before starting next flush (#13208)
de9e11b5ecf is described below
commit de9e11b5ecf661868ce48c77499dbe4139626f02
Author: Greg Harris <[email protected]>
AuthorDate: Wed Feb 15 18:29:20 2023 -0800
KAFKA-5756: Wait for concurrent source task offset flush to complete before starting next flush (#13208)
Reviewers: Mickael Maison <[email protected]>, Chris Egerton <[email protected]>
---
checkstyle/suppressions.xml | 2 +-
.../connect/runtime/AbstractWorkerSourceTask.java | 4 ++
.../runtime/ExactlyOnceWorkerSourceTask.java | 14 ++++--
.../kafka/connect/runtime/WorkerSinkTask.java | 4 ++
.../kafka/connect/runtime/WorkerSourceTask.java | 14 +++++-
.../kafka/connect/storage/OffsetStorageWriter.java | 53 ++++++++++++++++++----
.../runtime/ExactlyOnceWorkerSourceTaskTest.java | 5 +-
.../kafka/connect/runtime/WorkerSinkTaskTest.java | 47 +++++++++++++++++++
.../connect/runtime/WorkerSourceTaskTest.java | 5 +-
.../connect/storage/OffsetStorageWriterTest.java | 36 ++++++++-------
10 files changed, 147 insertions(+), 37 deletions(-)
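For readers skimming the diff below: the heart of the change is a single-permit java.util.concurrent.Semaphore inside OffsetStorageWriter that serializes flushes, so the new beginFlush(timeout, unit) can wait for an in-flight flush instead of failing outright. What follows is a minimal, self-contained sketch of that pattern, not code from this commit; the class name FlushGate and the offer()/completeFlush() helpers are invented for illustration.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    // Sketch of the flush-serialization pattern this commit adds to
    // OffsetStorageWriter; all names here are illustrative stand-ins.
    class FlushGate {
        // One permit: at most one flush may be in progress at a time.
        private final Semaphore flushInProgress = new Semaphore(1);
        private Map<String, String> data = new HashMap<>();
        private Map<String, String> toFlush = null;

        synchronized void offer(String key, String value) {
            data.put(key, value);
        }

        // Wait up to the timeout for any previous flush to finish, then
        // snapshot the pending data. Returns false (releasing the permit)
        // if there is nothing to flush; true if the caller now owns an
        // open flush and must eventually call completeFlush().
        boolean beginFlush(long timeout, TimeUnit unit)
                throws InterruptedException, TimeoutException {
            if (!flushInProgress.tryAcquire(Math.max(0, timeout), unit))
                throw new TimeoutException("Timed out waiting for previous flush to finish");
            synchronized (this) {
                if (data.isEmpty()) {
                    flushInProgress.release();
                    return false;
                }
                toFlush = data;
                data = new HashMap<>();
                return true;
            }
        }

        // The real writer releases the permit when a flush completes or is
        // cancelled; either way, later flushes are unblocked.
        synchronized void completeFlush() {
            toFlush = null;
            flushInProgress.release();
        }
    }

A semaphore rather than a plain lock is what lets beginFlush bound its wait and surface a TimeoutException, instead of blocking the committer thread indefinitely behind a stuck flush.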
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index cea6a193790..2be076e435e 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -155,7 +155,7 @@
files="(KafkaConfigBackingStore|Values|ConnectMetricsRegistry).java"/>
<suppress checks="NPathComplexity"
- files="(DistributedHerder|RestClient|RestServer|JsonConverter|KafkaConfigBackingStore|FileStreamSourceTask|TopicAdmin).java"/>
+ files="(DistributedHerder|RestClient|RestServer|JsonConverter|KafkaConfigBackingStore|FileStreamSourceTask|WorkerSourceTask|TopicAdmin).java"/>
<!-- connect tests-->
<suppress checks="ClassDataAbstractionCoupling"
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
index 9f6104b26a0..ff15f631a73 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
@@ -365,6 +365,10 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
} catch (InterruptedException e) {
// Ignore and allow to exit.
} catch (RuntimeException e) {
+ if (isCancelled()) {
+ log.debug("Skipping final offset commit as task has been
cancelled");
+ throw e;
+ }
try {
finalOffsetCommit(true);
} catch (Exception offsetException) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
index fe580e66cd8..2642ae776ac 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
@@ -219,9 +219,6 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
if (failed) {
log.debug("Skipping final offset commit as task has failed");
return;
- } else if (isCancelled()) {
- log.debug("Skipping final offset commit as task has been
cancelled");
- return;
}
// It should be safe to commit here even if we were in the middle of retrying on RetriableExceptions in the
@@ -263,7 +260,16 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
maybeBeginTransaction();
AtomicReference<Throwable> flushError = new AtomicReference<>();
- if (offsetWriter.beginFlush()) {
+ boolean shouldFlush = false;
+ try {
+ // Begin the flush without waiting, as there should not be any concurrent flushes.
+ // This is because commitTransaction is always called on the same thread, and should always block until
+ // the flush is complete, or cancel the flush if an error occurs.
+ shouldFlush = offsetWriter.beginFlush();
+ } catch (Throwable e) {
+ flushError.compareAndSet(null, e);
+ }
+ if (shouldFlush) {
// Now we can actually write the offsets to the internal topic.
// No need to track the flush future here since it's guaranteed to complete by the time
// Producer::commitTransaction completes
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 76dae3e7f3d..f0afad85b6e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -366,6 +366,10 @@ class WorkerSinkTask extends WorkerTask {
* the write commit.
**/
private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean closing, int seqno) {
+ if (isCancelled()) {
+ log.debug("Skipping final offset commit as task has been
cancelled");
+ return;
+ }
if (closing) {
doCommitSync(offsets, seqno);
} else {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 4f7fd741e61..ba9600561a2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -249,7 +249,19 @@ class WorkerSourceTask extends AbstractWorkerSourceTask {
// though we may update them here with newer offsets for acked records.
offsetsToCommit.offsets().forEach(offsetWriter::offset);
- if (!offsetWriter.beginFlush()) {
+ boolean shouldFlush;
+ try {
+ shouldFlush = offsetWriter.beginFlush(timeout - time.milliseconds(), TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ log.warn("{} Interrupted while waiting for previous offset flush
to complete, cancelling", this);
+ recordCommitFailure(time.milliseconds() - started, e);
+ return false;
+ } catch (TimeoutException e) {
+ log.warn("{} Timed out while waiting for previous offset flush to
complete, cancelling", this);
+ recordCommitFailure(time.milliseconds() - started, e);
+ return false;
+ }
+ if (!shouldFlush) {
// There was nothing in the offsets to process, but we still mark a successful offset commit.
long durationMillis = time.milliseconds() - started;
recordCommitSuccess(durationMillis);
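The WorkerSourceTask hunk above is the caller side of the new contract: a timed-out or interrupted wait is recorded as a failed offset commit rather than escaping and killing the task. A rough, self-contained sketch of that handling follows; tryCommit(), the 500 ms budget, and the bare Semaphore standing in for OffsetStorageWriter are all invented for the example.

    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    // Illustrative caller pattern: translate beginFlush's checked
    // exceptions into a failed commit attempt instead of propagating.
    class CommitLoop {
        private final Semaphore writerGate = new Semaphore(1);

        boolean tryCommit() {
            try {
                if (!writerGate.tryAcquire(500, TimeUnit.MILLISECONDS))
                    throw new TimeoutException("previous flush still running");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                return false;                       // commit failed, task lives on
            } catch (TimeoutException e) {
                return false;                       // retry on the next commit cycle
            }
            try {
                // ... snapshot offsets, write them, await the flush future ...
                return true;
            } finally {
                writerGate.release();
            }
        }
    }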
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
index b67e3d7b1b4..692669e7544 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
@@ -26,6 +26,9 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
/**
* <p>
@@ -73,6 +76,7 @@ public class OffsetStorageWriter {
private Map<Map<String, Object>, Map<String, Object>> data = new HashMap<>();
private Map<Map<String, Object>, Map<String, Object>> toFlush = null;
+ private final Semaphore flushInProgress = new Semaphore(1);
// Unique ID for each flush request to handle callbacks after timeouts
private long currentFlushId = 0;
@@ -100,23 +104,50 @@ public class OffsetStorageWriter {
/**
* Performs the first step of a flush operation, snapshotting the current state. This does not
- * actually initiate the flush with the underlying storage.
+ * actually initiate the flush with the underlying storage. Ensures that any previous flush operations
+ * have finished before beginning a new flush.
*
* @return true if a flush was initiated, false if no data was available
+ * @throws ConnectException if the previous flush is not complete before this method is called
*/
- public synchronized boolean beginFlush() {
- if (flushing()) {
- log.error("Invalid call to OffsetStorageWriter flush() while
already flushing, the "
+ public boolean beginFlush() {
+ try {
+ return beginFlush(0, TimeUnit.NANOSECONDS);
+ } catch (InterruptedException | TimeoutException e) {
+ log.error("Invalid call to OffsetStorageWriter beginFlush() while
already flushing, the "
+ "framework should not allow this");
throw new ConnectException("OffsetStorageWriter is already
flushing");
}
+ }
- if (data.isEmpty())
- return false;
-
- toFlush = data;
- data = new HashMap<>();
- return true;
+ /**
+ * Performs the first step of a flush operation, snapshotting the current state. This does not
+ * actually initiate the flush with the underlying storage. Ensures that any previous flush operations
+ * have finished before beginning a new flush.
+ * <p>If and only if this method returns true, the caller must call {@link #doFlush(Callback)}
+ * or {@link #cancelFlush()} to finish the flush operation and allow later calls to complete.
+ *
+ * @param timeout A maximum duration to wait for previous flushes to finish before giving up on waiting
+ * @param timeUnit Units of the timeout argument
+ * @return true if a flush was initiated, false if no data was available
+ * @throws InterruptedException if this thread was interrupted while waiting for the previous flush to complete
+ * @throws TimeoutException if the {@code timeout} elapses before previous flushes are complete.
+ */
+ public boolean beginFlush(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
+ if (flushInProgress.tryAcquire(Math.max(0, timeout), timeUnit)) {
+ synchronized (this) {
+ if (data.isEmpty()) {
+ flushInProgress.release();
+ return false;
+ } else {
+ toFlush = data;
+ data = new HashMap<>();
+ return true;
+ }
+ }
+ } else {
+ throw new TimeoutException("Timed out waiting for previous flush
to finish");
+ }
}
/**
@@ -193,6 +224,7 @@ public class OffsetStorageWriter {
toFlush.putAll(data);
data = toFlush;
currentFlushId++;
+ flushInProgress.release();
toFlush = null;
}
}
@@ -211,6 +243,7 @@ public class OffsetStorageWriter {
cancelFlush();
} else {
currentFlushId++;
+ flushInProgress.release();
toFlush = null;
}
return true;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
index e077cff912b..632f2d8f15f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
@@ -79,6 +79,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
@@ -904,7 +905,7 @@ public class ExactlyOnceWorkerSourceTaskTest {
workerTaskFuture = executor.submit(workerTask);
}
- private void expectSuccessfulFlushes() {
+ private void expectSuccessfulFlushes() throws InterruptedException, TimeoutException {
when(offsetWriter.beginFlush()).thenReturn(true);
when(offsetWriter.doFlush(any())).thenAnswer(invocation -> {
Callback<Void> cb = invocation.getArgument(0);
@@ -1036,7 +1037,7 @@ public class ExactlyOnceWorkerSourceTaskTest {
verify(producer, times(count)).send(any(), any());
}
- private void verifyTransactions(int numBatches) throws InterruptedException {
+ private void verifyTransactions(int numBatches) throws InterruptedException, TimeoutException {
VerificationMode mode = times(numBatches);
verify(producer, mode).beginTransaction();
verify(producer, mode).commitTransaction();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index d8c79ff23ed..044b4a4e3a2 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -1364,6 +1364,53 @@ public class WorkerSinkTaskTest {
}
}
+ @Test
+ public void testTaskCancelPreventsFinalOffsetCommit() throws Exception {
+ createTask(initialState);
+ expectInitializeTask();
+ expectTaskGetTopic(true);
+
+ expectPollInitialAssignment();
+ EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(2);
+
+ // Put one message through the task to get some offsets to commit
+ expectConsumerPoll(1);
+ expectConversionAndTransformation(1);
+ sinkTask.put(EasyMock.anyObject());
+ PowerMock.expectLastCall();
+
+ // the second put will return after the task is stopped and cancelled (asynchronously)
+ expectConsumerPoll(1);
+ expectConversionAndTransformation(1);
+ sinkTask.put(EasyMock.anyObject());
+ PowerMock.expectLastCall().andAnswer(() -> {
+ workerTask.stop();
+ workerTask.cancel();
+ return null;
+ });
+
+ // stop wakes up the consumer
+ consumer.wakeup();
+ EasyMock.expectLastCall();
+
+ // task performs normal steps in advance of committing offsets
+ final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+ offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 2));
+ offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+ sinkTask.preCommit(offsets);
+ EasyMock.expectLastCall().andReturn(offsets);
+ sinkTask.close(EasyMock.anyObject());
+ PowerMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(TASK_CONFIG);
+ workerTask.initializeAndStart();
+ workerTask.execute();
+
+ PowerMock.verifyAll();
+ }
+
// Verify that when commitAsync is called but the supplied callback is not called by the consumer before a
// rebalance occurs, the async callback does not reset the last committed offset from the rebalance.
// See KAFKA-5731 for more information.
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index f2530681cf9..928f5d89430 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -417,7 +417,6 @@ public class WorkerSourceTaskTest {
sourceTask.stop();
EasyMock.expectLastCall();
- expectOffsetFlush(true);
expectClose();
@@ -1006,7 +1005,7 @@ public class WorkerSourceTaskTest {
@SuppressWarnings("unchecked")
private void expectOffsetFlush(boolean succeed) throws Exception {
- EasyMock.expect(offsetWriter.beginFlush()).andReturn(true);
+ EasyMock.expect(offsetWriter.beginFlush(EasyMock.anyLong(), EasyMock.anyObject())).andReturn(true);
Future<Void> flushFuture = PowerMock.createMock(Future.class);
EasyMock.expect(offsetWriter.doFlush(EasyMock.anyObject(Callback.class))).andReturn(flushFuture);
// Should throw for failure
@@ -1024,7 +1023,7 @@ public class WorkerSourceTaskTest {
}
private void expectEmptyOffsetFlush() throws Exception {
- EasyMock.expect(offsetWriter.beginFlush()).andReturn(false);
+ EasyMock.expect(offsetWriter.beginFlush(EasyMock.anyLong(), EasyMock.anyObject())).andReturn(false);
sourceTask.commit();
EasyMock.expectLastCall();
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java
index b6eb0f6a487..8fa34ac95a3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.connect.storage;
-import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.util.Callback;
import org.junit.After;
import org.junit.Before;
@@ -33,6 +32,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
@@ -82,7 +82,7 @@ public class OffsetStorageWriterTest {
writer.offset(OFFSET_KEY, OFFSET_VALUE);
- assertTrue(writer.beginFlush());
+ assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
verify(callback).onCompletion(isNull(), isNull());
}
@@ -96,7 +96,7 @@ public class OffsetStorageWriterTest {
writer.offset(OFFSET_KEY, null);
- assertTrue(writer.beginFlush());
+ assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
verify(callback).onCompletion(isNull(), isNull());
}
@@ -111,14 +111,15 @@ public class OffsetStorageWriterTest {
writer.offset(null, OFFSET_VALUE);
- assertTrue(writer.beginFlush());
+ assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
verify(callback).onCompletion(isNull(), isNull());
}
@Test
- public void testNoOffsetsToFlush() {
- assertFalse(writer.beginFlush());
+ public void testNoOffsetsToFlush() throws InterruptedException, TimeoutException {
+ assertFalse(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
+ assertFalse(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
// If no offsets are flushed, we should finish immediately and not have made any calls to the
// underlying storage layer
@@ -135,22 +136,22 @@ public class OffsetStorageWriterTest {
// First time the write fails
expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, true, null);
writer.offset(OFFSET_KEY, OFFSET_VALUE);
- assertTrue(writer.beginFlush());
+ assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
verify(callback).onCompletion(eq(EXCEPTION), isNull());
// Second time it succeeds
expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, null);
- assertTrue(writer.beginFlush());
+ assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
verify(callback).onCompletion(isNull(), isNull());
// Third time it has no data to flush so we won't get past beginFlush()
- assertFalse(writer.beginFlush());
+ assertFalse(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
}
@Test
- public void testAlreadyFlushing() {
+ public void testAlreadyFlushing() throws InterruptedException, TimeoutException {
@SuppressWarnings("unchecked")
final Callback<Void> callback = mock(Callback.class);
// Trigger the send, but don't invoke the callback so we'll still be mid-flush
@@ -158,15 +159,18 @@ public class OffsetStorageWriterTest {
expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, allowStoreCompleteCountdown);
writer.offset(OFFSET_KEY, OFFSET_VALUE);
- assertTrue(writer.beginFlush());
+ assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
+ assertThrows(TimeoutException.class, () -> writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
writer.doFlush(callback);
- assertThrows(ConnectException.class, writer::beginFlush);
+ assertThrows(TimeoutException.class, () -> writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
+ allowStoreCompleteCountdown.countDown();
+ assertFalse(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
}
@Test
- public void testCancelBeforeAwaitFlush() {
+ public void testCancelBeforeAwaitFlush() throws InterruptedException, TimeoutException {
writer.offset(OFFSET_KEY, OFFSET_VALUE);
- assertTrue(writer.beginFlush());
+ assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
writer.cancelFlush();
}
@@ -180,7 +184,7 @@ public class OffsetStorageWriterTest {
expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, allowStoreCompleteCountdown);
writer.offset(OFFSET_KEY, OFFSET_VALUE);
- assertTrue(writer.beginFlush());
+ assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
// Start the flush, then immediately cancel before allowing the mocked store request to finish
Future<Void> flushFuture = writer.doFlush(callback);
writer.cancelFlush();
@@ -214,7 +218,7 @@ public class OffsetStorageWriterTest {
keySerialized == null ? null : ByteBuffer.wrap(keySerialized),
valueSerialized == null ? null : ByteBuffer.wrap(valueSerialized));
when(store.set(eq(offsetsSerialized), storeCallback.capture())).thenAnswer(invocation -> {
- final Callback<Void> cb = storeCallback.getValue();
+ final Callback<Void> cb = invocation.getArgument(1);
return service.submit(() -> {
if (waitForCompletion != null)
assertTrue(waitForCompletion.await(10000, TimeUnit.MILLISECONDS));
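For completeness, a hypothetical driver for the FlushGate sketch near the top of this message, demonstrating the contract the new beginFlush(timeout, unit) enforces and that the tests above verify:

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class FlushGateDemo {
        public static void main(String[] args) throws Exception {
            FlushGate gate = new FlushGate();
            gate.offer("partition-0", "offset-42");

            // First flush acquires the permit and snapshots the data.
            System.out.println("first flush: " + gate.beginFlush(1, TimeUnit.SECONDS));

            // A second flush now times out instead of corrupting the snapshot.
            try {
                gate.beginFlush(10, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                System.out.println("second flush blocked: " + e.getMessage());
            }

            // Completing the first flush releases the permit; with nothing
            // staged, the next beginFlush returns false immediately.
            gate.completeFlush();
            System.out.println("after complete: " + gate.beginFlush(1, TimeUnit.SECONDS));
        }
    }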