Merge branch 'ignite-1.7.9-p1' # Conflicts: # modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java # modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java # modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java # modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/64c71059 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/64c71059 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/64c71059 Branch: refs/heads/ignite-3477-master Commit: 64c71059bda1ce04413afc36e2d1328e82baaf5d Parents: 117e18e d124004 Author: dkarachentsev <[email protected]> Authored: Wed Mar 22 16:57:11 2017 +0300 Committer: dkarachentsev <[email protected]> Committed: Wed Mar 22 16:57:11 2017 +0300 ---------------------------------------------------------------------- .../ignite/internal/GridKernalGatewayImpl.java | 8 +- .../apache/ignite/internal/IgniteKernal.java | 120 +++++- .../internal/IgniteNeedReconnectException.java | 40 ++ .../discovery/GridDiscoveryManager.java | 24 ++ .../GridCachePartitionExchangeManager.java | 25 +- .../dht/GridDhtAssignmentFetchFuture.java | 13 +- .../GridDhtPartitionsExchangeFuture.java | 46 ++- .../service/GridServiceProcessor.java | 101 ++--- .../ignite/spi/discovery/tcp/ClientImpl.java | 203 ++++++++-- .../ignite/spi/discovery/tcp/ServerImpl.java | 5 + .../spi/discovery/tcp/TcpDiscoveryImpl.java | 8 + .../spi/discovery/tcp/TcpDiscoverySpi.java | 9 + .../IgniteClientReconnectCacheTest.java | 7 +- .../ignite/internal/IgniteClientRejoinTest.java | 378 +++++++++++++++++++ .../GridServiceProcessorStopSelfTest.java | 75 ++++ .../tcp/TcpClientDiscoverySpiSelfTest.java | 48 ++- .../IgniteClientReconnectTestSuite.java | 2 + 17 files changed, 1013 insertions(+), 99 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/64c71059/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/64c71059/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 2a6706e,25f7884..0ea6ea4 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@@ -3620,25 -3576,45 +3676,65 @@@ public class IgniteKernal implements Ig } /** + * @param node Node. + * @param payload Message payload. + * @param procFromNioThread If {@code true} message is processed from NIO thread. + * @return Response future. + */ + public IgniteInternalFuture sendIoTest(ClusterNode node, byte[] payload, boolean procFromNioThread) { + return ctx.io().sendIoTest(node, payload, procFromNioThread); + } + + /** + * @param nodes Nodes. + * @param payload Message payload. + * @param procFromNioThread If {@code true} message is processed from NIO thread. + * @return Response future. + */ + public IgniteInternalFuture sendIoTest(List<ClusterNode> nodes, byte[] payload, boolean procFromNioThread) { + return ctx.io().sendIoTest(nodes, payload, procFromNioThread); + } + ++ /** + * + */ + private class ReconnectState { + /** */ + private final GridFutureAdapter firstReconnectFut = new GridFutureAdapter(); + + /** */ + private GridCompoundFuture<?, Object> curReconnectFut; + + /** */ + private GridFutureAdapter<?> reconnectDone; + + /** + * @throws IgniteCheckedException If failed. + */ + void waitFirstReconnect() throws IgniteCheckedException { + firstReconnectFut.get(); + } + + /** + * + */ + void waitPreviousReconnect() { + if (curReconnectFut != null && !curReconnectFut.isDone()) { + assert reconnectDone != null; + + curReconnectFut.onDone(STOP_RECONNECT); + + try { + reconnectDone.get(); + } + catch (IgniteCheckedException ignote) { + // No-op. + } + } + + } + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(IgniteKernal.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/64c71059/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index d637de4,2ec1070..b2c4ced --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@@ -108,8 -112,8 +108,9 @@@ import org.apache.ignite.spi.discovery. import org.apache.ignite.spi.discovery.DiscoverySpiListener; import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator; import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport; + import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.thread.IgniteThread; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; @@@ -1903,114 -1892,29 +1904,137 @@@ public class GridDiscoveryManager exten } /** + * @return {@code True} if local node client and discovery SPI supports reconnect. + */ + public boolean reconnectSupported() { + DiscoverySpi spi = getSpi(); + + return ctx.clientNode() && (spi instanceof TcpDiscoverySpi) && + !(((TcpDiscoverySpi) spi).isClientReconnectDisabled()); + } + + /** + * Leave cluster and try to join again. + * + * @throws IgniteSpiException If failed. + */ + public void reconnect() { + assert reconnectSupported(); + + DiscoverySpi discoverySpi = getSpi(); + + ((TcpDiscoverySpi)discoverySpi).reconnect(); + } + + /** + * @param loc Local node. + * @param topSnapshot Topology snapshot. + * @return Newly created discovery cache. + */ + @NotNull private DiscoCache createDiscoCache(ClusterNode loc, Collection<ClusterNode> topSnapshot) { + HashSet<UUID> alives = U.newHashSet(topSnapshot.size()); + HashMap<UUID, ClusterNode> nodeMap = U.newHashMap(topSnapshot.size()); + + ArrayList<ClusterNode> daemonNodes = new ArrayList<>(topSnapshot.size()); + ArrayList<ClusterNode> srvNodes = new ArrayList<>(topSnapshot.size()); + ArrayList<ClusterNode> rmtNodes = new ArrayList<>(topSnapshot.size()); + ArrayList<ClusterNode> allNodes = new ArrayList<>(topSnapshot.size()); + + for (ClusterNode node : topSnapshot) { + if (alive(node)) + alives.add(node.id()); + + if (node.isDaemon()) + daemonNodes.add(node); + else { + allNodes.add(node); + + if (!node.isLocal()) + rmtNodes.add(node); + + if (!CU.clientNode(node)) + srvNodes.add(node); + } + + nodeMap.put(node.id(), node); + } + + assert !rmtNodes.contains(loc) : "Remote nodes collection shouldn't contain local node" + + " [rmtNodes=" + rmtNodes + ", loc=" + loc + ']'; + + Map<Integer, List<ClusterNode>> allCacheNodes = U.newHashMap(allNodes.size()); + Map<Integer, List<ClusterNode>> affCacheNodes = U.newHashMap(allNodes.size()); + + Set<ClusterNode> allNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE); + Set<ClusterNode> rmtNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE); + Set<ClusterNode> srvNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE); + + Set<Integer> nearEnabledCaches = new HashSet<>(); + + for (ClusterNode node : allNodes) { + assert node.order() != 0 : "Invalid node order [locNode=" + loc + ", node=" + node + ']'; + assert !node.isDaemon(); + + for (Map.Entry<String, CachePredicate> entry : registeredCaches.entrySet()) { + String cacheName = entry.getKey(); + CachePredicate filter = entry.getValue(); + + if (filter.cacheNode(node)) { + allNodesWithCaches.add(node); + + if(!CU.clientNode(node)) + srvNodesWithCaches.add(node); + + if (!node.isLocal()) + rmtNodesWithCaches.add(node); + + addToMap(allCacheNodes, cacheName, node); + + if (filter.dataNode(node)) + addToMap(affCacheNodes, cacheName, node); + + if (filter.nearNode(node)) + nearEnabledCaches.add(CU.cacheId(cacheName)); + } + } + } + + return new DiscoCache( + loc, + Collections.unmodifiableList(rmtNodes), + Collections.unmodifiableList(allNodes), + Collections.unmodifiableList(srvNodes), + Collections.unmodifiableList(daemonNodes), + U.sealList(srvNodesWithCaches), + U.sealList(allNodesWithCaches), + U.sealList(rmtNodesWithCaches), + Collections.unmodifiableMap(allCacheNodes), + Collections.unmodifiableMap(affCacheNodes), + Collections.unmodifiableMap(nodeMap), + Collections.unmodifiableSet(nearEnabledCaches), + alives); + } + + /** + * Adds node to map. + * + * @param cacheMap Map to add to. + * @param cacheName Cache name. + * @param rich Node to add + */ + private void addToMap(Map<Integer, List<ClusterNode>> cacheMap, String cacheName, ClusterNode rich) { + List<ClusterNode> cacheNodes = cacheMap.get(CU.cacheId(cacheName)); + + if (cacheNodes == null) { + cacheNodes = new ArrayList<>(); + + cacheMap.put(CU.cacheId(cacheName), cacheNodes); + } + + cacheNodes.add(rich); + } + + /** * Updates topology version if current version is smaller than updated. * * @param updated Updated topology version. http://git-wip-us.apache.org/repos/asf/ignite/blob/64c71059/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/64c71059/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/64c71059/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 50937a8,d4f95e5..5eacc36 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@@ -1677,12 -1677,16 +1704,21 @@@ public class GridDhtPartitionsExchangeF } } + /** + * @param e Exception. + * @return {@code True} if local node should try reconnect in case of error. + */ + public boolean reconnectOnError(Throwable e) { + return X.hasCause(e, IOException.class, IgniteClientDisconnectedCheckedException.class) && + cctx.discovery().reconnectSupported(); + } + /** {@inheritDoc} */ + @Override public boolean isExchange() { + return true; + } + + /** {@inheritDoc} */ @Override public int compareTo(GridDhtPartitionsExchangeFuture fut) { return exchId.compareTo(fut.exchId); } http://git-wip-us.apache.org/repos/asf/ignite/blob/64c71059/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index 84fb8e3,bd81518..e0a5c7c --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@@ -317,8 -315,10 +317,10 @@@ public class GridServiceProcessor exten busyLock.block(); + U.shutdownNow(GridServiceProcessor.class, depExe, log); + if (!ctx.clientNode()) - ctx.event().removeLocalEventListener(topLsnr); + ctx.event().removeDiscoveryEventListener(topLsnr); Collection<ServiceContextImpl> ctxs = new ArrayList<>(); @@@ -1401,7 -1399,7 +1401,7 @@@ return; try { - depExe.execute(new BusyRunnable() { - depExe.submit(new DepRunnable() { ++ depExe.execute(new DepRunnable() { @Override public void run0() { onSystemCacheUpdated(deps); } @@@ -1582,18 -1582,13 +1582,18 @@@ } else return; + + topVer = ((DiscoveryCustomEvent)evt).affinityTopologyVersion(); } else - topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0); + topVer = new AffinityTopologyVersion((evt).topologyVersion(), 0); - depExe.execute(new BusyRunnable() { - depExe.submit(new DepRunnable() { ++ depExe.execute(new DepRunnable() { @Override public void run0() { - ClusterNode oldest = ctx.discovery().oldestAliveCacheServerNode(topVer); + // In case the cache instance isn't tracked by DiscoveryManager anymore. + discoCache.updateAlives(ctx.discovery()); + + ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); if (oldest != null && oldest.isLocal()) { final Collection<GridServiceDeployment> retries = new ConcurrentLinkedQueue<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/64c71059/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 04b076d,02ba56a..feb3e48 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@@ -1100,6 -1183,15 +1186,16 @@@ class ClientImpl extends TcpDiscoveryIm continue; } + else { - msg = queue.poll(); ++ if (msg == null) ++ msg = queue.poll(); + + if (msg == null) { + mux.wait(); + + continue; + } + } } for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sndMsgLsnrs) http://git-wip-us.apache.org/repos/asf/ignite/blob/64c71059/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/64c71059/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/64c71059/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/64c71059/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java index 682d2d7,6cdf465..01aa256 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java @@@ -698,11 -698,14 +698,14 @@@ public class IgniteClientReconnectCache IgniteInternalFuture<Boolean> fut = GridTestUtils.runAsync(new Callable<Boolean>() { @Override public Boolean call() throws Exception { try { - Ignition.start(optimize(getConfiguration(getTestGridName(SRV_CNT)))); + Ignition.start(optimize(getConfiguration(getTestIgniteInstanceName(SRV_CNT)))); - fail(); + // Commented due to IGNITE-4473, because + // IgniteClientDisconnectedException won't + // be thrown, but client will reconnect. + // fail(); - return false; + return true; } catch (IgniteClientDisconnectedException e) { log.info("Expected start error: " + e); http://git-wip-us.apache.org/repos/asf/ignite/blob/64c71059/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/64c71059/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java ----------------------------------------------------------------------
