http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 7cf75fe..81f21a8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -48,14 +48,14 @@ import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.events.DiscoveryEvent; -import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; -import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology; @@ -173,35 +173,33 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana private DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS"); /** Discovery listener. */ - private final GridLocalEventListener discoLsnr = new GridLocalEventListener() { - @Override public void onEvent(Event evt) { + private final DiscoveryEventListener discoLsnr = new DiscoveryEventListener() { + @Override public void onEvent(DiscoveryEvent evt, DiscoCache cache) { if (!enterBusy()) return; try { - DiscoveryEvent e = (DiscoveryEvent)evt; - ClusterNode loc = cctx.localNode(); - assert e.type() == EVT_NODE_JOINED || e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED || - e.type() == EVT_DISCOVERY_CUSTOM_EVT; + assert evt.type() == EVT_NODE_JOINED || evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED || + evt.type() == EVT_DISCOVERY_CUSTOM_EVT; - final ClusterNode n = e.eventNode(); + final ClusterNode n = evt.eventNode(); GridDhtPartitionExchangeId exchId = null; GridDhtPartitionsExchangeFuture exchFut = null; - if (e.type() != EVT_DISCOVERY_CUSTOM_EVT) { + if (evt.type() != EVT_DISCOVERY_CUSTOM_EVT) { assert !loc.id().equals(n.id()); - if (e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED) { + if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED) { assert cctx.discovery().node(n.id()) == null; // Avoid race b/w initial future add and discovery event. GridDhtPartitionsExchangeFuture initFut = null; if (readyTopVer.get().equals(AffinityTopologyVersion.NONE)) { - initFut = exchangeFuture(initialExchangeId(), null, null, null); + initFut = exchangeFuture(initialExchangeId(), null, null, null, null); initFut.onNodeLeft(n); } @@ -213,18 +211,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } assert - e.type() != EVT_NODE_JOINED || n.order() > loc.order() : + evt.type() != EVT_NODE_JOINED || n.order() > loc.order() : "Node joined with smaller-than-local " + "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']'; exchId = exchangeId(n.id(), - affinityTopologyVersion(e), - e.type()); + affinityTopologyVersion(evt), + evt.type()); - exchFut = exchangeFuture(exchId, e, null, null); + exchFut = exchangeFuture(exchId, evt, cache,null, null); } else { - DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)e; + DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)evt; if (customEvt.customMessage() instanceof DynamicCacheChangeBatch) { DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customEvt.customMessage(); @@ -254,9 +252,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } if (!F.isEmpty(valid)) { - exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type()); + exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type()); - exchFut = exchangeFuture(exchId, e, valid, null); + exchFut = exchangeFuture(exchId, evt, cache, valid, null); } } else if (customEvt.customMessage() instanceof CacheAffinityChangeMessage) { @@ -264,13 +262,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (msg.exchangeId() == null) { if (msg.exchangeNeeded()) { - exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type()); + exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type()); - exchFut = exchangeFuture(exchId, e, null, msg); + exchFut = exchangeFuture(exchId, evt, cache, null, msg); } } else - exchangeFuture(msg.exchangeId(), null, null, null).onAffinityChangeMessage(customEvt.eventNode(), msg); + exchangeFuture(msg.exchangeId(), null, null, null, null).onAffinityChangeMessage(customEvt.eventNode(), msg); } } @@ -279,7 +277,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana log.debug("Discovery event (will start exchange): " + exchId); // Event callback - without this callback future will never complete. - exchFut.onEvent(exchId, e); + exchFut.onEvent(exchId, evt, cache); // Start exchange process. addFuture(exchFut); @@ -301,7 +299,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana exchWorker = new ExchangeWorker(); - cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, + cctx.gridEvents().addDiscoveryEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_DISCOVERY_CUSTOM_EVT); cctx.io().addHandler(0, GridDhtPartitionsSingleMessage.class, @@ -359,11 +357,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana assert startTime > 0; // Generate dummy discovery event for local node joining. - DiscoveryEvent discoEvt = cctx.discovery().localJoinEvent(); + T2<DiscoveryEvent, DiscoCache> localJoin = cctx.discovery().localJoin(); + + DiscoveryEvent discoEvt = localJoin.get1(); + DiscoCache discoCache = localJoin.get2(); GridDhtPartitionExchangeId exchId = initialExchangeId(); - GridDhtPartitionsExchangeFuture fut = exchangeFuture(exchId, discoEvt, null, null); + GridDhtPartitionsExchangeFuture fut = exchangeFuture(exchId, discoEvt, discoCache, null, null); if (reconnect) reconnectExchangeFut = new GridFutureAdapter<>(); @@ -470,7 +471,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** {@inheritDoc} */ @Override protected void onKernalStop0(boolean cancel) { - cctx.gridEvents().removeLocalEventListener(discoLsnr); + cctx.gridEvents().removeDiscoveryEventListener(discoLsnr); cctx.io().removeHandler(0, GridDhtPartitionsSingleMessage.class); cctx.io().removeHandler(0, GridDhtPartitionsFullMessage.class); @@ -1063,12 +1064,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** * @param exchId Exchange ID. * @param discoEvt Discovery event. + * @param cache Discovery data cache. * @param reqs Cache change requests. * @param affChangeMsg Affinity change message. * @return Exchange future. */ private GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId, @Nullable DiscoveryEvent discoEvt, + @Nullable DiscoCache cache, @Nullable Collection<DynamicCacheChangeRequest> reqs, @Nullable CacheAffinityChangeMessage affChangeMsg) { GridDhtPartitionsExchangeFuture fut; @@ -1087,7 +1090,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } if (discoEvt != null) - fut.onEvent(exchId, discoEvt); + fut.onEvent(exchId, discoEvt, cache); if (stopErr != null) fut.onDone(stopErr); @@ -1231,7 +1234,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana refreshPartitions(); } else - exchangeFuture(msg.exchangeId(), null, null, null).onReceive(node, msg); + exchangeFuture(msg.exchangeId(), null, null, null, null).onReceive(node, msg); } finally { leaveBusy(); @@ -1287,6 +1290,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId(), null, null, + null, null); exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @@ -1297,7 +1301,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana }); } else - exchangeFuture(msg.exchangeId(), null, null, null).onReceive(node, msg); + exchangeFuture(msg.exchangeId(), null, null, null, null).onReceive(node, msg); } } finally {
http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 816132d..9366d0c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -31,6 +31,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; @@ -42,7 +43,6 @@ import org.apache.ignite.internal.util.GridAtomicLong; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; -import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; @@ -103,6 +103,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** */ private final Object similarAffKey; + /** */ + private volatile DiscoCache discoCache; + /** * @param cctx Context. * @param cacheId Cache ID. @@ -121,6 +124,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { topVer = exchFut.topologyVersion(); + discoCache = exchFut.discoCache(); + log = cctx.logger(getClass()); lock.writeLock().lock(); @@ -191,6 +196,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { this.stopping = stopping; topVer = exchId.topologyVersion(); + discoCache = exchFut.discoCache(); updateSeq.setIfGreater(updSeq); @@ -271,7 +277,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { removeNode(exchId.nodeId()); // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer); + ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); assert oldest != null; @@ -424,7 +430,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { if (!F.isEmpty(nodeIds)) { for (UUID nodeId : nodeIds) { - ClusterNode n = cctx.discovery().node(nodeId); + ClusterNode n = discoCache.node(nodeId); if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) { if (nodes == null) @@ -450,7 +456,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { * @return List of nodes for the partition. */ private List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) { - Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.allNodes(cctx, topVer)) : null; + Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(discoCache.allNodesWithCaches()) : null; lock.readLock().lock(); @@ -473,7 +479,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { continue; if (hasState(p, id, state, states)) { - ClusterNode n = cctx.discovery().node(id); + ClusterNode n = discoCache.node(id); if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) nodes.add(n); @@ -766,7 +772,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { assert nodeId.equals(cctx.localNodeId()); // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer); + ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); // If this node became the oldest node. if (oldest.id().equals(cctx.localNodeId())) { @@ -816,7 +822,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { assert nodeId != null; assert lock.writeLock().isHeldByCurrentThread(); - ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer); + ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); ClusterNode loc = cctx.localNode(); http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java index ab8e863..2c3d7ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java @@ -29,6 +29,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridNodeOrderComparator; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -72,16 +73,18 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin * @param ctx Context. * @param cacheName Cache name. * @param topVer Topology version. + * @param discoCache Discovery cache. */ public GridDhtAssignmentFetchFuture( GridCacheSharedContext ctx, String cacheName, - AffinityTopologyVersion topVer + AffinityTopologyVersion topVer, + DiscoCache discoCache ) { this.ctx = ctx; this.key = new T2<>(CU.cacheId(cacheName), topVer); - Collection<ClusterNode> availableNodes = ctx.discovery().cacheAffinityNodes(cacheName, topVer); + Collection<ClusterNode> availableNodes = discoCache.cacheAffinityNodes(CU.cacheId(cacheName)); LinkedList<ClusterNode> tmp = new LinkedList<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 1b4dcc9..5c3fba0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -36,6 +36,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -95,6 +96,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** */ private volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE; + /** Discovery cache. */ + private volatile DiscoCache discoCache; + /** */ private volatile boolean stopping; @@ -151,6 +155,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { rebalancedTopVer = AffinityTopologyVersion.NONE; topVer = AffinityTopologyVersion.NONE; + + discoCache = cctx.discovery().discoCache(); } finally { lock.writeLock().unlock(); @@ -293,6 +299,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { rebalancedTopVer = AffinityTopologyVersion.NONE; topVer = exchId.topologyVersion(); + + discoCache = exchFut.discoCache(); } finally { lock.writeLock().unlock(); @@ -356,7 +364,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { private void initPartitions0(GridDhtPartitionsExchangeFuture exchFut, long updateSeq) { ClusterNode loc = cctx.localNode(); - ClusterNode oldest = currentCoordinator(); + ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); @@ -481,7 +489,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (exchId.isLeft()) removeNode(exchId.nodeId()); - ClusterNode oldest = currentCoordinator(); + ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); if (log.isDebugEnabled()) log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']'); @@ -882,7 +890,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) { - Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.affinityNodes(cctx, topVer)) : null; + Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(discoCache.cacheAffinityNodes(cctx.cacheId())) : null; lock.readLock().lock(); @@ -979,7 +987,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId, + @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionFullMap partMap, @Nullable Map<Integer, Long> cntrMap) { if (log.isDebugEnabled()) @@ -1112,7 +1120,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId, + @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionMap2 parts, @Nullable Map<Integer, Long> cntrMap, boolean checkEvictions) { @@ -1284,7 +1292,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { List<ClusterNode> affNodes = aff.get(p); if (!affNodes.contains(cctx.localNode())) { - Collection<UUID> nodeIds = F.nodeIds(nodes(p, topVer, OWNING)); + List<ClusterNode> nodes = nodes(p, topVer, OWNING); + Collection<UUID> nodeIds = F.nodeIds(nodes); // If all affinity nodes are owners, then evict partition from local node. if (nodeIds.containsAll(F.nodeIds(affNodes))) { @@ -1302,15 +1311,13 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { int affCnt = affNodes.size(); if (ownerCnt > affCnt) { - List<ClusterNode> sorted = new ArrayList<>(cctx.discovery().nodes(nodeIds)); - // Sort by node orders in ascending order. - Collections.sort(sorted, CU.nodeComparator(true)); + Collections.sort(nodes, CU.nodeComparator(true)); - int diff = sorted.size() - affCnt; + int diff = nodes.size() - affCnt; for (int i = 0; i < diff; i++) { - ClusterNode n = sorted.get(i); + ClusterNode n = nodes.get(i); if (locId.equals(n.id())) { part.rent(false); @@ -1336,17 +1343,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** - * @return Current coordinator node. - */ - @Nullable private ClusterNode currentCoordinator() { - ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer); - - assert oldest != null || cctx.kernalContext().clientNode(); - - return oldest; - } - - /** * Updates value for single partition. * * @param p Partition. @@ -1356,7 +1352,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) private long updateLocal(int p, GridDhtPartitionState state, long updateSeq) { - ClusterNode oldest = currentCoordinator(); + ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); assert oldest != null || cctx.kernalContext().clientNode(); @@ -1421,7 +1417,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { private void removeNode(UUID nodeId) { assert nodeId != null; - ClusterNode oldest = CU.oldest(cctx.discovery().serverNodes(topVer)); + ClusterNode oldest = discoCache.oldestAliveServerNode(); assert oldest != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 44780f1..c91f881 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -2441,7 +2441,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { AffinityTopologyVersion topVer = req.topologyVersion(); - boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer); + boolean checkReaders = hasNear || ctx.discovery().hasNearCache(ctx.cacheId(), topVer); boolean readersOnly = false; @@ -2676,7 +2676,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { AffinityTopologyVersion topVer = req.topologyVersion(); - boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer); + boolean checkReaders = hasNear || ctx.discovery().hasNearCache(ctx.cacheId(), topVer); CacheStorePartialUpdateException storeErr = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index e945de9..7b5d09b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -44,7 +44,7 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; -import org.apache.ignite.internal.managers.discovery.GridDiscoveryTopologySnapshot; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; @@ -103,6 +103,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT /** Dummy reassign flag. */ private final boolean reassign; + /** */ + @GridToStringExclude + private volatile DiscoCache discoCache; + /** Discovery event. */ private volatile DiscoveryEvent discoEvt; @@ -147,9 +151,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT /** */ private boolean init; - /** Topology snapshot. */ - private AtomicReference<GridDiscoveryTopologySnapshot> topSnapshot = new AtomicReference<>(); - /** Last committed cache version before next topology version use. */ private AtomicReference<GridCacheVersion> lastVer = new AtomicReference<>(); @@ -332,6 +333,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** + * @return Discovery cache. + */ + public DiscoCache discoCache() { + return discoCache; + } + + /** * @param cacheId Cache ID to check. * @param topVer Topology version. * @return {@code True} if cache was added during this exchange. @@ -374,11 +382,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT * * @param exchId Exchange ID. * @param discoEvt Discovery event. + * @param discoCache Discovery data cache. */ - public void onEvent(GridDhtPartitionExchangeId exchId, DiscoveryEvent discoEvt) { + public void onEvent(GridDhtPartitionExchangeId exchId, DiscoveryEvent discoEvt, DiscoCache discoCache) { assert exchId.equals(this.exchId); this.discoEvt = discoEvt; + this.discoCache = discoCache; evtLatch.countDown(); } @@ -435,7 +445,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT assert !dummy && !forcePreload : this; try { - srvNodes = new ArrayList<>(cctx.discovery().serverNodes(topologyVersion())); + discoCache.updateAlives(cctx.discovery()); + + srvNodes = new ArrayList<>(discoCache.serverNodes()); remaining.addAll(F.nodeIds(F.view(srvNodes, F.remoteNodes(cctx.localNodeId())))); @@ -550,7 +562,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT exchId.topologyVersion().equals(cacheCtx.startTopologyVersion()); if (updateTop && clientTop != null) - cacheCtx.topology().update(exchId, clientTop.partitionMap(true), clientTop.updateCounters(false)); + top.update(exchId, clientTop.partitionMap(true), clientTop.updateCounters(false)); } top.updateTopologyVersion(exchId, this, updSeq, stopping(cacheCtx.cacheId())); @@ -842,7 +854,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT List<String> cachesWithoutNodes = null; for (String name : cctx.cache().cacheNames()) { - if (cctx.discovery().cacheAffinityNodes(name, topologyVersion()).isEmpty()) { + if (discoCache.cacheAffinityNodes(name).isEmpty()) { if (cachesWithoutNodes == null) cachesWithoutNodes = new ArrayList<>(); @@ -1096,7 +1108,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT * Cleans up resources to avoid excessive memory usage. */ public void cleanUp() { - topSnapshot.set(null); singleMsgs.clear(); fullMsgs.clear(); crd = null; @@ -1252,7 +1263,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT try { assert crd.isLocal(); - if (!crd.equals(cctx.discovery().serverNodes(topologyVersion()).get(0))) { + if (!crd.equals(discoCache.serverNodes().get(0))) { for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (!cacheCtx.isLocal()) cacheCtx.topology().beforeExchange(GridDhtPartitionsExchangeFuture.this, !centralizedAff); @@ -1559,6 +1570,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT ClusterNode crd0; + discoCache.updateAlives(node); + synchronized (mux) { if (!srvNodes.remove(node)) return; http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index d26242d..99146aa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -48,7 +48,6 @@ import org.apache.ignite.compute.ComputeJobContext; import org.apache.ignite.configuration.DeploymentMode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.DiscoveryEvent; -import org.apache.ignite.events.Event; import org.apache.ignite.events.EventType; import org.apache.ignite.internal.GridClosureCallMode; import org.apache.ignite.internal.GridKernalContext; @@ -58,8 +57,9 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; -import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; +import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; @@ -167,7 +167,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { private IgniteInternalCache<Object, Object> cache; /** Topology listener. */ - private GridLocalEventListener topLsnr = new TopologyListener(); + private DiscoveryEventListener topLsnr = new TopologyListener(); static { Set<IgniteProductVersion> versions = new TreeSet<>(new Comparator<IgniteProductVersion>() { @@ -251,7 +251,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { cache = ctx.cache().utilityCache(); if (!ctx.clientNode()) - ctx.event().addLocalEventListener(topLsnr, EVTS); + ctx.event().addDiscoveryEventListener(topLsnr, EVTS); try { if (ctx.deploy().enabled()) @@ -314,7 +314,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { busyLock.block(); if (!ctx.clientNode()) - ctx.event().removeLocalEventListener(topLsnr); + ctx.event().removeDiscoveryEventListener(topLsnr); Collection<ServiceContextImpl> ctxs = new ArrayList<>(); @@ -1568,9 +1568,9 @@ public class GridServiceProcessor extends GridProcessorAdapter { /** * Topology listener. */ - private class TopologyListener implements GridLocalEventListener { + private class TopologyListener implements DiscoveryEventListener { /** {@inheritDoc} */ - @Override public void onEvent(Event evt) { + @Override public void onEvent(DiscoveryEvent evt, final DiscoCache discoCache) { if (!busyLock.enterBusy()) return; @@ -1588,11 +1588,14 @@ public class GridServiceProcessor extends GridProcessorAdapter { } } else - topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0); + topVer = new AffinityTopologyVersion((evt).topologyVersion(), 0); depExe.execute(new BusyRunnable() { @Override public void run0() { - ClusterNode oldest = ctx.discovery().oldestAliveCacheServerNode(topVer); + // In case the cache instance isn't tracked by DiscoveryManager anymore. + discoCache.updateAlives(ctx.discovery()); + + ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); if (oldest != null && oldest.isLocal()) { final Collection<GridServiceDeployment> retries = new ConcurrentLinkedQueue<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java index 31b4bc7..f0c50eb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java @@ -23,16 +23,11 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.ignite.Ignite; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.events.Event; import org.apache.ignite.events.EventType; -import org.apache.ignite.internal.IgniteKernal; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.GridCacheUtils; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.lang.IgnitePredicate; @@ -90,8 +85,8 @@ public class GridDiscoveryManagerAliveCacheSelfTest extends GridCommonAbstractTe } /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); CacheConfiguration cCfg = defaultCacheConfiguration(); @@ -103,7 +98,7 @@ public class GridDiscoveryManagerAliveCacheSelfTest extends GridCommonAbstractTe TcpDiscoverySpi disc = new TcpDiscoverySpi(); - if (clientMode && ((gridName.charAt(gridName.length() - 1) - '0') & 1) != 0) + if (clientMode && ((igniteInstanceName.charAt(igniteInstanceName.length() - 1) - '0') & 1) != 0) cfg.setClientMode(true); else disc.setMaxMissedClientHeartbeats(50); @@ -158,8 +153,6 @@ public class GridDiscoveryManagerAliveCacheSelfTest extends GridCommonAbstractTe stopTempNodes(); latch.await(); - - validateAlives(); } } @@ -200,55 +193,6 @@ public class GridDiscoveryManagerAliveCacheSelfTest extends GridCommonAbstractTe } /** - * Validates that all node collections contain actual information. - */ - @SuppressWarnings("SuspiciousMethodCalls") - private void validateAlives() { - for (Ignite g : alive) { - log.info("Validate node: " + g.name()); - - assertEquals("Unexpected nodes number for node: " + g.name(), PERM_NODES_CNT, g.cluster().nodes().size()); - } - - for (final Ignite g : alive) { - IgniteKernal k = (IgniteKernal)g; - - GridDiscoveryManager discoMgr = k.context().discovery(); - - final Collection<ClusterNode> currTop = g.cluster().nodes(); - - long currVer = discoMgr.topologyVersion(); - - long startVer = discoMgr.localNode().order(); - - for (long v = currVer; v > currVer - GridDiscoveryManager.DISCOVERY_HISTORY_SIZE && v >= startVer; v--) { - F.forAll(discoMgr.aliveCacheNodes(null, new AffinityTopologyVersion(v)), - new IgnitePredicate<ClusterNode>() { - @Override public boolean apply(ClusterNode e) { - return currTop.contains(e); - } - }); - - F.forAll(discoMgr.aliveRemoteCacheNodes(null, new AffinityTopologyVersion(v)), - new IgnitePredicate<ClusterNode>() { - @Override public boolean apply(ClusterNode e) { - return currTop.contains(e) || g.cluster().localNode().equals(e); - } - }); - - GridCacheSharedContext<?, ?> ctx = k.context().cache().context(); - - ClusterNode oldest = - ctx.discovery().oldestAliveCacheServerNode(new AffinityTopologyVersion(currVer)); - - assertNotNull(oldest); - - assertTrue(currTop.contains(oldest)); - } - } - } - - /** * Starts temporary nodes. * * @throws Exception If failed. @@ -293,4 +237,4 @@ public class GridDiscoveryManagerAliveCacheSelfTest extends GridCommonAbstractTe G.stop(g.name(), false); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAttributesSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAttributesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAttributesSelfTest.java index ba8fa5b..5de2910 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAttributesSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAttributesSelfTest.java @@ -54,10 +54,10 @@ public abstract class GridDiscoveryManagerAttributesSelfTest extends GridCommonA private static boolean binaryMarshallerEnabled; /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - if (gridName.equals(getTestGridName(1))) + if (igniteInstanceName.equals(getTestGridName(1))) cfg.setClientMode(true); if (binaryMarshallerEnabled) @@ -160,7 +160,7 @@ public abstract class GridDiscoveryManagerAttributesSelfTest extends GridCommonA if (fail) fail("Node should not join"); } - catch (Exception e) { + catch (Exception ignored) { if (!fail) fail("Node should join"); } @@ -215,7 +215,7 @@ public abstract class GridDiscoveryManagerAttributesSelfTest extends GridCommonA if (fail) fail("Node should not join"); } - catch (Exception e) { + catch (Exception ignored) { if (!fail) fail("Node should join"); } @@ -346,8 +346,8 @@ public abstract class GridDiscoveryManagerAttributesSelfTest extends GridCommonA */ public static class RegularDiscovery extends GridDiscoveryManagerAttributesSelfTest { /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true); http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerSelfTest.java deleted file mode 100644 index c9179d4..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerSelfTest.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.managers.discovery; - -import org.apache.ignite.Ignite; -import org.apache.ignite.Ignition; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.configuration.NearCacheConfiguration; -import org.apache.ignite.internal.IgniteKernal; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; - -import static org.apache.ignite.cache.CacheMode.PARTITIONED; - -/** - * - */ -public abstract class GridDiscoveryManagerSelfTest extends GridCommonAbstractTest { - /** */ - private static final String CACHE_NAME = "cache"; - - /** */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(); - } - - /** {@inheritDoc} */ - @SuppressWarnings("IfMayBeConditional") - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - CacheConfiguration ccfg1 = defaultCacheConfiguration(); - - ccfg1.setName(CACHE_NAME); - - CacheConfiguration ccfg2 = defaultCacheConfiguration(); - - ccfg2.setName(null); - - if (gridName.equals(getTestGridName(1))) - cfg.setClientMode(true); - else { - ccfg1.setNearConfiguration(null); - ccfg2.setNearConfiguration(null); - - ccfg1.setCacheMode(PARTITIONED); - ccfg2.setCacheMode(PARTITIONED); - - cfg.setCacheConfiguration(ccfg1, ccfg2); - } - - TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi(); - - discoverySpi.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(discoverySpi); - - return cfg; - } - - /** - * @throws Exception If failed. - */ - public void testHasNearCache() throws Exception { - IgniteKernal g0 = (IgniteKernal)startGrid(0); // PARTITIONED_ONLY cache. - - AffinityTopologyVersion none = new AffinityTopologyVersion(-1); - AffinityTopologyVersion one = new AffinityTopologyVersion(1); - AffinityTopologyVersion two = new AffinityTopologyVersion(2, 2); - AffinityTopologyVersion three = new AffinityTopologyVersion(3); - AffinityTopologyVersion four = new AffinityTopologyVersion(4); - AffinityTopologyVersion five = new AffinityTopologyVersion(5); - - assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, none)); - assertFalse(g0.context().discovery().hasNearCache(null, none)); - - assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, one)); - assertFalse(g0.context().discovery().hasNearCache(null, one)); - - IgniteKernal g1 = (IgniteKernal)startGrid(1); // NEAR_ONLY cache. - - grid(1).createNearCache(null, new NearCacheConfiguration()); - - grid(1).createNearCache(CACHE_NAME, new NearCacheConfiguration()); - - assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, one)); - assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, two)); - assertFalse(g0.context().discovery().hasNearCache(null, one)); - assertTrue(g0.context().discovery().hasNearCache(null, two)); - - assertTrue(g1.context().discovery().hasNearCache(CACHE_NAME, two)); - assertTrue(g1.context().discovery().hasNearCache(null, two)); - - IgniteKernal g2 = (IgniteKernal)startGrid(2); // PARTITIONED_ONLY cache. - - assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, one)); - assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, two)); - assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, three)); - assertFalse(g0.context().discovery().hasNearCache(null, one)); - assertTrue(g0.context().discovery().hasNearCache(null, two)); - assertTrue(g0.context().discovery().hasNearCache(null, three)); - - assertTrue(g1.context().discovery().hasNearCache(CACHE_NAME, two)); - assertTrue(g1.context().discovery().hasNearCache(CACHE_NAME, three)); - assertTrue(g1.context().discovery().hasNearCache(null, two)); - assertTrue(g1.context().discovery().hasNearCache(null, three)); - - assertTrue(g2.context().discovery().hasNearCache(CACHE_NAME, three)); - assertTrue(g2.context().discovery().hasNearCache(null, three)); - - stopGrid(2); - - // Wait all nodes are on version 4. - for (;;) { - if (F.forAll( - Ignition.allGrids(), - new IgnitePredicate<Ignite>() { - @Override public boolean apply(Ignite ignite) { - return ignite.cluster().topologyVersion() == 4; - } - })) - break; - - Thread.sleep(1000); - } - - assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, one)); - assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, two)); - assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, three)); - assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, four)); - assertFalse(g0.context().discovery().hasNearCache(null, one)); - assertTrue(g0.context().discovery().hasNearCache(null, two)); - assertTrue(g0.context().discovery().hasNearCache(null, three)); - assertTrue(g0.context().discovery().hasNearCache(null, four)); - - assertTrue(g1.context().discovery().hasNearCache(CACHE_NAME, three)); - assertTrue(g1.context().discovery().hasNearCache(CACHE_NAME, four)); - assertTrue(g1.context().discovery().hasNearCache(null, three)); - assertTrue(g1.context().discovery().hasNearCache(null, four)); - - stopGrid(1); - - // Wait all nodes are on version 5. - for (;;) { - if (F.forAll( - Ignition.allGrids(), - new IgnitePredicate<Ignite>() { - @Override public boolean apply(Ignite ignite) { - return ignite.cluster().topologyVersion() == 5; - } - })) - break; - - Thread.sleep(1000); - } - - assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, one)); - assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, two)); - assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, three)); - assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, four)); - assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, five)); - - assertFalse(g0.context().discovery().hasNearCache(null, one)); - assertTrue(g0.context().discovery().hasNearCache(null, two)); - assertTrue(g0.context().discovery().hasNearCache(null, three)); - assertTrue(g0.context().discovery().hasNearCache(null, four)); - assertFalse(g0.context().discovery().hasNearCache(null, five)); - } - - /** - * - */ - public static class RegularDiscovery extends GridDiscoveryManagerSelfTest { - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true); - - return cfg; - } - } - - /** - * - */ - public static class ClientDiscovery extends GridDiscoveryManagerSelfTest { - // No-op. - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteTopologyPrintFormatSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteTopologyPrintFormatSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteTopologyPrintFormatSelfTest.java index 58992af..86ad458 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteTopologyPrintFormatSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteTopologyPrintFormatSelfTest.java @@ -50,16 +50,16 @@ public class IgniteTopologyPrintFormatSelfTest extends GridCommonAbstractTest { private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); TcpDiscoverySpi disc = new TcpDiscoverySpi(); disc.setIpFinder(IP_FINDER); - if (gridName.endsWith("client")) + if (igniteInstanceName.endsWith("client")) cfg.setClientMode(true); - if (gridName.endsWith("client_force_server")) { + if (igniteInstanceName.endsWith("client_force_server")) { cfg.setClientMode(true); disc.setForceServerMode(true); } http://git-wip-us.apache.org/repos/asf/ignite/blob/8273e670/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java index b28619c..985dddb 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java @@ -42,7 +42,6 @@ import org.apache.ignite.internal.managers.communication.GridCommunicationSendMe import org.apache.ignite.internal.managers.deployment.GridDeploymentManagerStopSelfTest; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManagerAliveCacheSelfTest; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManagerAttributesSelfTest; -import org.apache.ignite.internal.managers.discovery.GridDiscoveryManagerSelfTest; import org.apache.ignite.internal.managers.discovery.IgniteTopologyPrintFormatSelfTest; import org.apache.ignite.internal.managers.events.GridEventStorageManagerSelfTest; import org.apache.ignite.internal.managers.swapspace.GridSwapSpaceManagerSelfTest; @@ -111,8 +110,6 @@ public class IgniteKernalSelfTestSuite extends TestSuite { suite.addTestSuite(GridDiscoveryManagerAttributesSelfTest.RegularDiscovery.class); suite.addTestSuite(GridDiscoveryManagerAttributesSelfTest.ClientDiscovery.class); suite.addTestSuite(GridDiscoveryManagerAliveCacheSelfTest.class); - suite.addTestSuite(GridDiscoveryManagerSelfTest.RegularDiscovery.class); - suite.addTestSuite(GridDiscoveryManagerSelfTest.ClientDiscovery.class); suite.addTestSuite(GridDiscoveryEventSelfTest.class); suite.addTestSuite(GridPortProcessorSelfTest.class); suite.addTestSuite(GridHomePathSelfTest.class);