This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 20b073bbeea KAFKA-18498: Update lock ownership from main thread
(#18732)
20b073bbeea is described below
commit 20b073bbeea7c191048ea626d5835bcb8769f091
Author: Bill Bejeck <[email protected]>
AuthorDate: Wed Jan 29 14:09:44 2025 -0500
KAFKA-18498: Update lock ownership from main thread (#18732)
Once a StreamThread receives its assignment, it will close the startup
tasks. But during the closing process, the StandbyTask.closeClean() method will
eventually call the StateManagerUtil.closeStateManager method, which needs to
lock the state directory, but locking requires the calling thread be the
current owner. Since the main thread grabs the lock on startup but moves on
without releasing it, we need to update ownership explicitly here in order for
the stream thread to close the startup tasks [...]
Reviewers: Matthias Sax <[email protected]>, Nick Telford
---
.../org/apache/kafka/streams/processor/internals/StateDirectory.java | 5 ++---
tests/kafkatest/tests/streams/streams_eos_test.py | 5 -----
2 files changed, 2 insertions(+), 8 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 97525b8972d..a95d20ddae0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -51,7 +51,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -107,7 +106,7 @@ public class StateDirectory implements AutoCloseable {
private final boolean hasPersistentStores;
private final boolean hasNamedTopologies;
- private final HashMap<TaskId, Thread> lockedTasksToOwner = new HashMap<>();
+ private final ConcurrentMap<TaskId, Thread> lockedTasksToOwner = new
ConcurrentHashMap<>();
private FileChannel stateDirLockChannel;
private FileLock stateDirLock;
@@ -286,7 +285,7 @@ public class StateDirectory implements AutoCloseable {
// "drain" Tasks first to ensure that we don't try to close Tasks
that another thread is attempting to close
final Set<Task> drainedTasks = new
HashSet<>(tasksForLocalState.size());
for (final Map.Entry<TaskId, Task> entry :
tasksForLocalState.entrySet()) {
- if (predicate.test(entry.getValue()) &&
tasksForLocalState.remove(entry.getKey()) != null) {
+ if (predicate.test(entry.getValue()) &&
removeStartupTask(entry.getKey()) != null) {
// only add to our list of drained Tasks if we exclusively
"claimed" a Task from tasksForLocalState
// to ensure we don't accidentally try to drain the same
Task multiple times from concurrent threads
drainedTasks.add(entry.getValue());
diff --git a/tests/kafkatest/tests/streams/streams_eos_test.py
b/tests/kafkatest/tests/streams/streams_eos_test.py
index c02cb9cf0d9..b3ac41b887a 100644
--- a/tests/kafkatest/tests/streams/streams_eos_test.py
+++ b/tests/kafkatest/tests/streams/streams_eos_test.py
@@ -15,7 +15,6 @@
from ducktape.mark import matrix
from ducktape.mark.resource import cluster
-from ducktape.mark import ignore
from kafkatest.tests.kafka_test import KafkaTest
from kafkatest.services.kafka import quorum
from kafkatest.services.streams import StreamsEosTestDriverService,
StreamsEosTestJobRunnerService, \
@@ -39,7 +38,6 @@ class StreamsEosTest(KafkaTest):
self.driver = StreamsEosTestDriverService(test_context, self.kafka)
self.test_context = test_context
- @ignore
@cluster(num_nodes=9)
@matrix(metadata_quorum=[quorum.combined_kraft])
def test_rebalance_simple(self, metadata_quorum):
@@ -47,7 +45,6 @@ class StreamsEosTest(KafkaTest):
StreamsEosTestJobRunnerService(self.test_context,
self.kafka),
StreamsEosTestJobRunnerService(self.test_context,
self.kafka),
StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
- @ignore
@cluster(num_nodes=9)
@matrix(metadata_quorum=[quorum.combined_kraft])
def test_rebalance_complex(self, metadata_quorum):
@@ -82,7 +79,6 @@ class StreamsEosTest(KafkaTest):
verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" %
verifier.STDOUT_FILE, allow_fail=False)
- @ignore
@cluster(num_nodes=9)
@matrix(metadata_quorum=[quorum.combined_kraft])
def test_failure_and_recovery(self, metadata_quorum):
@@ -90,7 +86,6 @@ class StreamsEosTest(KafkaTest):
StreamsEosTestJobRunnerService(self.test_context, self.kafka),
StreamsEosTestJobRunnerService(self.test_context, self.kafka),
StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
- @ignore
@cluster(num_nodes=9)
@matrix(metadata_quorum=[quorum.combined_kraft])
def test_failure_and_recovery_complex(self, metadata_quorum):