[
https://issues.apache.org/jira/browse/KAFKA-17402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ao Li reopened KAFKA-17402:
---------------------------
I don't think this bug is fixed.
I've rebased the branch to the current truck and can still reproduce the
failure.
https://github.com/aoli-al/kafka/commits/KAFKA-92/
With command ./gradlew :streams:test --tests
DefaultStateUpdaterTest.shouldGetTasksFromRestoredActiveTasks
> Test failure: DefaultStateUpdaterTest.shouldGetTasksFromRestoredActiveTasks
> expected: <2> but was: <3>
> -------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-17402
> URL: https://issues.apache.org/jira/browse/KAFKA-17402
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Ao Li
> Priority: Major
>
> I saw a test failure caused by a concurrency issue in DefaultStateUpdater.
> {code}
> org.opentest4j.AssertionFailedError: expected: <2> but was: <3>
> at
> org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
> at
> org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
> at
> org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
> at
> org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:150)
> at
> org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:145)
> at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:531)
> at
> org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.verifyGetTasks(DefaultStateUpdaterTest.java:1688)
> at
> org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.shouldGetTasksFromRestoredActiveTasks(DefaultStateUpdaterTest.java:1517)
> at java.base/java.lang.reflect.Method.invoke(Method.java:580)
> at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
> at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
> {code}
> To reproduce the failure, you may clone this fork
> https://github.com/aoli-al/kafka/tree/KAFKA-92 and run `./gradlew
> :streams:test --tests
> DefaultStateUpdaterTest.shouldGetTasksFromRestoredActiveTasks`
> The root cause of the issue is that function
> `DefaultStateUpdater::maybeCompleteRestoration` is not atomic.
> {code}
> // This code will unblock the `verifyRestoredActiveTasks` in
> the test
> addToRestoredTasks(task);
> //If the test method resumes before the task is removed from
> `updatingTasks`, the test will fail.
> updatingTasks.remove(task.id());
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)