This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 808498e9391 KAFKA-14401: Fail kafka log read end requests if underneath work thread fails (#14372)
808498e9391 is described below

commit 808498e9391dab292a7ccd8a0bf3713f444f9d2f
Author: vamossagar12 <[email protected]>
AuthorDate: Tue Jul 16 06:25:52 2024 +0530

    KAFKA-14401: Fail kafka log read end requests if underneath work thread fails (#14372)
    
    Reviewers: Chris Egerton <[email protected]>
---
 .../apache/kafka/connect/util/KafkaBasedLog.java   | 26 ++++++++-----
 .../kafka/connect/util/KafkaBasedLogTest.java      | 45 ++++++++++++++++++++++
 2 files changed, 62 insertions(+), 9 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 53e724da453..23f6d8a9c49 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -568,13 +568,12 @@ public class KafkaBasedLog<K, V> {
         public WorkThread() {
             super("KafkaBasedLog Work Thread - " + topic);
         }
-
         @Override
         public void run() {
-            try {
-                log.trace("{} started execution", this);
-                while (true) {
-                    int numCallbacks;
+            log.trace("{} started execution", this);
+            while (true) {
+                int numCallbacks = 0;
+                try {
                     synchronized (KafkaBasedLog.this) {
                         if (stopRequested)
                             break;
@@ -587,11 +586,11 @@ public class KafkaBasedLog<K, V> {
                             log.trace("Finished read to end log for topic {}", 
topic);
                         } catch (TimeoutException e) {
                             log.warn("Timeout while reading log to end for 
topic '{}'. Retrying automatically. " +
-                                     "This may occur when brokers are 
unavailable or unreachable. Reason: {}", topic, e.getMessage());
+                                "This may occur when brokers are unavailable 
or unreachable. Reason: {}", topic, e.getMessage());
                             continue;
                         } catch (RetriableException | 
org.apache.kafka.connect.errors.RetriableException e) {
                             log.warn("Retriable error while reading log to end 
for topic '{}'. Retrying automatically. " +
-                                     "Reason: {}", topic, e.getMessage());
+                                "Reason: {}", topic, e.getMessage());
                             continue;
                         } catch (WakeupException e) {
                             // Either received another get() call and need to 
retry reading to end of log or stop() was
@@ -615,9 +614,18 @@ public class KafkaBasedLog<K, V> {
                         // See previous comment, both possible causes of this 
wakeup are handled by starting this loop again
                         continue;
                     }
+                } catch (Throwable t) {
+                    log.error("Unexpected exception in {}", this, t);
+                    synchronized (KafkaBasedLog.this) {
+                        // Only fail exactly the number of callbacks we found 
before triggering the read to log end
+                        // since it is possible for another write + readToEnd 
to sneak in the meantime which we don't
+                        // want to fail.
+                        for (int i = 0; i < numCallbacks; i++) {
+                            Callback<Void> cb = 
readLogEndOffsetCallbacks.poll();
+                            cb.onCompletion(t, null);
+                        }
+                    }
                 }
-            } catch (Throwable t) {
-                log.error("Unexpected exception in {}", this, t);
             }
         }
     }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
index 3c5155f4889..f29d3bd29f2 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
@@ -57,6 +57,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -404,6 +405,50 @@ public class KafkaBasedLogTest {
         verifyStartAndStop();
     }
 
+    @Test
+    public void testOffsetReadFailureWhenWorkThreadFails() throws Exception {
+        RuntimeException exception = new RuntimeException();
+        Set<TopicPartition> tps = new HashSet<>(Arrays.asList(TP0, TP1));
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 0L);
+        endOffsets.put(TP1, 0L);
+        admin = mock(TopicAdmin.class);
+        when(admin.endOffsets(eq(tps)))
+            .thenReturn(endOffsets)
+            .thenThrow(exception)
+            .thenReturn(endOffsets);
+
+        store.start();
+
+        AtomicInteger numSuccesses = new AtomicInteger();
+        AtomicInteger numFailures = new AtomicInteger();
+        AtomicReference<FutureCallback<Void>> finalSuccessCallbackRef = new 
AtomicReference<>();
+        final FutureCallback<Void> successCallback = new 
FutureCallback<>((error, result) -> numSuccesses.getAndIncrement());
+        store.readToEnd(successCallback);
+        // First log end read should succeed.
+        successCallback.get(1000, TimeUnit.MILLISECONDS);
+
+        // Second log end read fails.
+        final FutureCallback<Void> firstFailedCallback = new 
FutureCallback<>((error, result) -> {
+            numFailures.getAndIncrement();
+            // We issue another readToEnd call here to simulate the case that 
more read requests can come in while
+            // the failure is being handled in the WorkThread. This read 
request should not be impacted by the outcome of
+            // the current read request's failure.
+            final FutureCallback<Void> finalSuccessCallback = new 
FutureCallback<>((e, r) -> numSuccesses.getAndIncrement());
+            finalSuccessCallbackRef.set(finalSuccessCallback);
+            store.readToEnd(finalSuccessCallback);
+        });
+        store.readToEnd(firstFailedCallback);
+        ExecutionException e1 = assertThrows(ExecutionException.class, () -> 
firstFailedCallback.get(1000, TimeUnit.MILLISECONDS));
+        assertEquals(exception, e1.getCause());
+
+        // Last log read end should succeed.
+        finalSuccessCallbackRef.get().get(1000, TimeUnit.MILLISECONDS);
+
+        assertEquals(2, numSuccesses.get());
+        assertEquals(1, numFailures.get());
+    }
+
     @Test
     public void testProducerError() {
         TestFuture<RecordMetadata> tp0Future = new TestFuture<>();

Reply via email to