Repository: ignite Updated Branches: refs/heads/ignite-zk 7f05c09d3 -> 52174ef77
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b37c35f1 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b37c35f1 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b37c35f1 Branch: refs/heads/ignite-zk Commit: b37c35f118f7838ab64ac7fb7d926d25b92fcab7 Parents: 7f05c09 Author: sboikov <[email protected]> Authored: Fri Dec 8 14:53:34 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri Dec 8 14:53:34 2017 +0300 ---------------------------------------------------------------------- .../continuous/ContinuousRoutinesInfo.java | 71 +++++++++++--------- 1 file changed, 39 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b37c35f1/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java index e46887b..8061b55 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java @@ -32,35 +32,41 @@ import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType */ class ContinuousRoutinesInfo { /** */ - private Map<UUID, ContinuousRoutineInfo> startedRoutines = new HashMap<>(); + private final Map<UUID, ContinuousRoutineInfo> startedRoutines = new HashMap<>(); /** * @param dataBag Discovery data bag. */ void collectGridNodeData(DiscoveryDataBag dataBag) { - if (!dataBag.commonDataCollectedFor(CONTINUOUS_PROC.ordinal())) - dataBag.addGridCommonData(CONTINUOUS_PROC.ordinal(), - new ContinuousRoutinesCommonDiscoveryData(new ArrayList<>(startedRoutines.values()))); + synchronized (startedRoutines) { + if (!dataBag.commonDataCollectedFor(CONTINUOUS_PROC.ordinal())) + dataBag.addGridCommonData(CONTINUOUS_PROC.ordinal(), + new ContinuousRoutinesCommonDiscoveryData(new ArrayList<>(startedRoutines.values()))); + } } /** * @param dataBag Discovery data bag. */ void collectJoiningNodeData(DiscoveryDataBag dataBag) { - for (ContinuousRoutineInfo info : startedRoutines.values()) { - if (info.disconnected) - info.sourceNodeId(dataBag.joiningNodeId()); + synchronized (startedRoutines) { + for (ContinuousRoutineInfo info : startedRoutines.values()) { + if (info.disconnected) + info.sourceNodeId(dataBag.joiningNodeId()); + } + + dataBag.addJoiningNodeData(CONTINUOUS_PROC.ordinal(), + new ContinuousRoutinesJoiningNodeDiscoveryData(new ArrayList<>(startedRoutines.values()))); } - - dataBag.addJoiningNodeData(CONTINUOUS_PROC.ordinal(), - new ContinuousRoutinesJoiningNodeDiscoveryData(new ArrayList<>(startedRoutines.values()))); } /** * @param info Routine info. */ void addRoutineInfo(ContinuousRoutineInfo info) { - startedRoutines.put(info.routineId, info); + synchronized (startedRoutines) { + startedRoutines.put(info.routineId, info); + } } /** @@ -68,29 +74,35 @@ class ContinuousRoutinesInfo { * @return {@code True} if routine exists. */ boolean routineExists(UUID routineId) { - return startedRoutines.containsKey(routineId); + synchronized (startedRoutines) { + return startedRoutines.containsKey(routineId); + } } /** * @param routineId Routine ID. */ void removeRoutine(UUID routineId) { - startedRoutines.remove(routineId); + synchronized (startedRoutines) { + startedRoutines.remove(routineId); + } } /** * @param locRoutines Routines IDs which can survive reconnect. */ void onClientDisconnected(Collection<UUID> locRoutines) { - for (Iterator<Map.Entry<UUID, ContinuousRoutineInfo>> it = startedRoutines.entrySet().iterator(); it.hasNext();) { - Map.Entry<UUID, ContinuousRoutineInfo> e = it.next(); + synchronized (startedRoutines) { + for (Iterator<Map.Entry<UUID, ContinuousRoutineInfo>> it = startedRoutines.entrySet().iterator(); it.hasNext();) { + Map.Entry<UUID, ContinuousRoutineInfo> e = it.next(); - ContinuousRoutineInfo info = e.getValue(); + ContinuousRoutineInfo info = e.getValue(); - if (!locRoutines.contains(info.routineId)) - it.remove(); - else - info.onDisconnected(); + if (!locRoutines.contains(info.routineId)) + it.remove(); + else + info.onDisconnected(); + } } } @@ -100,20 +112,15 @@ class ContinuousRoutinesInfo { * @param nodeId Node ID. */ void onNodeFail(UUID nodeId) { - for (Iterator<Map.Entry<UUID, ContinuousRoutineInfo>> it = startedRoutines.entrySet().iterator(); it.hasNext();) { - Map.Entry<UUID, ContinuousRoutineInfo> e = it.next(); + synchronized (startedRoutines) { + for (Iterator<Map.Entry<UUID, ContinuousRoutineInfo>> it = startedRoutines.entrySet().iterator(); it.hasNext();) { + Map.Entry<UUID, ContinuousRoutineInfo> e = it.next(); - ContinuousRoutineInfo info = e.getValue(); + ContinuousRoutineInfo info = e.getValue(); - if (info.autoUnsubscribe && info.srcNodeId.equals(nodeId)) - it.remove(); + if (info.autoUnsubscribe && info.srcNodeId.equals(nodeId)) + it.remove(); + } } } - - /** - * - */ - void clear() { - startedRoutines.clear(); - } }
