This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 808498e9391 KAFKA-14401: Fail kafka log read end requests if
underneath work thread fails (#14372)
808498e9391 is described below
commit 808498e9391dab292a7ccd8a0bf3713f444f9d2f
Author: vamossagar12 <[email protected]>
AuthorDate: Tue Jul 16 06:25:52 2024 +0530
KAFKA-14401: Fail kafka log read end requests if underneath work thread
fails (#14372)
Reviewers: Chris Egerton <[email protected]>
---
.../apache/kafka/connect/util/KafkaBasedLog.java | 26 ++++++++-----
.../kafka/connect/util/KafkaBasedLogTest.java | 45 ++++++++++++++++++++++
2 files changed, 62 insertions(+), 9 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 53e724da453..23f6d8a9c49 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -568,13 +568,12 @@ public class KafkaBasedLog<K, V> {
public WorkThread() {
super("KafkaBasedLog Work Thread - " + topic);
}
-
@Override
public void run() {
- try {
- log.trace("{} started execution", this);
- while (true) {
- int numCallbacks;
+ log.trace("{} started execution", this);
+ while (true) {
+ int numCallbacks = 0;
+ try {
synchronized (KafkaBasedLog.this) {
if (stopRequested)
break;
@@ -587,11 +586,11 @@ public class KafkaBasedLog<K, V> {
log.trace("Finished read to end log for topic {}", topic);
} catch (TimeoutException e) {
log.warn("Timeout while reading log to end for topic '{}'. Retrying automatically. " +
- "This may occur when brokers are unavailable or unreachable. Reason: {}", topic, e.getMessage());
+ "This may occur when brokers are unavailable or unreachable. Reason: {}", topic, e.getMessage());
continue;
} catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {
log.warn("Retriable error while reading log to end for topic '{}'. Retrying automatically. " +
- "Reason: {}", topic, e.getMessage());
+ "Reason: {}", topic, e.getMessage());
continue;
} catch (WakeupException e) {
// Either received another get() call and need to retry reading to end of log or stop() was
@@ -615,9 +614,18 @@ public class KafkaBasedLog<K, V> {
// See previous comment, both possible causes of this wakeup are handled by starting this loop again
continue;
}
+ } catch (Throwable t) {
+ log.error("Unexpected exception in {}", this, t);
+ synchronized (KafkaBasedLog.this) {
+ // Only fail exactly the number of callbacks we found before triggering the read to log end
+ // since it is possible for another write + readToEnd to sneak in the meantime which we don't
+ // want to fail.
+ for (int i = 0; i < numCallbacks; i++) {
+ Callback<Void> cb = readLogEndOffsetCallbacks.poll();
+ cb.onCompletion(t, null);
+ }
+ }
}
- } catch (Throwable t) {
- log.error("Unexpected exception in {}", this, t);
}
}
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
index 3c5155f4889..f29d3bd29f2 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
@@ -57,6 +57,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -404,6 +405,50 @@ public class KafkaBasedLogTest {
verifyStartAndStop();
}
+ @Test
+ public void testOffsetReadFailureWhenWorkThreadFails() throws Exception {
+ RuntimeException exception = new RuntimeException();
+ Set<TopicPartition> tps = new HashSet<>(Arrays.asList(TP0, TP1));
+ Map<TopicPartition, Long> endOffsets = new HashMap<>();
+ endOffsets.put(TP0, 0L);
+ endOffsets.put(TP1, 0L);
+ admin = mock(TopicAdmin.class);
+ when(admin.endOffsets(eq(tps)))
+ .thenReturn(endOffsets)
+ .thenThrow(exception)
+ .thenReturn(endOffsets);
+
+ store.start();
+
+ AtomicInteger numSuccesses = new AtomicInteger();
+ AtomicInteger numFailures = new AtomicInteger();
+ AtomicReference<FutureCallback<Void>> finalSuccessCallbackRef = new AtomicReference<>();
+ final FutureCallback<Void> successCallback = new FutureCallback<>((error, result) -> numSuccesses.getAndIncrement());
+ store.readToEnd(successCallback);
+ // First log end read should succeed.
+ successCallback.get(1000, TimeUnit.MILLISECONDS);
+
+ // Second log end read fails.
+ final FutureCallback<Void> firstFailedCallback = new FutureCallback<>((error, result) -> {
+ numFailures.getAndIncrement();
+ // We issue another readToEnd call here to simulate the case that more read requests can come in while
+ // the failure is being handled in the WorkThread. This read request should not be impacted by the outcome of
+ // the current read request's failure.
+ final FutureCallback<Void> finalSuccessCallback = new FutureCallback<>((e, r) -> numSuccesses.getAndIncrement());
+ finalSuccessCallbackRef.set(finalSuccessCallback);
+ store.readToEnd(finalSuccessCallback);
+ });
+ store.readToEnd(firstFailedCallback);
+ ExecutionException e1 = assertThrows(ExecutionException.class, () -> firstFailedCallback.get(1000, TimeUnit.MILLISECONDS));
+ assertEquals(exception, e1.getCause());
+
+ // Last log read end should succeed.
+ finalSuccessCallbackRef.get().get(1000, TimeUnit.MILLISECONDS);
+
+ assertEquals(2, numSuccesses.get());
+ assertEquals(1, numFailures.get());
+ }
+
@Test
public void testProducerError() {
TestFuture<RecordMetadata> tp0Future = new TestFuture<>();