This is an automated email from the ASF dual-hosted git repository.

viktorsomogyi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e0456d1df2a KAFKA-17871: avoid blocking the herder thread when 
producer flushing hangs (#18142)
e0456d1df2a is described below

commit e0456d1df2a685d0d8da53aeb265d793f8508b66
Author: Davide Armand <[email protected]>
AuthorDate: Wed May 27 17:25:18 2026 +0200

    KAFKA-17871: avoid blocking the herder thread when producer flushing hangs 
(#18142)
    
    The call to `backingStore.get()` (called by connector task threads
    through `OffsetStorageReaderImpl.offsets()`) can block for a long time
    waiting for a data flush to complete (`KafkaProducer.flush()`).
    
    This change moves that call outside the synchronized block that locks
    on `offsetReadFutures`, so that if `backingStore.get()` hangs it does
    not keep `offsetReadFutures` locked. The access to the `closed` flag
    (`closed.get()`) is kept inside the synchronized block to avoid a race
    condition with `close()`.
    
    This is important because `OffsetStorageReaderImpl.close()` needs to
    lock `offsetReadFutures` as well in order to cancel the futures. Since
    the herder thread calls `OffsetStorageReaderImpl.close()` when
    attempting to stop a task, before this change the herder thread would
    hang indefinitely waiting for `backingStore.get()` to complete.
    
    Reviewers: Greg Harris <[email protected]>, Viktor Somogyi-Vass 
<[email protected]>
---
 .../connect/storage/OffsetStorageReaderImpl.java   |  39 ++++-
 .../connect/storage/OffsetStorageReaderTest.java   | 171 +++++++++++++++++++++
 2 files changed, 202 insertions(+), 8 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
index c17d2fb099c..2ede25d2d83 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
@@ -41,6 +41,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
 public class OffsetStorageReaderImpl implements CloseableOffsetStorageReader {
     private static final Logger log = 
LoggerFactory.getLogger(OffsetStorageReaderImpl.class);
 
+    private static final String CLOSED_ERR_MSG = "Offset reader is closed. 
This is likely because the task has already been "
+            + "scheduled to stop but has taken longer than the graceful 
shutdown "
+            + "period to do so.";
+
     private final OffsetBackingStore backingStore;
     private final String namespace;
     private final Converter keyConverter;
@@ -64,8 +68,13 @@ public class OffsetStorageReaderImpl implements 
CloseableOffsetStorageReader {
     }
 
     @Override
-    @SuppressWarnings("unchecked")
     public <T> Map<Map<String, T>, Map<String, Object>> 
offsets(Collection<Map<String, T>> partitions) {
+        Map<ByteBuffer, Map<String, T>> serializedToOriginal = 
serializePartitions(partitions);
+        Map<ByteBuffer, ByteBuffer> raw = 
readOffsetsFromBackingStore(serializedToOriginal.keySet());
+        return deserializeOffsets(serializedToOriginal, raw);
+    }
+
+    private <T> Map<ByteBuffer, Map<String, T>> 
serializePartitions(Collection<Map<String, T>> partitions) {
         // Serialize keys so backing store can work with them
         Map<ByteBuffer, Map<String, T>> serializedToOriginal = new 
HashMap<>(partitions.size());
         for (Map<String, T> key : partitions) {
@@ -81,19 +90,27 @@ public class OffsetStorageReaderImpl implements 
CloseableOffsetStorageReader {
                         + "task or cause it to skip some data.", namespace, t);
             }
         }
+        return serializedToOriginal;
+    }
 
+    private Map<ByteBuffer, ByteBuffer> 
readOffsetsFromBackingStore(Set<ByteBuffer> keys) {
         // Get serialized key -> serialized value from backing store
         Map<ByteBuffer, ByteBuffer> raw;
         try {
             Future<Map<ByteBuffer, ByteBuffer>> offsetReadFuture;
+
+            if (closed.get()) {
+                throw new ConnectException(CLOSED_ERR_MSG);
+            }
+
+            // Note: this call can block for long time waiting for data flush 
to complete (`KafkaProducer.flush()`).
+            offsetReadFuture = backingStore.get(keys);
+
             synchronized (offsetReadFutures) {
                 if (closed.get()) {
-                    throw new ConnectException(
-                        "Offset reader is closed. This is likely because the 
task has already been "
-                            + "scheduled to stop but has taken longer than the 
graceful shutdown "
-                            + "period to do so.");
+                    offsetReadFuture.cancel(true);
+                    throw new ConnectException(CLOSED_ERR_MSG);
                 }
-                offsetReadFuture = 
backingStore.get(serializedToOriginal.keySet());
                 offsetReadFutures.add(offsetReadFuture);
             }
 
@@ -102,7 +119,7 @@ public class OffsetStorageReaderImpl implements 
CloseableOffsetStorageReader {
             } catch (CancellationException e) {
                 throw new ConnectException(
                     "Offset reader closed while attempting to read offsets. 
This is likely because "
-                        + "the task was been scheduled to stop but has taken 
longer than the "
+                        + "the task has been scheduled to stop but has taken 
longer than the "
                         + "graceful shutdown period to do so.");
             } finally {
                 synchronized (offsetReadFutures) {
@@ -113,9 +130,15 @@ public class OffsetStorageReaderImpl implements 
CloseableOffsetStorageReader {
             log.error("Failed to fetch offsets from namespace {}: ", 
namespace, e);
             throw new ConnectException("Failed to fetch offsets.", e);
         }
+        return raw;
+    }
 
+    @SuppressWarnings("unchecked")
+    private <T> Map<Map<String, T>, Map<String, Object>> deserializeOffsets(
+            Map<ByteBuffer, Map<String, T>> serializedToOriginal,
+            Map<ByteBuffer, ByteBuffer> raw) {
         // Deserialize all the values and map back to the original keys
-        Map<Map<String, T>, Map<String, Object>> result = new 
HashMap<>(partitions.size());
+        Map<Map<String, T>, Map<String, Object>> result = new 
HashMap<>(serializedToOriginal.size());
         for (Map.Entry<ByteBuffer, ByteBuffer> rawEntry : raw.entrySet()) {
             try {
                 // Since null could be a valid key, explicitly check whether 
map contains the key
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageReaderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageReaderTest.java
new file mode 100644
index 00000000000..01646c1323f
--- /dev/null
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageReaderTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+import org.mockito.stubbing.Answer;
+
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
+public class OffsetStorageReaderTest {
+
+    @Mock
+    private Converter taskKeyConverter;
+
+    @Mock
+    private Converter taskValueConverter;
+
+    @Mock
+    private OffsetBackingStore offsetBackingStore;
+
+    @Test
+    @Timeout(60)
+    public void testClosingOffsetReaderWhenOffsetStoreHangs() throws Exception 
{
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        OffsetStorageReaderImpl offsetStorageReaderImpl = new 
OffsetStorageReaderImpl(
+                offsetBackingStore, "namespace", taskKeyConverter, 
taskValueConverter);
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        // Hanging `offsetBackingStore.get()`
+        doAnswer(invocation -> {
+            latch.countDown();
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            future.get(9999, TimeUnit.SECONDS);
+            throw new RuntimeException("Should never get here");
+        }).when(offsetBackingStore).get(any());
+
+        // Connector task thread hanging
+        executor.submit(() -> {
+            // Does call offsetBackingStore.get() and hangs
+            offsetStorageReaderImpl.offsets(Collections.emptyList());
+        });
+
+        // Ensure the task is hanging
+        latch.await();
+
+        verify(offsetBackingStore, times(1)).get(any());
+
+        // The herder thread should not block when trying to close 
`offsetStorageReaderImpl`
+        // and complete before test timeout
+        offsetStorageReaderImpl.close();
+
+        executor.shutdownNow();
+        if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+            throw new RuntimeException("Failed to shutdown executor");
+        }
+    }
+
+    @Test
+    @Timeout(60)
+    public void 
testClosingOffsetReaderWhenOffsetStoreHangsAndHasIncompleteFutures() throws 
Exception {
+        // Test similar to `testClosingOffsetReaderWhenOffsetStoreHangs` 
above, but in this case
+        // `OffsetStorageReaderImpl.offsetReadFutures` contains a future when 
`offsetStorageReaderImpl.close()` is called.
+
+        ExecutorService executor = Executors.newFixedThreadPool(2);
+        CompletableFuture<?> hangingFuture = mock(CompletableFuture.class);
+
+        OffsetStorageReaderImpl offsetStorageReaderImpl = new 
OffsetStorageReaderImpl(
+                offsetBackingStore, "namespace", taskKeyConverter, 
taskValueConverter);
+
+        CountDownLatch latchTask1 = new CountDownLatch(1);
+        CountDownLatch latchTask2 = new CountDownLatch(1);
+
+        // Mock hanging future
+        doAnswer(invocation -> {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            future.get(9999, TimeUnit.SECONDS);
+            throw new RuntimeException("Should never get here");
+        }).when(hangingFuture).get();
+
+        // Mock `offsetBackingStore.get()`
+        doAnswer(new Answer<Object>() {
+                int callCount = 0;
+
+                @Override
+                public Object answer(InvocationOnMock invocation) throws 
Throwable {
+                    if (callCount == 0) {
+                        callCount += 1;
+                        // First connector task
+                        latchTask1.countDown();
+                        return hangingFuture;
+                    } else {
+                        // Second connector task
+                        latchTask2.countDown();
+                        CompletableFuture<Void> future = new 
CompletableFuture<>();
+                        future.get(9999, TimeUnit.SECONDS);
+                        throw new RuntimeException("Should never get here");
+                    }
+                }
+            }
+        ).when(offsetBackingStore).get(any());
+
+
+        // Connector task thread calls `offsets()` --> hangs on 
`hangingFuture.get()`
+        // --> the future is added to 
`offsetStorageReaderImpl.offsetReadFutures` and never removed
+        executor.submit(() -> {
+            offsetStorageReaderImpl.offsets(Collections.emptyList());
+        });
+        // Ensure first task is hanging
+        latchTask1.await();
+
+        verify(offsetBackingStore, times(1)).get(any());
+        verify(hangingFuture, times(1)).get();
+
+        // Another connector task thread calls `offsets()` --> hangs on 
offsetBackingStore.get()
+        // --> the future is never added to 
`offsetStorageReaderImpl.offsetReadFutures`
+        executor.submit(() -> {
+            offsetStorageReaderImpl.offsets(Collections.emptyList());
+        });
+        // Ensure second task is hanging
+        latchTask2.await();
+
+        verify(offsetBackingStore, times(2)).get(any());
+
+        // The herder thread should not block when trying to close 
`offsetStorageReaderImpl` and should complete
+        // before the test timeout
+        offsetStorageReaderImpl.close();
+
+        // The hanging future should be cancelled by `close()`
+        verify(hangingFuture, times(1)).cancel(true);
+
+        executor.shutdownNow();
+        if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+            throw new RuntimeException("Failed to shutdown executor");
+        }
+    }
+}

Reply via email to