This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.8 by this push:
new e4a4e95ebb5 KAFKA-16955: fix synchronization of streams threadState
(#16337)
e4a4e95ebb5 is described below
commit e4a4e95ebb5cdf53121d01dbe421c1071717cd85
Author: Rohan <[email protected]>
AuthorDate: Fri Jun 14 10:44:36 2024 -0700
KAFKA-16955: fix synchronization of streams threadState (#16337)
Each KafkaStreams instance maintains a map from threadId to state
to use to aggregate to a KafkaStreams app state. The map is updated
on every state change, and when a new thread is created. State change
updates are done in a synchronized blocks, however the update that
happens on thread creation is not, which can raise
ConcurrentModificationException. This patch moves this update
into the listener object and protects it using the object's lock.
It also moves ownership of the state map into the listener so that
its less likely that future changes access it without locking
Reviewers: Matthias J. Sax <[email protected]>
---
.../org/apache/kafka/streams/KafkaStreams.java | 64 ++++++++++------------
1 file changed, 30 insertions(+), 34 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 89f8458b206..698bd3d74d7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -176,7 +176,6 @@ public class KafkaStreams implements AutoCloseable {
private final long totalCacheSize;
private final StreamStateListener streamStateListener;
private final DelegatingStateRestoreListener
delegatingStateRestoreListener;
- private final Map<Long, StreamThread.State> threadState;
private final UUID processId;
private final KafkaClientSupplier clientSupplier;
protected final TopologyMetadata topologyMetadata;
@@ -633,17 +632,13 @@ public class KafkaStreams implements AutoCloseable {
/**
* Class that handles stream thread transitions
*/
- final class StreamStateListener implements StreamThread.StateListener {
+ private final class StreamStateListener implements
StreamThread.StateListener {
private final Map<Long, StreamThread.State> threadState;
private GlobalStreamThread.State globalThreadState;
- // this lock should always be held before the state lock
- private final Object threadStatesLock;
- StreamStateListener(final Map<Long, StreamThread.State> threadState,
- final GlobalStreamThread.State globalThreadState) {
- this.threadState = threadState;
+ StreamStateListener(final GlobalStreamThread.State globalThreadState) {
+ this.threadState = new HashMap<>();
this.globalThreadState = globalThreadState;
- this.threadStatesLock = new Object();
}
/**
@@ -675,33 +670,35 @@ public class KafkaStreams implements AutoCloseable {
public synchronized void onChange(final Thread thread,
final ThreadStateTransitionValidator
abstractNewState,
final ThreadStateTransitionValidator
abstractOldState) {
- synchronized (threadStatesLock) {
- // StreamThreads first
- if (thread instanceof StreamThread) {
- final StreamThread.State newState = (StreamThread.State)
abstractNewState;
- threadState.put(thread.getId(), newState);
-
- if (newState == StreamThread.State.PARTITIONS_REVOKED ||
newState == StreamThread.State.PARTITIONS_ASSIGNED) {
- setState(State.REBALANCING);
- } else if (newState == StreamThread.State.RUNNING) {
- maybeSetRunning();
- }
- } else if (thread instanceof GlobalStreamThread) {
- // global stream thread has different invariants
- final GlobalStreamThread.State newState =
(GlobalStreamThread.State) abstractNewState;
- globalThreadState = newState;
-
- if (newState == GlobalStreamThread.State.RUNNING) {
- maybeSetRunning();
- } else if (newState == GlobalStreamThread.State.DEAD) {
- if (state != State.PENDING_SHUTDOWN) {
- log.error("Global thread has died. The streams
application or client will now close to ERROR.");
- closeToError();
- }
+ // StreamThreads first
+ if (thread instanceof StreamThread) {
+ final StreamThread.State newState = (StreamThread.State)
abstractNewState;
+ threadState.put(thread.getId(), newState);
+
+ if (newState == StreamThread.State.PARTITIONS_REVOKED ||
newState == StreamThread.State.PARTITIONS_ASSIGNED) {
+ setState(State.REBALANCING);
+ } else if (newState == StreamThread.State.RUNNING) {
+ maybeSetRunning();
+ }
+ } else if (thread instanceof GlobalStreamThread) {
+ // global stream thread has different invariants
+ final GlobalStreamThread.State newState =
(GlobalStreamThread.State) abstractNewState;
+ globalThreadState = newState;
+
+ if (newState == GlobalStreamThread.State.RUNNING) {
+ maybeSetRunning();
+ } else if (newState == GlobalStreamThread.State.DEAD) {
+ if (state != State.PENDING_SHUTDOWN) {
+ log.error("Global thread has died. The streams
application or client will now close to ERROR.");
+ closeToError();
}
}
}
}
+
+ private synchronized void registerStreamThread(final StreamThread
streamThread) {
+ threadState.put(streamThread.getId(), streamThread.state());
+ }
}
static final class DelegatingStateRestoreListener implements
StateRestoreListener {
@@ -1047,8 +1044,7 @@ public class KafkaStreams implements AutoCloseable {
globalThreadState = globalStreamThread.state();
}
- threadState = new HashMap<>(numStreamThreads);
- streamStateListener = new StreamStateListener(threadState,
globalThreadState);
+ streamStateListener = new StreamStateListener(globalThreadState);
final GlobalStateStoreProvider globalStateStoreProvider = new
GlobalStateStoreProvider(this.topologyMetadata.globalStateStores());
@@ -1084,9 +1080,9 @@ public class KafkaStreams implements AutoCloseable {
KafkaStreams.this::closeToError,
streamsUncaughtExceptionHandler
);
+ streamStateListener.registerStreamThread(streamThread);
streamThread.setStateListener(streamStateListener);
threads.add(streamThread);
- threadState.put(streamThread.getId(), streamThread.state());
queryableStoreProvider.addStoreProviderForThread(streamThread.getName(), new
StreamThreadStateStoreProvider(streamThread));
return streamThread;
}