This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a0291a8d50d MINOR: Fix flaky state updater test (#18253)
a0291a8d50d is described below
commit a0291a8d50d116c536333007185f185281b1bd6c
Author: Bruno Cadonna <[email protected]>
AuthorDate: Thu Dec 19 09:02:38 2024 +0100
MINOR: Fix flaky state updater test (#18253)
The tests are flaky because the tests end before the verified calls
are executed. This happens because the state updater thread executes
the verified calls, but the thread that executes the
tests with the verifications is a different thread.
This commit fixes the flaky tests by enusring that the calls were
performed by the state updater by either shutting down the state
updater or waiting for the condition.
Reviewers: Lucas Brutschy <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../kafka/streams/processor/internals/DefaultStateUpdater.java | 2 +-
.../streams/processor/internals/DefaultStateUpdaterTest.java | 10 +++++++++-
2 files changed, 10 insertions(+), 2 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index addef5a9f15..bb2f2ddd5b7 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -320,7 +320,7 @@ public class DefaultStateUpdater implements StateUpdater {
private void handleRuntimeException(final RuntimeException
runtimeException) {
- log.error("An unexpected error occurred within the state updater
thread: " + runtimeException);
+ log.error("An unexpected error occurred within the state updater
thread: {}", String.valueOf(runtimeException));
addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks(runtimeException);
isRunning.set(false);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
index 25f3f0e587f..adc71ebc116 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
@@ -290,7 +290,11 @@ class DefaultStateUpdaterTest {
stateUpdater.add(task2);
verifyFailedTasks(IllegalStateException.class, task1);
- assertFalse(stateUpdater.isRunning());
+ waitForCondition(
+ () -> !stateUpdater.isRunning(),
+ VERIFICATION_TIMEOUT,
+ "Did not switch to non-running within the given timeout!"
+ );
}
@Test
@@ -1015,6 +1019,8 @@ class DefaultStateUpdaterTest {
verifyRestoredActiveTasks();
verifyUpdatingTasks(task2);
verifyExceptionsAndFailedTasks();
+ // shutdown ensures that the test does not end before changelog reader
methods verified below are called
+ stateUpdater.shutdown(Duration.ofMinutes(1));
verify(changelogReader, times(1)).enforceRestoreActive();
verify(changelogReader, times(1)).transitToUpdateStandby();
}
@@ -1152,6 +1158,8 @@ class DefaultStateUpdaterTest {
public void shouldResumeStandbyTask() throws Exception {
final StandbyTask task = standbyTask(TASK_0_0,
Set.of(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
shouldResumeStatefulTask(task);
+ // shutdown ensures that the test does not end before changelog reader
methods verified below are called
+ stateUpdater.shutdown(Duration.ofMinutes(1));
verify(changelogReader, times(2)).transitToUpdateStandby();
}