ignite-4779 Missed discovery data snapshot during exchange processing (do not use discovery manager cache to handle exchange)
(cherry picked from commit a61a98ad3908770b77d0ffb071effbc92f4d5c5a) Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/42346673 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/42346673 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/42346673 Branch: refs/heads/ignite-gg-8.0.3.ea6-clients-test Commit: 423466736904b55a6d4397aa6268fff5a9ee5ea9 Parents: 7e956ed Author: Igor Seliverstov <[email protected]> Authored: Thu Mar 9 21:01:41 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu Mar 9 22:14:18 2017 +0300 ---------------------------------------------------------------------- .../internal/managers/discovery/DiscoCache.java | 310 +++++++++ .../discovery/GridDiscoveryManager.java | 680 +++++-------------- .../eventstorage/DiscoveryEventListener.java | 33 + .../eventstorage/GridEventStorageManager.java | 162 ++++- .../affinity/GridAffinityAssignmentCache.java | 7 +- .../cache/CacheAffinitySharedManager.java | 35 +- .../cache/GridCacheAffinityManager.java | 3 +- .../GridCachePartitionExchangeManager.java | 73 +- .../dht/GridClientPartitionTopology.java | 20 +- .../dht/GridDhtAssignmentFetchFuture.java | 7 +- .../dht/GridDhtPartitionTopologyImpl.java | 40 +- .../GridDhtPartitionsExchangeFuture.java | 30 +- .../service/GridServiceProcessor.java | 21 +- .../GridDiscoveryManagerAliveCacheSelfTest.java | 14 - 14 files changed, 808 insertions(+), 627 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/42346673/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java new file mode 100644 index 
0000000..5247ac1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.discovery; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.util.GridConcurrentHashSet; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.P1; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class DiscoCache { + /** Local node. */ + private final ClusterNode loc; + + /** Remote nodes. */ + private final List<ClusterNode> rmtNodes; + + /** All nodes. */ + private final List<ClusterNode> allNodes; + + /** All server nodes. */ + private final List<ClusterNode> srvNodes; + + /** Daemon nodes. 
*/ + private final List<ClusterNode> daemonNodes; + + /** Server nodes with at least one cache configured. */ + private final List<ClusterNode> srvNodesWithCaches; + + /** All nodes with at least one cache configured. */ + @GridToStringInclude + private final List<ClusterNode> allNodesWithCaches; + + /** All remote nodes with at least one cache configured. */ + @GridToStringInclude + private final List<ClusterNode> rmtNodesWithCaches; + + /** Cache nodes by cache ID. */ + @GridToStringInclude + private final Map<Integer, List<ClusterNode>> allCacheNodes; + + /** Affinity cache nodes by cache ID. */ + @GridToStringInclude + private final Map<Integer, List<ClusterNode>> affCacheNodes; + + /** Node map. */ + private final Map<UUID, ClusterNode> nodeMap; + + /** Caches where at least one node has near cache enabled. */ + @GridToStringInclude + private final Set<Integer> nearEnabledCaches; + + /** Alive nodes. */ + private final Set<UUID> alives = new GridConcurrentHashSet<>(); + + /** + * @param loc Local node. + * @param rmtNodes Remote nodes. + * @param allNodes All nodes. + * @param srvNodes Server nodes. + * @param daemonNodes Daemon nodes. + * @param srvNodesWithCaches Server nodes with at least one cache configured. + * @param allNodesWithCaches All nodes with at least one cache configured. + * @param rmtNodesWithCaches Remote nodes with at least one cache configured. + * @param allCacheNodes Cache nodes by cache ID. + * @param affCacheNodes Affinity cache nodes by cache ID. + * @param nodeMap Node map. + * @param nearEnabledCaches Caches where at least one node has near cache enabled. + * @param alives Alive nodes. 
+ */ + DiscoCache(ClusterNode loc, + List<ClusterNode> rmtNodes, + List<ClusterNode> allNodes, + List<ClusterNode> srvNodes, + List<ClusterNode> daemonNodes, + List<ClusterNode> srvNodesWithCaches, + List<ClusterNode> allNodesWithCaches, + List<ClusterNode> rmtNodesWithCaches, + Map<Integer, List<ClusterNode>> allCacheNodes, + Map<Integer, List<ClusterNode>> affCacheNodes, + Map<UUID, ClusterNode> nodeMap, + Set<Integer> nearEnabledCaches, + Set<UUID> alives) { + this.loc = loc; + this.rmtNodes = rmtNodes; + this.allNodes = allNodes; + this.srvNodes = srvNodes; + this.daemonNodes = daemonNodes; + this.srvNodesWithCaches = srvNodesWithCaches; + this.allNodesWithCaches = allNodesWithCaches; + this.rmtNodesWithCaches = rmtNodesWithCaches; + this.allCacheNodes = allCacheNodes; + this.affCacheNodes = affCacheNodes; + this.nodeMap = nodeMap; + this.nearEnabledCaches = nearEnabledCaches; + this.alives.addAll(alives); + } + + /** @return Local node. */ + public ClusterNode localNode() { + return loc; + } + + /** @return Remote nodes. */ + public List<ClusterNode> remoteNodes() { + return rmtNodes; + } + + /** @return All nodes. */ + public List<ClusterNode> allNodes() { + return allNodes; + } + + /** @return Server nodes. */ + public List<ClusterNode> serverNodes() { + return srvNodes; + } + + /** @return Daemon nodes. */ + public List<ClusterNode> daemonNodes() { + return daemonNodes; + } + + /** @return Server nodes with at least one cache configured. */ + public List<ClusterNode> serverNodesWithCaches() { + return srvNodesWithCaches; + } + + /** + * Gets all remote nodes that have at least one cache configured. + * + * @return Collection of nodes. + */ + public List<ClusterNode> remoteNodesWithCaches() { + return rmtNodesWithCaches; + } + + /** + * Gets collection of nodes with at least one cache configured. + * + * @return Collection of nodes. 
+ */ + public List<ClusterNode> allNodesWithCaches() { + return allNodesWithCaches; + } + + /** + * Gets collection of alive server nodes. + * + * @return Collection of nodes. + */ + public Collection<ClusterNode> aliveServerNodes() { + return F.view(serverNodes(), new P1<ClusterNode>() { + @Override public boolean apply(ClusterNode node) { + return alives.contains(node.id()); + } + }); + } + + /** + * Gets collection of alive server nodes with at least one cache configured. + * + * @return Collection of nodes. + */ + public Collection<ClusterNode> aliveServerNodesWithCaches() { + return F.view(serverNodesWithCaches(), new P1<ClusterNode>() { + @Override public boolean apply(ClusterNode node) { + return alives.contains(node.id()); + } + }); + } + + /** + * @return Oldest alive server node. + */ + public @Nullable ClusterNode oldestAliveServerNode(){ + Iterator<ClusterNode> it = aliveServerNodes().iterator(); + return it.hasNext() ? it.next() : null; + } + + /** + * @return Oldest alive server node with at least one cache configured. + */ + public @Nullable ClusterNode oldestAliveServerNodeWithCache(){ + Iterator<ClusterNode> it = aliveServerNodesWithCaches().iterator(); + return it.hasNext() ? it.next() : null; + } + + /** + * Gets all nodes that have cache with given name. + * + * @param cacheName Cache name. + * @return Collection of nodes. + */ + public List<ClusterNode> cacheNodes(@Nullable String cacheName) { + return cacheNodes(CU.cacheId(cacheName)); + } + + /** + * Gets all nodes that have cache with given ID. + * + * @param cacheId Cache ID. + * @return Collection of nodes. + */ + public List<ClusterNode> cacheNodes(Integer cacheId) { + return emptyIfNull(allCacheNodes.get(cacheId)); + } + + /** + * Gets all nodes that have cache with given name and should participate in affinity calculation. With + * partitioned cache nodes with near-only cache do not participate in affinity node calculation. + * + * @param cacheName Cache name. 
+ * @return Collection of nodes. + */ + public List<ClusterNode> cacheAffinityNodes(@Nullable String cacheName) { + return cacheAffinityNodes(CU.cacheId(cacheName)); + } + + /** + * Gets all nodes that have cache with given ID and should participate in affinity calculation. With + * partitioned cache nodes with near-only cache do not participate in affinity node calculation. + * + * @param cacheId Cache ID. + * @return Collection of nodes. + */ + public List<ClusterNode> cacheAffinityNodes(int cacheId) { + return emptyIfNull(affCacheNodes.get(cacheId)); + } + + /** + * Checks if cache with given ID has at least one node with near cache enabled. + * + * @param cacheId Cache ID. + * @return {@code True} if cache with given name has at least one node with near cache enabled. + */ + public boolean hasNearCache(int cacheId) { + return nearEnabledCaches.contains(cacheId); + } + + /** + * @param id Node ID. + * @return Node. + */ + public @Nullable ClusterNode node(UUID id) { + return nodeMap.get(id); + } + + /** + * Removes left node from alives lists. + * + * @param rmvd Removed node. + */ + public void updateAlives(ClusterNode rmvd) { + alives.remove(rmvd.id()); + } + + /** + * Removes left nodes from cached alives lists. + * + * @param discovery Discovery manager. + */ + public void updateAlives(GridDiscoveryManager discovery) { + for (UUID alive : alives) { + if (!discovery.alive(alive)) + alives.remove(alive); + } + } + + /** + * @param nodes Cluster nodes. + * @return Empty collection if nodes list is {@code null} + */ + private List<ClusterNode> emptyIfNull(List<ClusterNode> nodes) { + return nodes == null ? 
Collections.<ClusterNode>emptyList() : nodes; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DiscoCache.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/42346673/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 218aff0..960a064 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -35,14 +35,12 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.NavigableMap; import java.util.Set; -import java.util.TreeMap; +import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; @@ -79,19 +77,17 @@ import org.apache.ignite.internal.processors.jobmetrics.GridJobMetrics; import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.processors.service.GridServiceProcessor; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; -import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.future.GridFinishedFuture; import 
org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.lang.GridTuple5; +import org.apache.ignite.internal.util.lang.GridTuple6; import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.P1; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; @@ -100,7 +96,6 @@ import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.security.SecurityCredentials; import org.apache.ignite.plugin.segmentation.SegmentationPolicy; @@ -114,6 +109,7 @@ import org.apache.ignite.spi.discovery.DiscoverySpiListener; import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator; import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport; import org.apache.ignite.thread.IgniteThread; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; @@ -235,7 +231,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { private long segChkFreq; /** Local node join to topology event. */ - private GridFutureAdapter<DiscoveryEvent> locJoinEvt = new GridFutureAdapter<>(); + private GridFutureAdapter<T2<DiscoveryEvent, DiscoCache>> locJoin = new GridFutureAdapter<>(); /** GC CPU load. 
*/ private volatile double gcCpuLoad; @@ -560,20 +556,25 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } } + final DiscoCache discoCache; + // Put topology snapshot into discovery history. // There is no race possible between history maintenance and concurrent discovery // event notifications, since SPI notifies manager about all events from this listener. if (verChanged) { - DiscoCache cache = new DiscoCache(locNode, F.view(topSnapshot, F.remoteNodes(locNode.id()))); + discoCache = createDiscoCache(locNode, topSnapshot); - discoCacheHist.put(nextTopVer, cache); + discoCacheHist.put(nextTopVer, discoCache); - boolean set = updateTopologyVersionIfGreater(nextTopVer, cache); + boolean set = updateTopologyVersionIfGreater(nextTopVer, discoCache); assert set || topVer == 0 : "Topology version has not been updated [this.topVer=" + topSnap + ", topVer=" + topVer + ", node=" + node + ", evt=" + U.gridEventName(type) + ']'; } + else + // Current version. + discoCache = discoCache(); // If this is a local join event, just save it and do not notify listeners. 
if (type == EVT_NODE_JOINED && node.id().equals(locNode.id())) { @@ -581,7 +582,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { gridStartTime = getSpi().getGridStartTime(); updateTopologyVersionIfGreater(new AffinityTopologyVersion(locNode.order()), - new DiscoCache(localNode(), F.view(topSnapshot, F.remoteNodes(locNode.id())))); + discoCache); startLatch.countDown(); @@ -591,14 +592,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { discoEvt.eventNode(node); discoEvt.type(EVT_NODE_JOINED); - discoEvt.topologySnapshot(topVer, new ArrayList<>( - F.viewReadOnly(topSnapshot, new C1<ClusterNode, ClusterNode>() { - @Override public ClusterNode apply(ClusterNode e) { - return e; - } - }, FILTER_DAEMON))); + discoEvt.topologySnapshot(topVer, new ArrayList<>(F.view(topSnapshot, FILTER_DAEMON))); - locJoinEvt.onDone(discoEvt); + locJoin.onDone(new T2<>(discoEvt, discoCache)); return; } @@ -613,7 +609,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { ((IgniteKernal)ctx.grid()).onDisconnected(); - locJoinEvt = new GridFutureAdapter<>(); + locJoin = new GridFutureAdapter<>(); registeredCaches.clear(); @@ -626,7 +622,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { topHist.clear(); topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO, - new DiscoCache(locNode, Collections.<ClusterNode>emptySet()))); + createDiscoCache(locNode, Collections.<ClusterNode>emptySet()))); } else if (type == EVT_CLIENT_NODE_RECONNECTED) { assert locNode.isClient() : locNode; @@ -643,7 +639,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { try { fut.get(); - discoWrk.addEvent(type, nextTopVer, node, topSnapshot, null); + discoWrk.addEvent(type, nextTopVer, node, discoCache, topSnapshot, null); } catch (IgniteException ignore) { // No-op. 
@@ -655,7 +651,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } if (type == EVT_CLIENT_NODE_DISCONNECTED || type == EVT_NODE_SEGMENTED || !ctx.clientDisconnected()) - discoWrk.addEvent(type, nextTopVer, node, topSnapshot, customMsg); + discoWrk.addEvent(type, nextTopVer, node, discoCache, topSnapshot, customMsg); } }); @@ -1372,8 +1368,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { U.join(segChkThread, log); } - if (!locJoinEvt.isDone()) - locJoinEvt.onDone( + if (!locJoin.isDone()) + locJoin.onDone( new IgniteCheckedException("Failed to wait for local node joined event (grid is stopping).")); } @@ -1567,7 +1563,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * * @return Discovery collection cache. */ - private DiscoCache discoCache() { + public DiscoCache discoCache() { Snapshot cur = topSnap.get(); assert cur != null; @@ -1620,14 +1616,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * @param topVer Topology version. - * @return All server nodes for given topology version. - */ - public List<ClusterNode> serverNodes(AffinityTopologyVersion topVer) { - return resolveDiscoCache(null, topVer).srvNodes; - } - - /** * Gets node from history for given topology version. * * @param topVer Topology version. @@ -1646,7 +1634,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return Collection of cache nodes. */ public Collection<ClusterNode> cacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) { - return resolveDiscoCache(cacheName, topVer).cacheNodes(cacheName, topVer.topologyVersion()); + return resolveDiscoCache(cacheName, topVer).cacheNodes(cacheName); } /** @@ -1656,7 +1644,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return Collection of cache nodes. 
*/ public Collection<ClusterNode> cacheNodes(AffinityTopologyVersion topVer) { - return resolveDiscoCache(null, topVer).allNodesWithCaches(topVer.topologyVersion()); + return resolveDiscoCache(null, topVer).allNodesWithCaches(); } /** @@ -1666,29 +1654,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return Collection of cache nodes. */ public Collection<ClusterNode> remoteCacheNodes(AffinityTopologyVersion topVer) { - return resolveDiscoCache(null, topVer).remoteCacheNodes(topVer.topologyVersion()); - } - - /** - * Gets cache nodes for cache with given name. - * - * @param cacheName Cache name. - * @param topVer Topology version. - * @return Collection of cache nodes. - */ - Collection<ClusterNode> aliveCacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) { - return resolveDiscoCache(cacheName, topVer).aliveCacheNodes(cacheName, topVer.topologyVersion()); - } - - /** - * Gets cache remote nodes for cache with given name. - * - * @param cacheName Cache name. - * @param topVer Topology version. - * @return Collection of cache nodes. - */ - Collection<ClusterNode> aliveRemoteCacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) { - return resolveDiscoCache(cacheName, topVer).aliveRemoteCacheNodes(cacheName, topVer.topologyVersion()); + return resolveDiscoCache(null, topVer).remoteNodesWithCaches(); } /** @@ -1696,11 +1662,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return Oldest alive server nodes with at least one cache configured. */ @Nullable public ClusterNode oldestAliveCacheServerNode(AffinityTopologyVersion topVer) { - DiscoCache cache = resolveDiscoCache(null, topVer); - - Map.Entry<ClusterNode, Boolean> e = cache.aliveSrvNodesWithCaches.firstEntry(); - - return e != null ? 
e.getKey() : null; + return resolveDiscoCache(null, topVer).oldestAliveServerNodeWithCache(); } /** @@ -1711,7 +1673,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return Collection of cache affinity nodes. */ public Collection<ClusterNode> cacheAffinityNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) { - return resolveDiscoCache(cacheName, topVer).cacheAffinityNodes(cacheName, topVer.topologyVersion()); + return resolveDiscoCache(cacheName, topVer).cacheAffinityNodes(cacheName); } /** @@ -1788,7 +1750,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return {@code True} if cache with given name has at least one node with near cache enabled. */ public boolean hasNearCache(@Nullable String cacheName, AffinityTopologyVersion topVer) { - return resolveDiscoCache(cacheName, topVer).hasNearCache(cacheName); + return resolveDiscoCache(cacheName, topVer).hasNearCache(CU.cacheId(cacheName)); } /** @@ -1874,7 +1836,19 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** @return Event that represents a local node joined to topology. */ public DiscoveryEvent localJoinEvent() { try { - return locJoinEvt.get(); + return locJoin.get().get1(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** + * @return Tuple that consists of a local join event and discovery cache at the join time. + */ + public T2<DiscoveryEvent, DiscoCache> localJoin() { + try { + return locJoin.get(); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -1948,6 +1922,114 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** + * @param loc Local node. + * @param topSnapshot Topology snapshot. + * @return Newly created discovery cache. 
+ */ + @NotNull private DiscoCache createDiscoCache(ClusterNode loc, Collection<ClusterNode> topSnapshot) { + HashSet<UUID> alives = U.newHashSet(topSnapshot.size()); + HashMap<UUID, ClusterNode> nodeMap = U.newHashMap(topSnapshot.size()); + + ArrayList<ClusterNode> daemonNodes = new ArrayList<>(topSnapshot.size()); + ArrayList<ClusterNode> srvNodes = new ArrayList<>(topSnapshot.size()); + ArrayList<ClusterNode> rmtNodes = new ArrayList<>(topSnapshot.size()); + ArrayList<ClusterNode> allNodes = new ArrayList<>(topSnapshot.size()); + + for (ClusterNode node : topSnapshot) { + if (alive(node)) + alives.add(node.id()); + + if (node.isDaemon()) + daemonNodes.add(node); + else { + allNodes.add(node); + + if (!node.isLocal()) + rmtNodes.add(node); + + if (!CU.clientNode(node)) + srvNodes.add(node); + } + + nodeMap.put(node.id(), node); + } + + assert !rmtNodes.contains(loc) : "Remote nodes collection shouldn't contain local node" + + " [rmtNodes=" + rmtNodes + ", loc=" + loc + ']'; + + Map<Integer, List<ClusterNode>> allCacheNodes = U.newHashMap(allNodes.size()); + Map<Integer, List<ClusterNode>> affCacheNodes = U.newHashMap(allNodes.size()); + + Set<ClusterNode> allNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE); + Set<ClusterNode> rmtNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE); + Set<ClusterNode> srvNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE); + + Set<Integer> nearEnabledCaches = new HashSet<>(); + + for (ClusterNode node : allNodes) { + assert node.order() != 0 : "Invalid node order [locNode=" + loc + ", node=" + node + ']'; + assert !node.isDaemon(); + + for (Map.Entry<String, CachePredicate> entry : registeredCaches.entrySet()) { + String cacheName = entry.getKey(); + CachePredicate filter = entry.getValue(); + + if (filter.cacheNode(node)) { + allNodesWithCaches.add(node); + + if(!CU.clientNode(node)) + srvNodesWithCaches.add(node); + + if (!node.isLocal()) + rmtNodesWithCaches.add(node); + + 
addToMap(allCacheNodes, cacheName, node); + + if (filter.dataNode(node)) + addToMap(affCacheNodes, cacheName, node); + + if (filter.nearNode(node)) + nearEnabledCaches.add(CU.cacheId(cacheName)); + } + } + } + + return new DiscoCache( + loc, + Collections.unmodifiableList(rmtNodes), + Collections.unmodifiableList(allNodes), + Collections.unmodifiableList(srvNodes), + Collections.unmodifiableList(daemonNodes), + U.sealList(srvNodesWithCaches), + U.sealList(allNodesWithCaches), + U.sealList(rmtNodesWithCaches), + Collections.unmodifiableMap(allCacheNodes), + Collections.unmodifiableMap(affCacheNodes), + Collections.unmodifiableMap(nodeMap), + Collections.unmodifiableSet(nearEnabledCaches), + alives); + } + + /** + * Adds node to map. + * + * @param cacheMap Map to add to. + * @param cacheName Cache name. + * @param rich Node to add + */ + private void addToMap(Map<Integer, List<ClusterNode>> cacheMap, String cacheName, ClusterNode rich) { + List<ClusterNode> cacheNodes = cacheMap.get(CU.cacheId(cacheName)); + + if (cacheNodes == null) { + cacheNodes = new ArrayList<>(); + + cacheMap.put(CU.cacheId(cacheName), cacheNodes); + } + + cacheNodes.add(rich); + } + + /** * Updates topology version if current version is smaller than updated. * * @param updated Updated topology version. @@ -2048,8 +2130,16 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { lastChk = now; if (!segValid) { - discoWrk.addEvent(EVT_NODE_SEGMENTED, AffinityTopologyVersion.NONE, getSpi().getLocalNode(), - Collections.<ClusterNode>emptyList(), null); + List<ClusterNode> empty = Collections.emptyList(); + + ClusterNode node = getSpi().getLocalNode(); + + discoWrk.addEvent(EVT_NODE_SEGMENTED, + AffinityTopologyVersion.NONE, + node, + createDiscoCache(node, empty), + empty, + null); lastSegChkRes.set(false); } @@ -2069,8 +2159,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** Worker for discovery events. 
*/ private class DiscoveryWorker extends GridWorker { /** Event queue. */ - private final BlockingQueue<GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>, - DiscoveryCustomMessage>> evts = new LinkedBlockingQueue<>(); + private final BlockingQueue<GridTuple6<Integer, AffinityTopologyVersion, ClusterNode, + DiscoCache, Collection<ClusterNode>, DiscoveryCustomMessage>> evts = new LinkedBlockingQueue<>(); /** Node segmented event fired flag. */ private boolean nodeSegFired; @@ -2088,10 +2178,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @param type Discovery event type. See {@link DiscoveryEvent} for more details. * @param topVer Topology version. * @param node Remote node this event is connected with. + * @param discoCache Discovery cache. * @param topSnapshot Topology snapshot. */ @SuppressWarnings("RedundantTypeArguments") - private void recordEvent(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot) { + private void recordEvent(int type, long topVer, ClusterNode node, DiscoCache discoCache, Collection<ClusterNode> topSnapshot) { assert node != null; if (ctx.event().isRecordable(type)) { @@ -2100,7 +2191,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { evt.node(ctx.discovery().localNode()); evt.eventNode(node); evt.type(type); - evt.topologySnapshot(topVer, U.<ClusterNode, ClusterNode>arrayList(topSnapshot, FILTER_DAEMON)); if (type == EVT_NODE_METRICS_UPDATED) @@ -2127,7 +2217,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { else assert false; - ctx.event().record(evt); + ctx.event().record(evt, discoCache); } } @@ -2135,6 +2225,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @param type Event type. * @param topVer Topology version. * @param node Node. + * @param discoCache Discovery cache. * @param topSnapshot Topology snapshot. * @param data Custom message. 
*/ @@ -2142,12 +2233,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { int type, AffinityTopologyVersion topVer, ClusterNode node, + DiscoCache discoCache, Collection<ClusterNode> topSnapshot, @Nullable DiscoveryCustomMessage data ) { assert node != null : data; - evts.add(new GridTuple5<>(type, topVer, node, topSnapshot, data)); + evts.add(new GridTuple6<>(type, topVer, node, discoCache, topSnapshot, data)); } /** @@ -2184,7 +2276,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** @throws InterruptedException If interrupted. */ @SuppressWarnings("DuplicateCondition") private void body0() throws InterruptedException { - GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>, + GridTuple6<Integer, AffinityTopologyVersion, ClusterNode, DiscoCache, Collection<ClusterNode>, DiscoveryCustomMessage> evt = evts.take(); int type = evt.get1(); @@ -2317,11 +2409,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { customEvt.node(ctx.discovery().localNode()); customEvt.eventNode(node); customEvt.type(type); - customEvt.topologySnapshot(topVer.topologyVersion(), evt.get4()); + customEvt.topologySnapshot(topVer.topologyVersion(), evt.get5()); customEvt.affinityTopologyVersion(topVer); - customEvt.customMessage(evt.get5()); + customEvt.customMessage(evt.get6()); - ctx.event().record(customEvt); + ctx.event().record(customEvt, evt.get4()); } return; @@ -2335,7 +2427,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { assert false : "Invalid discovery event: " + type; } - recordEvent(type, topVer.topologyVersion(), node, evt.get4()); + recordEvent(type, topVer.topologyVersion(), node, evt.get4(), evt.get5()); if (segmented) onSegmentation(); @@ -2545,432 +2637,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } } - /** Cache for discovery collections. 
*/ - private class DiscoCache { - /** Remote nodes. */ - private final List<ClusterNode> rmtNodes; - - /** All nodes. */ - private final List<ClusterNode> allNodes; - - /** All server nodes. */ - private final List<ClusterNode> srvNodes; - - /** All nodes with at least one cache configured. */ - @GridToStringInclude - private final Collection<ClusterNode> allNodesWithCaches; - - /** All nodes with at least one cache configured. */ - @GridToStringInclude - private final Collection<ClusterNode> rmtNodesWithCaches; - - /** Cache nodes by cache name. */ - @GridToStringInclude - private final Map<String, Collection<ClusterNode>> allCacheNodes; - - /** Remote cache nodes by cache name. */ - @GridToStringInclude - private final Map<String, Collection<ClusterNode>> rmtCacheNodes; - - /** Cache nodes by cache name. */ - @GridToStringInclude - private final Map<String, Collection<ClusterNode>> affCacheNodes; - - /** Caches where at least one node has near cache enabled. */ - @GridToStringInclude - private final Set<String> nearEnabledCaches; - - /** Nodes grouped by version. */ - private final NavigableMap<IgniteProductVersion, Collection<ClusterNode>> nodesByVer; - - /** Daemon nodes. */ - private final List<ClusterNode> daemonNodes; - - /** Node map. */ - private final Map<UUID, ClusterNode> nodeMap; - - /** Local node. */ - private final ClusterNode loc; - - /** Highest node order. */ - private final long maxOrder; - - /** - * Cached alive nodes list. As long as this collection doesn't accept {@code null}s use {@link - * #maskNull(String)} before passing raw cache names to it. - */ - private final ConcurrentMap<String, Collection<ClusterNode>> aliveCacheNodes; - - /** - * Cached alive remote nodes list. As long as this collection doesn't accept {@code null}s use {@link - * #maskNull(String)} before passing raw cache names to it. - */ - private final ConcurrentMap<String, Collection<ClusterNode>> aliveRmtCacheNodes; - - /** - * Cached alive server remote nodes with caches. 
- */ - private final ConcurrentSkipListMap<ClusterNode, Boolean> aliveSrvNodesWithCaches; - - /** - * @param loc Local node. - * @param rmts Remote nodes. - */ - private DiscoCache(ClusterNode loc, Collection<ClusterNode> rmts) { - this.loc = loc; - - rmtNodes = Collections.unmodifiableList(new ArrayList<>(F.view(rmts, FILTER_DAEMON))); - - assert !rmtNodes.contains(loc) : "Remote nodes collection shouldn't contain local node" + - " [rmtNodes=" + rmtNodes + ", loc=" + loc + ']'; - - List<ClusterNode> all = new ArrayList<>(rmtNodes.size() + 1); - - if (!loc.isDaemon()) - all.add(loc); - - all.addAll(rmtNodes); - - Collections.sort(all, GridNodeOrderComparator.INSTANCE); - - allNodes = Collections.unmodifiableList(all); - - Map<String, Collection<ClusterNode>> cacheMap = new HashMap<>(allNodes.size(), 1.0f); - Map<String, Collection<ClusterNode>> rmtCacheMap = new HashMap<>(allNodes.size(), 1.0f); - Map<String, Collection<ClusterNode>> dhtNodesMap = new HashMap<>(allNodes.size(), 1.0f); - Collection<ClusterNode> nodesWithCaches = new HashSet<>(allNodes.size()); - Collection<ClusterNode> rmtNodesWithCaches = new HashSet<>(allNodes.size()); - - aliveCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f); - aliveRmtCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f); - aliveSrvNodesWithCaches = new ConcurrentSkipListMap<>(GridNodeOrderComparator.INSTANCE); - nodesByVer = new TreeMap<>(); - - long maxOrder0 = 0; - - Set<String> nearEnabledSet = new HashSet<>(); - - List<ClusterNode> srvNodes = new ArrayList<>(); - - for (ClusterNode node : allNodes) { - assert node.order() != 0 : "Invalid node order [locNode=" + loc + ", node=" + node + ']'; - assert !node.isDaemon(); - - if (!CU.clientNode(node)) - srvNodes.add(node); - - if (node.order() > maxOrder0) - maxOrder0 = node.order(); - - boolean hasCaches = false; - - for (Map.Entry<String, CachePredicate> entry : registeredCaches.entrySet()) { - String cacheName = entry.getKey(); - - CachePredicate filter = 
entry.getValue(); - - if (filter.cacheNode(node)) { - nodesWithCaches.add(node); - - if (!loc.id().equals(node.id())) - rmtNodesWithCaches.add(node); - - addToMap(cacheMap, cacheName, node); - - if (alive(node.id())) - addToMap(aliveCacheNodes, maskNull(cacheName), node); - - if (filter.dataNode(node)) - addToMap(dhtNodesMap, cacheName, node); - - if (filter.nearNode(node)) - nearEnabledSet.add(cacheName); - - if (!loc.id().equals(node.id())) { - addToMap(rmtCacheMap, cacheName, node); - - if (alive(node.id())) - addToMap(aliveRmtCacheNodes, maskNull(cacheName), node); - } - - hasCaches = true; - } - } - - if (hasCaches && alive(node.id()) && !CU.clientNode(node)) - aliveSrvNodesWithCaches.put(node, Boolean.TRUE); - - IgniteProductVersion nodeVer = U.productVersion(node); - - // Create collection for this version if it does not exist. - Collection<ClusterNode> nodes = nodesByVer.get(nodeVer); - - if (nodes == null) { - nodes = new ArrayList<>(allNodes.size()); - - nodesByVer.put(nodeVer, nodes); - } - - nodes.add(node); - } - - Collections.sort(srvNodes, CU.nodeComparator(true)); - - // Need second iteration to add this node to all previous node versions. - for (ClusterNode node : allNodes) { - IgniteProductVersion nodeVer = U.productVersion(node); - - // Get all versions lower or equal node's version. 
- NavigableMap<IgniteProductVersion, Collection<ClusterNode>> updateView = - nodesByVer.headMap(nodeVer, false); - - for (Collection<ClusterNode> prevVersions : updateView.values()) - prevVersions.add(node); - } - - maxOrder = maxOrder0; - - allCacheNodes = Collections.unmodifiableMap(cacheMap); - rmtCacheNodes = Collections.unmodifiableMap(rmtCacheMap); - affCacheNodes = Collections.unmodifiableMap(dhtNodesMap); - allNodesWithCaches = Collections.unmodifiableCollection(nodesWithCaches); - this.rmtNodesWithCaches = Collections.unmodifiableCollection(rmtNodesWithCaches); - nearEnabledCaches = Collections.unmodifiableSet(nearEnabledSet); - this.srvNodes = Collections.unmodifiableList(srvNodes); - - daemonNodes = Collections.unmodifiableList(new ArrayList<>( - F.view(F.concat(false, loc, rmts), F0.not(FILTER_DAEMON)))); - - Map<UUID, ClusterNode> nodeMap = new HashMap<>(allNodes().size() + daemonNodes.size(), 1.0f); - - for (ClusterNode n : F.concat(false, allNodes(), daemonNodes())) - nodeMap.put(n.id(), n); - - this.nodeMap = nodeMap; - } - - /** - * Adds node to map. - * - * @param cacheMap Map to add to. - * @param cacheName Cache name. - * @param rich Node to add - */ - private void addToMap(Map<String, Collection<ClusterNode>> cacheMap, String cacheName, ClusterNode rich) { - Collection<ClusterNode> cacheNodes = cacheMap.get(cacheName); - - if (cacheNodes == null) { - cacheNodes = new ArrayList<>(allNodes.size()); - - cacheMap.put(cacheName, cacheNodes); - } - - cacheNodes.add(rich); - } - - /** @return Local node. */ - ClusterNode localNode() { - return loc; - } - - /** @return Remote nodes. */ - Collection<ClusterNode> remoteNodes() { - return rmtNodes; - } - - /** @return All nodes. */ - Collection<ClusterNode> allNodes() { - return allNodes; - } - - /** - * Gets collection of nodes which have version equal or greater than {@code ver}. - * - * @param ver Version to check. - * @return Collection of nodes with version equal or greater than {@code ver}. 
- */ - Collection<ClusterNode> elderNodes(IgniteProductVersion ver) { - Map.Entry<IgniteProductVersion, Collection<ClusterNode>> entry = nodesByVer.ceilingEntry(ver); - - if (entry == null) - return Collections.emptyList(); - - return entry.getValue(); - } - - /** - * @return Versions map. - */ - NavigableMap<IgniteProductVersion, Collection<ClusterNode>> versionsMap() { - return nodesByVer; - } - - /** - * Gets collection of nodes with at least one cache configured. - * - * @param topVer Topology version (maximum allowed node order). - * @return Collection of nodes. - */ - Collection<ClusterNode> allNodesWithCaches(final long topVer) { - return filter(topVer, allNodesWithCaches); - } - - /** - * Gets all nodes that have cache with given name. - * - * @param cacheName Cache name. - * @param topVer Topology version. - * @return Collection of nodes. - */ - Collection<ClusterNode> cacheNodes(@Nullable String cacheName, final long topVer) { - return filter(topVer, allCacheNodes.get(cacheName)); - } - - /** - * Gets all remote nodes that have at least one cache configured. - * - * @param topVer Topology version. - * @return Collection of nodes. - */ - Collection<ClusterNode> remoteCacheNodes(final long topVer) { - return filter(topVer, rmtNodesWithCaches); - } - - /** - * Gets all nodes that have cache with given name and should participate in affinity calculation. With - * partitioned cache nodes with near-only cache do not participate in affinity node calculation. - * - * @param cacheName Cache name. - * @param topVer Topology version. - * @return Collection of nodes. - */ - Collection<ClusterNode> cacheAffinityNodes(@Nullable String cacheName, final long topVer) { - return filter(topVer, affCacheNodes.get(cacheName)); - } - - /** - * Gets all alive nodes that have cache with given name. - * - * @param cacheName Cache name. - * @param topVer Topology version. - * @return Collection of nodes. 
- */ - Collection<ClusterNode> aliveCacheNodes(@Nullable String cacheName, final long topVer) { - return filter(topVer, aliveCacheNodes.get(maskNull(cacheName))); - } - - /** - * Gets all alive remote nodes that have cache with given name. - * - * @param cacheName Cache name. - * @param topVer Topology version. - * @return Collection of nodes. - */ - Collection<ClusterNode> aliveRemoteCacheNodes(@Nullable String cacheName, final long topVer) { - return filter(topVer, aliveRmtCacheNodes.get(maskNull(cacheName))); - } - - /** - * Checks if cache with given name has at least one node with near cache enabled. - * - * @param cacheName Cache name. - * @return {@code True} if cache with given name has at least one node with near cache enabled. - */ - boolean hasNearCache(@Nullable String cacheName) { - return nearEnabledCaches.contains(cacheName); - } - - /** - * Removes left node from cached alives lists. - * - * @param leftNode Left node. - */ - void updateAlives(ClusterNode leftNode) { - if (leftNode.order() > maxOrder) - return; - - filterNodeMap(aliveCacheNodes, leftNode); - - filterNodeMap(aliveRmtCacheNodes, leftNode); - - aliveSrvNodesWithCaches.remove(leftNode); - } - - /** - * Creates a copy of nodes map without the given node. - * - * @param map Map to copy. - * @param exclNode Node to exclude. - */ - private void filterNodeMap(ConcurrentMap<String, Collection<ClusterNode>> map, final ClusterNode exclNode) { - for (String cacheName : registeredCaches.keySet()) { - String maskedName = maskNull(cacheName); - - while (true) { - Collection<ClusterNode> oldNodes = map.get(maskedName); - - if (oldNodes == null || oldNodes.isEmpty()) - break; - - Collection<ClusterNode> newNodes = new ArrayList<>(oldNodes); - - if (!newNodes.remove(exclNode)) - break; - - if (map.replace(maskedName, oldNodes, newNodes)) - break; - } - } - } - - /** - * Replaces {@code null} with {@code NULL_CACHE_NAME}. - * - * @param cacheName Cache name. - * @return Masked name. 
- */ - private String maskNull(@Nullable String cacheName) { - return cacheName == null ? NULL_CACHE_NAME : cacheName; - } - - /** - * @param topVer Topology version. - * @param nodes Nodes. - * @return Filtered collection (potentially empty, but never {@code null}). - */ - private Collection<ClusterNode> filter(final long topVer, @Nullable Collection<ClusterNode> nodes) { - if (nodes == null) - return Collections.emptyList(); - - // If no filtering needed, return original collection. - return nodes.isEmpty() || topVer < 0 || topVer >= maxOrder ? - nodes : - F.view(nodes, new P1<ClusterNode>() { - @Override public boolean apply(ClusterNode node) { - return node.order() <= topVer; - } - }); - } - - /** @return Daemon nodes. */ - Collection<ClusterNode> daemonNodes() { - return daemonNodes; - } - - /** - * @param id Node ID. - * @return Node. - */ - @Nullable ClusterNode node(UUID id) { - return nodeMap.get(id); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(DiscoCache.class, this, "allNodesWithDaemons", U.toShortString(allNodes)); - } - } - /** * Cache predicate. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/42346673/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/DiscoveryEventListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/DiscoveryEventListener.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/DiscoveryEventListener.java new file mode 100644 index 0000000..963d97e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/DiscoveryEventListener.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.eventstorage; + +import java.util.EventListener; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.internal.managers.discovery.DiscoCache; + +/** + * Internal listener for discovery events. + */ +public interface DiscoveryEventListener extends EventListener { + /** + * @param evt Discovery event. + * @param discoCache Discovery cache. + */ + public void onEvent(DiscoveryEvent evt, DiscoCache discoCache); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/42346673/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java index 684e326..f8abd30 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java @@ -47,6 +47,7 @@ import org.apache.ignite.internal.managers.GridManagerAdapter; import org.apache.ignite.internal.managers.communication.GridIoManager; import 
org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.deployment.GridDeployment; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener; import org.apache.ignite.internal.util.GridConcurrentLinkedHashSet; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -79,6 +80,9 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> /** Local event listeners. */ private final ConcurrentMap<Integer, Set<GridLocalEventListener>> lsnrs = new ConcurrentHashMap8<>(); + /** Internal discovery listeners. */ + private final ConcurrentMap<Integer, Set<DiscoveryEventListener>> discoLsnrs = new ConcurrentHashMap8<>(); + /** Busy lock to control activity of threads. */ private final ReadWriteLock busyLock = new ReentrantReadWriteLock(); @@ -234,6 +238,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> msgLsnr = null; lsnrs.clear(); + discoLsnrs.clear(); } /** {@inheritDoc} */ @@ -300,6 +305,30 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> } /** + * Records discovery events. + * + * @param evt Event to record. + * @param discoCache Discovery cache. + */ + public void record(DiscoveryEvent evt, DiscoCache discoCache) { + assert evt != null; + + if (!enterBusy()) + return; + + try { + // Notify internal discovery listeners first. + notifyDiscoveryListeners(evt, discoCache); + + // Notify all other registered listeners. + record(evt); + } + finally { + leaveBusy(); + } + } + + /** * Gets types of enabled user-recordable events. * * @return Array of types of enabled user-recordable events. 
@@ -570,7 +599,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> try { for (int t : types) { - getOrCreate(t).add(lsnr); + getOrCreate(lsnrs, t).add(lsnr); if (!isRecordable(t)) U.warn(log, "Added listener for disabled event type: " + U.gridEventName(t)); @@ -595,14 +624,14 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> return; try { - getOrCreate(type).add(lsnr); + getOrCreate(lsnrs, type).add(lsnr); if (!isRecordable(type)) U.warn(log, "Added listener for disabled event type: " + U.gridEventName(type)); if (types != null) { for (int t : types) { - getOrCreate(t).add(lsnr); + getOrCreate(lsnrs, t).add(lsnr); if (!isRecordable(t)) U.warn(log, "Added listener for disabled event type: " + U.gridEventName(t)); @@ -615,16 +644,70 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> } /** + * Adds discovery event listener. Note that this method specifically disallow an empty + * array of event type to prevent accidental subscription for all system event that + * may lead to a drastic performance decrease. + * + * @param lsnr Listener to add. + * @param types Event types to subscribe listener for. + */ + public void addDiscoveryEventListener(DiscoveryEventListener lsnr, int[] types) { + assert lsnr != null; + assert types != null; + assert types.length > 0; + + if (!enterBusy()) + return; + + try { + for (int t : types) { + getOrCreate(discoLsnrs, t).add(lsnr); + } + } + finally { + leaveBusy(); + } + } + + /** + * Adds discovery event listener. + * + * @param lsnr Listener to add. + * @param type Event type to subscribe listener for. + * @param types Additional event types to subscribe listener for. + */ + public void addDiscoveryEventListener(DiscoveryEventListener lsnr, int type, @Nullable int... 
types) { + assert lsnr != null; + + if (!enterBusy()) + return; + + try { + getOrCreate(discoLsnrs, type).add(lsnr); + + if (types != null) { + for (int t : types) { + getOrCreate(discoLsnrs, t).add(lsnr); + } + } + } + finally { + leaveBusy(); + } + } + + /** + * @param lsnrs Listeners map. * @param type Event type. * @return Listeners for given event type. */ - private Collection<GridLocalEventListener> getOrCreate(Integer type) { - Set<GridLocalEventListener> set = lsnrs.get(type); + private <T> Collection<T> getOrCreate(ConcurrentMap<Integer, Set<T>> lsnrs, Integer type) { + Set<T> set = lsnrs.get(type); if (set == null) { set = new GridConcurrentLinkedHashSet<>(); - Set<GridLocalEventListener> prev = lsnrs.putIfAbsent(type, set); + Set<T> prev = lsnrs.putIfAbsent(type, set); if (prev != null) set = prev; @@ -688,6 +771,38 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> } /** + * Removes listener for specified events, if any. If no event types provided - it + * remove the listener for all its registered events. + * + * @param lsnr Listener. + * @param types Event types. + * @return Returns {@code true} if removed. + */ + public boolean removeDiscoveryEventListener(DiscoveryEventListener lsnr, @Nullable int... types) { + assert lsnr != null; + + boolean found = false; + + if (F.isEmpty(types)) { + for (Set<DiscoveryEventListener> set : discoLsnrs.values()) + if (set.remove(lsnr)) + found = true; + } + else { + assert types != null; + + for (int type : types) { + Set<DiscoveryEventListener> set = discoLsnrs.get(type); + + if (set != null && set.remove(lsnr)) + found = true; + } + } + + return found; + } + + /** * * @param p Optional predicate. * @param types Event types to wait for. @@ -780,6 +895,41 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> } /** + * @param evt Discovery event + * @param cache Discovery cache. 
+ */ + private void notifyDiscoveryListeners(DiscoveryEvent evt, DiscoCache cache) { + assert evt != null; + + notifyDiscoveryListeners(discoLsnrs.get(evt.type()), evt, cache); + } + + /** + * @param set Set of listeners. + * @param evt Discovery event. + * @param cache Discovery cache. + */ + private void notifyDiscoveryListeners(@Nullable Collection<DiscoveryEventListener> set, DiscoveryEvent evt, DiscoCache cache) { + assert evt != null; + + if (!F.isEmpty(set)) { + assert set != null; + + for (DiscoveryEventListener lsnr : set) { + try { + lsnr.onEvent(evt, cache); + } + catch (Throwable e) { + U.error(log, "Unexpected exception in listener notification for event: " + evt, e); + + if (e instanceof Error) + throw (Error)e; + } + } + } + } + + /** * @param p Grid event predicate. * @return Collection of grid events. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/42346673/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index a388c7a..5070462 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -39,6 +39,7 @@ import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridNodeOrderComparator; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.F; import 
org.apache.ignite.internal.util.typedef.internal.CU; @@ -252,10 +253,12 @@ public class GridAffinityAssignmentCache { * * @param topVer Topology version to calculate affinity cache for. * @param discoEvt Discovery event that caused this topology version change. + * @param discoCache Discovery cache. * @return Affinity assignments. */ @SuppressWarnings("IfMayBeConditional") - public List<List<ClusterNode>> calculate(AffinityTopologyVersion topVer, DiscoveryEvent discoEvt) { + public List<List<ClusterNode>> calculate(AffinityTopologyVersion topVer, DiscoveryEvent discoEvt, + DiscoCache discoCache) { if (log.isDebugEnabled()) log.debug("Calculating affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId() + ", discoEvt=" + discoEvt + ']'); @@ -266,7 +269,7 @@ public class GridAffinityAssignmentCache { List<ClusterNode> sorted; if (!locCache) { - sorted = new ArrayList<>(ctx.discovery().cacheAffinityNodes(cacheName, topVer)); + sorted = new ArrayList<>(discoCache.cacheAffinityNodes(cacheId())); Collections.sort(sorted, GridNodeOrderComparator.INSTANCE); } http://git-wip-us.apache.org/repos/asf/ignite/blob/42346673/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 459723c..c1dde13 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -387,7 +387,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap cctx.cache().prepareCacheStart(req, fut.topologyVersion()); if (fut.isCacheAdded(cacheId, fut.topologyVersion())) { - if 
(cctx.discovery().cacheAffinityNodes(req.cacheName(), fut.topologyVersion()).isEmpty()) + if (fut.discoCache().cacheAffinityNodes(req.cacheName()).isEmpty()) U.quietAndWarn(log, "No server nodes found for cache client: " + req.cacheName()); } @@ -408,7 +408,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert aff.lastVersion().equals(AffinityTopologyVersion.NONE) : aff.lastVersion(); List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), - fut.discoveryEvent()); + fut.discoveryEvent(), fut.discoCache()); aff.initialize(fut.topologyVersion(), assignment); } @@ -768,7 +768,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert old == null : old; - List<List<ClusterNode>> newAff = cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent()); + List<List<ClusterNode>> newAff = cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); cache.affinity().initialize(fut.topologyVersion(), newAff); } @@ -806,7 +806,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (cache.affinity().lastVersion().equals(AffinityTopologyVersion.NONE)) { List<List<ClusterNode>> assignment = - cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent()); + cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); cache.affinity().initialize(fut.topologyVersion(), assignment); } @@ -832,14 +832,15 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap private void initAffinity(GridAffinityAssignmentCache aff, GridDhtPartitionsExchangeFuture fut, boolean fetch) throws IgniteCheckedException { if (!fetch && canCalculateAffinity(aff, fut)) { - List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), fut.discoveryEvent()); + List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), 
fut.discoCache()); aff.initialize(fut.topologyVersion(), assignment); } else { GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx, aff.cacheName(), - fut.topologyVersion()); + fut.topologyVersion(), + fut.discoCache()); fetchFut.init(); @@ -893,7 +894,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap CacheHolder cache = cache(fut, cacheDesc); - List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer, fut.discoveryEvent()); + List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer, fut.discoveryEvent(), fut.discoCache()); cache.affinity().initialize(topVer, newAff); } @@ -960,14 +961,15 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (cctx.localNodeId().equals(cacheDesc.receivedFrom())) { List<List<ClusterNode>> assignment = - cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent()); + cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); cacheCtx.affinity().affinityCache().initialize(fut.topologyVersion(), assignment); } else { GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx, cacheCtx.name(), - topVer); + topVer, + fut.discoCache()); fetchFut.init(); @@ -1001,7 +1003,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap GridDhtAffinityAssignmentResponse res = fetchFut.get(); if (res == null) { - List<List<ClusterNode>> aff = affCache.calculate(topVer, fut.discoveryEvent()); + List<List<ClusterNode>> aff = affCache.calculate(topVer, fut.discoveryEvent(), fut.discoCache()); affCache.initialize(topVer, aff); } @@ -1013,7 +1015,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap else { assert !affCache.centralizedAffinityFunction() || !lateAffAssign; - affCache.calculate(topVer, fut.discoveryEvent()); + affCache.calculate(topVer, fut.discoveryEvent(), 
fut.discoCache()); } List<List<ClusterNode>> aff = res.affinityAssignment(cctx.discovery()); @@ -1043,7 +1045,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (cacheCtx.isLocal()) continue; - cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent()); + cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); } centralizedAff = true; @@ -1093,7 +1095,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (cache != null) { if (cache.client()) - cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent()); + cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); return; } @@ -1133,7 +1135,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx, aff.cacheName(), - prev.topologyVersion()); + prev.topologyVersion(), + prev.discoCache()); fetchFut.init(); @@ -1144,7 +1147,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap throws IgniteCheckedException { fetchAffinity(prev, aff, (GridDhtAssignmentFetchFuture)fetchFut); - aff.calculate(fut.topologyVersion(), fut.discoveryEvent()); + aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); affFut.onDone(fut.topologyVersion()); } @@ -1283,7 +1286,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert aff.idealAssignment() != null : "Previous assignment is not available."; - List<List<ClusterNode>> idealAssignment = aff.calculate(topVer, fut.discoveryEvent()); + List<List<ClusterNode>> idealAssignment = aff.calculate(topVer, fut.discoveryEvent(), fut.discoCache()); List<List<ClusterNode>> newAssignment = null; if (latePrimary) { 
http://git-wip-us.apache.org/repos/asf/ignite/blob/42346673/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java index a65d971..24321b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java @@ -28,7 +28,6 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; -import org.apache.ignite.internal.util.GridLeanSet; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgniteFuture; @@ -77,7 +76,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { @Override protected void onKernalStart0() throws IgniteCheckedException { if (cctx.isLocal()) // No discovery event needed for local affinity. 
- aff.calculate(LOC_CACHE_TOP_VER, null); + aff.calculate(LOC_CACHE_TOP_VER, null, null); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/42346673/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 695872f..ff7feb8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -48,7 +48,6 @@ import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.events.DiscoveryEvent; -import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; @@ -57,6 +56,8 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.pagemem.snapshot.StartSnapshotOperationAckDiscoveryMessage; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology; @@ -181,35 +182,33 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana private DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS"); /** Discovery listener. */ - private final GridLocalEventListener discoLsnr = new GridLocalEventListener() { - @Override public void onEvent(Event evt) { + private final DiscoveryEventListener discoLsnr = new DiscoveryEventListener() { + @Override public void onEvent(DiscoveryEvent evt, DiscoCache cache) { if (!enterBusy()) return; try { - DiscoveryEvent e = (DiscoveryEvent)evt; - ClusterNode loc = cctx.localNode(); - assert e.type() == EVT_NODE_JOINED || e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED || - e.type() == EVT_DISCOVERY_CUSTOM_EVT; + assert evt.type() == EVT_NODE_JOINED || evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED || + evt.type() == EVT_DISCOVERY_CUSTOM_EVT; - final ClusterNode n = e.eventNode(); + final ClusterNode n = evt.eventNode(); GridDhtPartitionExchangeId exchId = null; GridDhtPartitionsExchangeFuture exchFut = null; - if (e.type() != EVT_DISCOVERY_CUSTOM_EVT) { + if (evt.type() != EVT_DISCOVERY_CUSTOM_EVT) { assert !loc.id().equals(n.id()); - if (e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED) { + if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED) { assert cctx.discovery().node(n.id()) == null; // Avoid race b/w initial future add and discovery event. 
GridDhtPartitionsExchangeFuture initFut = null; if (readyTopVer.get().equals(AffinityTopologyVersion.NONE)) { - initFut = exchangeFuture(initialExchangeId(), null, null, null); + initFut = exchangeFuture(initialExchangeId(), null, null, null, null); initFut.onNodeLeft(n); } @@ -220,16 +219,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } } - assert e.type() != EVT_NODE_JOINED || n.order() > loc.order() : + assert evt.type() != EVT_NODE_JOINED || n.order() > loc.order() : "Node joined with smaller-than-local " + "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']'; - exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type()); + exchId = exchangeId(n.id(), + affinityTopologyVersion(evt), + evt.type()); - exchFut = exchangeFuture(exchId, e, null, null); + exchFut = exchangeFuture(exchId, evt, cache,null, null); } else { - DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)e; + DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)evt; if (customEvt.customMessage() instanceof DynamicCacheChangeBatch) { DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customEvt.customMessage(); @@ -260,9 +261,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana //todo think about refactoring if (!F.isEmpty(valid) && !(valid.size() == 1 && valid.iterator().next().globalStateChange())) { - exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type()); + exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type()); - exchFut = exchangeFuture(exchId, e, valid, null); + exchFut = exchangeFuture(exchId, evt, cache, valid, null); } } else if (customEvt.customMessage() instanceof CacheAffinityChangeMessage) { @@ -270,19 +271,19 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (msg.exchangeId() == null) { if (msg.exchangeNeeded()) { - exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type()); + exchId = exchangeId(n.id(), 
affinityTopologyVersion(evt), evt.type()); - exchFut = exchangeFuture(exchId, e, null, msg); + exchFut = exchangeFuture(exchId, evt, cache, null, msg); } } else - exchangeFuture(msg.exchangeId(), null, null, null).onAffinityChangeMessage(customEvt.eventNode(), msg); + exchangeFuture(msg.exchangeId(), null, null, null, null).onAffinityChangeMessage(customEvt.eventNode(), msg); } else if (customEvt.customMessage() instanceof StartSnapshotOperationAckDiscoveryMessage && !((StartSnapshotOperationAckDiscoveryMessage)customEvt.customMessage()).hasError()) { - exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type()); + exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type()); - exchFut = exchangeFuture(exchId, e, null, null); + exchFut = exchangeFuture(exchId, evt, null, null, null); } } @@ -291,7 +292,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana log.debug("Discovery event (will start exchange): " + exchId); // Event callback - without this callback future will never complete. - exchFut.onEvent(exchId, e); + exchFut.onEvent(exchId, evt, cache); // Start exchange process. addFuture(exchFut); @@ -313,7 +314,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana exchWorker = new ExchangeWorker(); - cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, + cctx.gridEvents().addDiscoveryEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_DISCOVERY_CUSTOM_EVT); cctx.io().addHandler(0, GridDhtPartitionsSingleMessage.class, @@ -371,11 +372,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana assert startTime > 0; // Generate dummy discovery event for local node joining. 
- DiscoveryEvent discoEvt = cctx.discovery().localJoinEvent(); + T2<DiscoveryEvent, DiscoCache> localJoin = cctx.discovery().localJoin(); + + DiscoveryEvent discoEvt = localJoin.get1(); + DiscoCache discoCache = localJoin.get2(); GridDhtPartitionExchangeId exchId = initialExchangeId(); - GridDhtPartitionsExchangeFuture fut = exchangeFuture(exchId, discoEvt, null, null); + GridDhtPartitionsExchangeFuture fut = exchangeFuture(exchId, discoEvt, discoCache, null, null); if (reconnect) reconnectExchangeFut = new GridFutureAdapter<>(); @@ -482,7 +486,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** {@inheritDoc} */ @Override protected void onKernalStop0(boolean cancel) { - cctx.gridEvents().removeLocalEventListener(discoLsnr); + cctx.gridEvents().removeDiscoveryEventListener(discoLsnr); cctx.io().removeHandler(0, GridDhtPartitionsSingleMessage.class); cctx.io().removeHandler(0, GridDhtPartitionsFullMessage.class); @@ -1085,12 +1089,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** * @param exchId Exchange ID. * @param discoEvt Discovery event. + * @param cache Discovery data cache. * @param reqs Cache change requests. * @param affChangeMsg Affinity change message. * @return Exchange future. 
*/ private GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId, @Nullable DiscoveryEvent discoEvt, + @Nullable DiscoCache cache, @Nullable Collection<DynamicCacheChangeRequest> reqs, @Nullable CacheAffinityChangeMessage affChangeMsg) { GridDhtPartitionsExchangeFuture fut; @@ -1109,7 +1115,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } if (discoEvt != null) - fut.onEvent(exchId, discoEvt); + fut.onEvent(exchId, discoEvt, cache); if (stopErr != null) fut.onDone(stopErr); @@ -1265,7 +1271,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana cctx.database().releaseHistoryForPreloading(); } else - exchangeFuture(msg.exchangeId(), null, null, null).onReceive(node, msg); + exchangeFuture(msg.exchangeId(), null, null, null, null).onReceive(node, msg); } finally { leaveBusy(); @@ -1318,8 +1324,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } else { if (msg.client()) { - final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture( - msg.exchangeId(), null, null, null); + final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId(), + null, + null, + null, + null); exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { @@ -1329,7 +1338,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana }); } else - exchangeFuture(msg.exchangeId(), null, null, null).onReceive(node, msg); + exchangeFuture(msg.exchangeId(), null, null, null, null).onReceive(node, msg); } } finally {
