This is an automated email from the ASF dual-hosted git repository.
viktorsomogyi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e0456d1df2a KAFKA-17871: avoid blocking the herder thread when
producer flushing hangs (#18142)
e0456d1df2a is described below
commit e0456d1df2a685d0d8da53aeb265d793f8508b66
Author: Davide Armand <[email protected]>
AuthorDate: Wed May 27 17:25:18 2026 +0200
KAFKA-17871: avoid blocking the herder thread when producer flushing hangs
(#18142)
The call to `backingStore.get()` (called by connector task threads
through `OffsetStorageReaderImpl.offsets()`) can block for a long time
waiting for a data flush to complete (`KafkaProducer.flush()`).
This change moves that call outside the synchronized block that locks
`offsetReadFutures`, so that if `backingStore.get()` hangs it does
not keep `offsetReadFutures` locked. The access to the `closed` flag
(`closed.get()`) is kept inside the synchronized block to avoid a race
condition with `close()`.
This is important because `OffsetStorageReaderImpl.close()` needs to
lock `offsetReadFutures` as well in order to cancel the futures. Since
the herder thread calls `OffsetStorageReaderImpl.close()` when
attempting to stop a task, before this change this resulted in the
herder thread hanging indefinitely waiting for `backingStore.get()` to
complete.
Reviewers: Greg Harris <[email protected]>, Viktor Somogyi-Vass
<[email protected]>
---
.../connect/storage/OffsetStorageReaderImpl.java | 39 ++++-
.../connect/storage/OffsetStorageReaderTest.java | 171 +++++++++++++++++++++
2 files changed, 202 insertions(+), 8 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
index c17d2fb099c..2ede25d2d83 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
@@ -41,6 +41,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class OffsetStorageReaderImpl implements CloseableOffsetStorageReader {
private static final Logger log =
LoggerFactory.getLogger(OffsetStorageReaderImpl.class);
+ private static final String CLOSED_ERR_MSG = "Offset reader is closed.
This is likely because the task has already been "
+ + "scheduled to stop but has taken longer than the graceful
shutdown "
+ + "period to do so.";
+
private final OffsetBackingStore backingStore;
private final String namespace;
private final Converter keyConverter;
@@ -64,8 +68,13 @@ public class OffsetStorageReaderImpl implements
CloseableOffsetStorageReader {
}
@Override
- @SuppressWarnings("unchecked")
public <T> Map<Map<String, T>, Map<String, Object>>
offsets(Collection<Map<String, T>> partitions) {
+ Map<ByteBuffer, Map<String, T>> serializedToOriginal =
serializePartitions(partitions);
+ Map<ByteBuffer, ByteBuffer> raw =
readOffsetsFromBackingStore(serializedToOriginal.keySet());
+ return deserializeOffsets(serializedToOriginal, raw);
+ }
+
+ private <T> Map<ByteBuffer, Map<String, T>>
serializePartitions(Collection<Map<String, T>> partitions) {
// Serialize keys so backing store can work with them
Map<ByteBuffer, Map<String, T>> serializedToOriginal = new
HashMap<>(partitions.size());
for (Map<String, T> key : partitions) {
@@ -81,19 +90,27 @@ public class OffsetStorageReaderImpl implements
CloseableOffsetStorageReader {
+ "task or cause it to skip some data.", namespace, t);
}
}
+ return serializedToOriginal;
+ }
+ private Map<ByteBuffer, ByteBuffer>
readOffsetsFromBackingStore(Set<ByteBuffer> keys) {
// Get serialized key -> serialized value from backing store
Map<ByteBuffer, ByteBuffer> raw;
try {
Future<Map<ByteBuffer, ByteBuffer>> offsetReadFuture;
+
+ if (closed.get()) {
+ throw new ConnectException(CLOSED_ERR_MSG);
+ }
+
+ // Note: this call can block for long time waiting for data flush
to complete (`KafkaProducer.flush()`).
+ offsetReadFuture = backingStore.get(keys);
+
synchronized (offsetReadFutures) {
if (closed.get()) {
- throw new ConnectException(
- "Offset reader is closed. This is likely because the
task has already been "
- + "scheduled to stop but has taken longer than the
graceful shutdown "
- + "period to do so.");
+ offsetReadFuture.cancel(true);
+ throw new ConnectException(CLOSED_ERR_MSG);
}
- offsetReadFuture =
backingStore.get(serializedToOriginal.keySet());
offsetReadFutures.add(offsetReadFuture);
}
@@ -102,7 +119,7 @@ public class OffsetStorageReaderImpl implements
CloseableOffsetStorageReader {
} catch (CancellationException e) {
throw new ConnectException(
"Offset reader closed while attempting to read offsets.
This is likely because "
- + "the task was been scheduled to stop but has taken
longer than the "
+ + "the task has been scheduled to stop but has taken
longer than the "
+ "graceful shutdown period to do so.");
} finally {
synchronized (offsetReadFutures) {
@@ -113,9 +130,15 @@ public class OffsetStorageReaderImpl implements
CloseableOffsetStorageReader {
log.error("Failed to fetch offsets from namespace {}: ",
namespace, e);
throw new ConnectException("Failed to fetch offsets.", e);
}
+ return raw;
+ }
+ @SuppressWarnings("unchecked")
+ private <T> Map<Map<String, T>, Map<String, Object>> deserializeOffsets(
+ Map<ByteBuffer, Map<String, T>> serializedToOriginal,
+ Map<ByteBuffer, ByteBuffer> raw) {
// Deserialize all the values and map back to the original keys
- Map<Map<String, T>, Map<String, Object>> result = new
HashMap<>(partitions.size());
+ Map<Map<String, T>, Map<String, Object>> result = new
HashMap<>(serializedToOriginal.size());
for (Map.Entry<ByteBuffer, ByteBuffer> rawEntry : raw.entrySet()) {
try {
// Since null could be a valid key, explicitly check whether
map contains the key
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageReaderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageReaderTest.java
new file mode 100644
index 00000000000..01646c1323f
--- /dev/null
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageReaderTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+import org.mockito.stubbing.Answer;
+
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
+public class OffsetStorageReaderTest {
+
+ @Mock
+ private Converter taskKeyConverter;
+
+ @Mock
+ private Converter taskValueConverter;
+
+ @Mock
+ private OffsetBackingStore offsetBackingStore;
+
+ @Test
+ @Timeout(60)
+ public void testClosingOffsetReaderWhenOffsetStoreHangs() throws Exception
{
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ OffsetStorageReaderImpl offsetStorageReaderImpl = new
OffsetStorageReaderImpl(
+ offsetBackingStore, "namespace", taskKeyConverter,
taskValueConverter);
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ // Hanging `offsetBackingStore.get()`
+ doAnswer(invocation -> {
+ latch.countDown();
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ future.get(9999, TimeUnit.SECONDS);
+ throw new RuntimeException("Should never get here");
+ }).when(offsetBackingStore).get(any());
+
+ // Connector task thread hanging
+ executor.submit(() -> {
+ // Does call offsetBackingStore.get() and hangs
+ offsetStorageReaderImpl.offsets(Collections.emptyList());
+ });
+
+ // Ensure the task is hanging
+ latch.await();
+
+ verify(offsetBackingStore, times(1)).get(any());
+
+ // The herder thread should not block when trying to close
`offsetStorageReaderImpl`
+ // and complete before test timeout
+ offsetStorageReaderImpl.close();
+
+ executor.shutdownNow();
+ if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+ throw new RuntimeException("Failed to shutdown executor");
+ }
+ }
+
+ @Test
+ @Timeout(60)
+ public void
testClosingOffsetReaderWhenOffsetStoreHangsAndHasIncompleteFutures() throws
Exception {
+ // Test similar to `testClosingOffsetReaderWhenOffsetStoreHangs`
above, but in this case
+ // `OffsetStorageReaderImpl.offsetReadFutures` contains a future when
`offsetStorageReaderImpl.close()` is called.
+
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+ CompletableFuture<?> hangingFuture = mock(CompletableFuture.class);
+
+ OffsetStorageReaderImpl offsetStorageReaderImpl = new
OffsetStorageReaderImpl(
+ offsetBackingStore, "namespace", taskKeyConverter,
taskValueConverter);
+
+ CountDownLatch latchTask1 = new CountDownLatch(1);
+ CountDownLatch latchTask2 = new CountDownLatch(1);
+
+ // Mock hanging future
+ doAnswer(invocation -> {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ future.get(9999, TimeUnit.SECONDS);
+ throw new RuntimeException("Should never get here");
+ }).when(hangingFuture).get();
+
+ // Mock `offsetBackingStore.get()`
+ doAnswer(new Answer<Object>() {
+ int callCount = 0;
+
+ @Override
+ public Object answer(InvocationOnMock invocation) throws
Throwable {
+ if (callCount == 0) {
+ callCount += 1;
+ // First connector task
+ latchTask1.countDown();
+ return hangingFuture;
+ } else {
+ // Second connector task
+ latchTask2.countDown();
+ CompletableFuture<Void> future = new
CompletableFuture<>();
+ future.get(9999, TimeUnit.SECONDS);
+ throw new RuntimeException("Should never get here");
+ }
+ }
+ }
+ ).when(offsetBackingStore).get(any());
+
+
+ // Connector task thread calls `offsets()` --> hangs on
`hangingFuture.get()`
+ // --> the future is added to
`offsetStorageReaderImpl.offsetReadFutures` and never removed
+ executor.submit(() -> {
+ offsetStorageReaderImpl.offsets(Collections.emptyList());
+ });
+ // Ensure first task is hanging
+ latchTask1.await();
+
+ verify(offsetBackingStore, times(1)).get(any());
+ verify(hangingFuture, times(1)).get();
+
+ // Another connector task thread calls `offsets()` --> hangs on
offsetBackingStore.get()
+ // --> the future is never added to
`offsetStorageReaderImpl.offsetReadFutures`
+ executor.submit(() -> {
+ offsetStorageReaderImpl.offsets(Collections.emptyList());
+ });
+ // Ensure second task is hanging
+ latchTask2.await();
+
+ verify(offsetBackingStore, times(2)).get(any());
+
+ // The herder thread should not block when trying to close
`offsetStorageReaderImpl` and should complete
+ // before the test timeout
+ offsetStorageReaderImpl.close();
+
+ // The hanging future should be cancelled by `close()`
+ verify(hangingFuture, times(1)).cancel(true);
+
+ executor.shutdownNow();
+ if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+ throw new RuntimeException("Failed to shutdown executor");
+ }
+ }
+}