ignite-1.5 Fixed the client discovery implementation to skip processing of node-failed messages while the client is disconnected.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d4687d9f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d4687d9f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d4687d9f Branch: refs/heads/master Commit: d4687d9f636b38736d327351ca4b22c3262a2ae8 Parents: 58b55b5 Author: sboikov <sboi...@gridgain.com> Authored: Mon Dec 21 10:19:51 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon Dec 21 10:19:51 2015 +0300 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 4 +- .../dht/preloader/GridDhtPreloader.java | 29 ------------ .../ignite/spi/discovery/tcp/ClientImpl.java | 48 +++++++++++--------- .../IgniteClientReconnectCacheTest.java | 26 +++++++++-- .../cache/IgniteCachePutAllRestartTest.java | 2 +- 5 files changed, 53 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d4687d9f/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 92d66d7..72a2bef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -1641,7 +1641,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { if (cache == null) { throw new IgniteException("Failed to resolve nodes topology [cacheName=" + cacheName + - ", topVer=" + topVer + ", history=" + discoCacheHist.keySet() + + ", topVer=" + topVer + + ", history=" + 
discoCacheHist.keySet() + + ", snap=" + snap + ", locNode=" + ctx.discovery().localNode() + ']'); } http://git-wip-us.apache.org/repos/asf/ignite/blob/d4687d9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index c46a66c..f0054e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -48,7 +48,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; -import org.apache.ignite.internal.util.GridAtomicLong; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -92,9 +91,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** */ private GridDhtPartitionTopology top; - /** Topology version. */ - private final GridAtomicLong topVer = new GridAtomicLong(); - /** Force key futures. 
*/ private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<?, ?>> forceKeyFuts = newMap(); @@ -149,11 +145,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { assert e.type() != EVT_NODE_JOINED || n.order() > loc.order() : "Node joined with smaller-than-local " + "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']'; - boolean set = topVer.setIfGreater(e.topologyVersion()); - - assert set : "Have you configured TcpDiscoverySpi for your in-memory data grid? [newVer=" + - e.topologyVersion() + ", curVer=" + topVer.get() + ", evt=" + e + ']'; - if (e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED) { for (GridDhtAssignmentFetchFuture fut : pendingAssignmentFetchFuts.values()) fut.onNodeLeft(e.eventNode().id()); @@ -238,20 +229,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } /** {@inheritDoc} */ - @Override public void onKernalStart() throws IgniteCheckedException { - if (log.isDebugEnabled()) - log.debug("DHT rebalancer onKernalStart callback."); - - ClusterNode loc = cctx.localNode(); - - assert loc.metrics().getStartTime() > 0; - - final long startTopVer = loc.order(); - - topVer.setIfGreater(startTopVer); - } - - /** {@inheritDoc} */ @Override public void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) { super.preloadPredicate(preloadPred); @@ -382,12 +359,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** {@inheritDoc} */ @Override public void onReconnected() { startFut = new GridFutureAdapter<>(); - - long topVer0 = cctx.kernalContext().discovery().topologyVersion(); - - assert topVer0 > 0 : topVer0; - - topVer.set(topVer0); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/d4687d9f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 8f6c8a9..850cc24 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -1828,36 +1828,42 @@ class ClientImpl extends TcpDiscoveryImpl { return; } - if (!getLocalNodeId().equals(msg.creatorNodeId())) { - TcpDiscoveryNode node = rmtNodes.remove(msg.failedNodeId()); + if (nodeAdded()) { + if (!getLocalNodeId().equals(msg.creatorNodeId())) { + TcpDiscoveryNode node = rmtNodes.remove(msg.failedNodeId()); - if (node == null) { - if (log.isDebugEnabled()) - log.debug("Discarding node failed message since node is not found [msg=" + msg + ']'); + if (node == null) { + if (log.isDebugEnabled()) + log.debug("Discarding node failed message since node is not found [msg=" + msg + ']'); - return; - } + return; + } - Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion(), msg); + Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion(), msg); - if (state != CONNECTED) { - if (log.isDebugEnabled()) - log.debug("Discarding node failed message (join process is not finished): " + msg); + if (state != CONNECTED) { + if (log.isDebugEnabled()) + log.debug("Discarding node failed message (join process is not finished): " + msg); - return; - } + return; + } - if (msg.warning() != null) { - ClusterNode creatorNode = rmtNodes.get(msg.creatorNodeId()); + if (msg.warning() != null) { + ClusterNode creatorNode = rmtNodes.get(msg.creatorNodeId()); - U.warn(log, "Received EVT_NODE_FAILED event with warning [" + - "nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : msg.creatorNodeId()) + - ", msg=" + msg.warning() + ']'); - } + U.warn(log, "Received EVT_NODE_FAILED event with warning [" + + "nodeInitiatedEvt=" + (creatorNode != null ? 
creatorNode : msg.creatorNodeId()) + + ", msg=" + msg.warning() + ']'); + } - notifyDiscovery(EVT_NODE_FAILED, msg.topologyVersion(), node, top); + notifyDiscovery(EVT_NODE_FAILED, msg.topologyVersion(), node, top); - spi.stats.onNodeFailed(); + spi.stats.onNodeFailed(); + } + } + else { + if (log.isDebugEnabled()) + log.debug("Ignore topology message, local node not added to topology: " + msg); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d4687d9f/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java index 5234d6e..ad6c46f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java @@ -1088,7 +1088,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac clientMode = true; - final int CLIENTS = 2; + final int CLIENTS = 5; List<Ignite> clients = new ArrayList<>(); @@ -1103,12 +1103,14 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac int nodes = SRV_CNT + CLIENTS; int srvNodes = SRV_CNT; - for (int iter = 0; iter < 3; iter++) { + for (int iter = 0; iter < 5; iter++) { log.info("Iteration: " + iter); reconnectClientNodes(log, clients, grid(0), null); - for (Ignite client : clients) { + final int expNodes = CLIENTS + srvNodes; + + for (final Ignite client : clients) { IgniteCache<Object, Object> cache = client.cache(null); assertNotNull(cache); @@ -1117,6 +1119,14 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac assertEquals(1, cache.get(client.name())); + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override 
public boolean apply() { + ClusterGroup grp = client.cluster().forCacheNodes(null); + + return grp.nodes().size() == expNodes; + } + }, 5000); + ClusterGroup grp = client.cluster().forCacheNodes(null); assertEquals(CLIENTS + srvNodes, grp.nodes().size()); @@ -1127,7 +1137,15 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac } for (int i = 0; i < nodes; i++) { - Ignite ignite = grid(i); + final Ignite ignite = grid(i); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + ClusterGroup grp = ignite.cluster().forCacheNodes(null); + + return grp.nodes().size() == expNodes; + } + }, 5000); ClusterGroup grp = ignite.cluster().forCacheNodes(null); http://git-wip-us.apache.org/repos/asf/ignite/blob/d4687d9f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutAllRestartTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutAllRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutAllRestartTest.java index 3e124f3..96a396c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutAllRestartTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutAllRestartTest.java @@ -121,7 +121,7 @@ public class IgniteCachePutAllRestartTest extends GridCommonAbstractTest { iter++; - if (iter % 10 == 0) + if (iter % 1000 == 0) log.info("Iteration: " + iter); }