This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new e96a463561c KAFKA-14862 (HOTFIX): Fix ConcurrentModificationException (#13734) e96a463561c is described below commit e96a463561ca8974fca37562b8675ae8ae4aff29 Author: Matthias J. Sax <matth...@confluent.io> AuthorDate: Sun May 21 14:39:12 2023 -0700 KAFKA-14862 (HOTFIX): Fix ConcurrentModificationException (#13734) Reviewers: Walker Carlson <wcarl...@confluent.io> --- .../org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java index f4ad9ac682f..8e0fcfece0e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java @@ -44,11 +44,11 @@ import org.apache.kafka.streams.state.internals.LeftOrRightValueSerde; import java.time.Duration; import java.util.Arrays; -import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration; @@ -60,7 +60,7 @@ class KStreamImplJoin { private final boolean rightOuter; static class TimeTrackerSupplier { - private final Map<TaskId, TimeTracker> tracker = new HashMap<>(); + private final Map<TaskId, TimeTracker> tracker = new ConcurrentHashMap<>(); public TimeTracker get(final TaskId taskId) { return tracker.computeIfAbsent(taskId, taskId1 -> new TimeTracker());