Repository: kafka
Updated Branches:
  refs/heads/trunk d4c4bcf01 -> 97e61d4ae
MINOR: Fix multiple KafkaStreams.StreamStateListener being instantiated

There should only be a single `KafkaStreams.StreamStateListener` to ensure synchronization of operations on `KafkaStreams.StreamStateListener#threadState`.

Author: Armin Braun <[email protected]>

Reviewers: Damian Guy <[email protected]>, Ismael Juma <[email protected]>

Closes #2801 from original-brownbear/fix-stream-state-listener

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/97e61d4a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/97e61d4a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/97e61d4a

Branch: refs/heads/trunk
Commit: 97e61d4ae2feaf0551e75fa8cdd041f49f42a9a5
Parents: d4c4bcf
Author: Armin Braun <[email protected]>
Authored: Wed Apr 5 11:38:52 2017 +0100
Committer: Ismael Juma <[email protected]>
Committed: Wed Apr 5 11:39:18 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/kafka/streams/KafkaStreams.java | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/97e61d4a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 6ddf2a1..bc2a433 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -122,7 +122,6 @@ public class KafkaStreams {
     private GlobalStreamThread globalStreamThread;

     private final StreamThread[] threads;
-    private final Map<Long, StreamThread.State> threadState;
     private final Metrics metrics;
     private final QueryableStoreProvider queryableStoreProvider;

@@ -253,6 +252,13 @@ public class KafkaStreams {
     }

     private final class StreamStateListener implements StreamThread.StateListener {
+
+        private final Map<Long, StreamThread.State> threadState;
+
+        StreamStateListener(Map<Long, StreamThread.State> threadState) {
+            this.threadState = threadState;
+        }
+
         @Override
         public synchronized void onChange(final StreamThread thread,
                                           final StreamThread.State newState,
@@ -333,7 +339,7 @@ public class KafkaStreams {
         metrics = new Metrics(metricConfig, reporters, time);

         threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
-        threadState = new HashMap<>(threads.length);
+        final Map<Long, StreamThread.State> threadState = new HashMap<>(threads.length);
         final ArrayList<StateStoreProvider> storeProviders = new ArrayList<>();
         streamsMetadataState = new StreamsMetadataState(builder, parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));

@@ -358,6 +364,7 @@ public class KafkaStreams {
                                                         globalThreadId);
         }

+        final StreamStateListener streamStateListener = new StreamStateListener(threadState);
         for (int i = 0; i < threads.length; i++) {
             threads[i] = new StreamThread(builder,
                                           config,
@@ -369,7 +376,7 @@ public class KafkaStreams {
                                           time,
                                           streamsMetadataState,
                                           cacheSizeBytes);
-            threads[i].setStateListener(new StreamStateListener());
+            threads[i].setStateListener(streamStateListener);
             threadState.put(threads[i].getId(), threads[i].state());
             storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
         }
