Repository: ignite Updated Branches: refs/heads/ignite-zk 646be0a51 -> f6aae4d96
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f6aae4d9 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f6aae4d9 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f6aae4d9 Branch: refs/heads/ignite-zk Commit: f6aae4d96ea418291d264744399800baa9109f68 Parents: 646be0a Author: sboikov <[email protected]> Authored: Thu Dec 21 11:03:39 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu Dec 21 12:09:01 2017 +0300 ---------------------------------------------------------------------- .../distributed/dht/GridDhtCacheAdapter.java | 4 + .../datastreamer/DataStreamerImpl.java | 3 + .../zk/internal/ZkAbstractCallabck.java | 19 ++++- .../ZkCommunicationErrorProcessFuture.java | 4 + .../discovery/zk/internal/ZkRuntimeState.java | 10 ++- .../zk/internal/ZookeeperDiscoveryImpl.java | 86 +++++++++++++------- ...niteClientReconnectFailoverAbstractTest.java | 2 +- 7 files changed, 92 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f6aae4d9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 28f9c76..ee0cb01 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -966,6 +966,10 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap try { ctx.io().send(nodeId, res, ctx.ioPolicy()); } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send get response to node, node failed: " + nodeId); + } catch (IgniteCheckedException e) { U.error(log, "Failed to send get response to node (is node still alive?) [nodeId=" + nodeId + ",req=" + req + ", res=" + res + ']', e); http://git-wip-us.apache.org/repos/asf/ignite/blob/f6aae4d9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index 12eb2dc..da07e91 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -1146,6 +1146,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed if (doneCnt == activeFuts0.size()) return; + + if (disconnectErr != null) + throw disconnectErr; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f6aae4d9/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java index d2efb9f..b80a9dd 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java @@ -47,7 +47,24 @@ abstract class ZkAbstractCallabck { * @return {@code True} if is able to start processing. */ final boolean onProcessStart() { - return !rtState.closing && busyLock.enterBusy(); + boolean start = rtState.errForClose == null && busyLock.enterBusy(); + + if (!start) { + assert rtState.errForClose != null; + + onStartFailed(); + + return false; + } + + return true; + } + + /** + * + */ + void onStartFailed() { + // No-op. } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/f6aae4d9/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java index e64c801..accda6e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java @@ -175,6 +175,10 @@ class ZkCommunicationErrorProcessFuture extends GridFutureAdapter<Void> implemen impl.localNode().order(), impl.marshalZip(state)); } + + @Override void onStartFailed() { + onError(rtState.errForClose); + } }); } http://git-wip-us.apache.org/repos/asf/ignite/blob/f6aae4d9/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java index dc7b1bb..9c000b3 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java @@ -34,7 +34,7 @@ class ZkRuntimeState { ZkAliveNodeDataWatcher aliveNodeDataWatcher; /** */ - volatile boolean closing; + volatile Exception errForClose; /** */ final boolean prevJoined; @@ -95,10 +95,12 @@ class ZkRuntimeState { } /** - * + * @param err Error. */ - void onCloseStart() { - closing = true; + void onCloseStart(Exception err) { + assert err != null; + + errForClose = err; ZookeeperClient zkClient = this.zkClient; http://git-wip-us.apache.org/repos/asf/ignite/blob/f6aae4d9/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index f79e3f5..f8fe421 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -398,14 +398,15 @@ public class ZookeeperDiscoveryImpl { assert clientReconnectEnabled; synchronized (stateMux) { - if (connState == ConnectionState.STARTED) + if (connState == ConnectionState.STARTED) { connState = ConnectionState.DISCONNECTED; + + rtState.onCloseStart(disconnectError()); + } else return; } - rtState.onCloseStart(); - busyLock.block(); busyLock.unblock(); @@ -454,12 +455,14 @@ public class ZookeeperDiscoveryImpl { * @param e Error. */ private void onSegmented(Exception e) { + rtState.errForClose = e; + if (rtState.joined) { synchronized (stateMux) { connState = ConnectionState.STOPPED; } - rtState.zkClient.zk().sync(zkPaths.clusterDir, new SegmentedWatcher(), null); + notifySegmented(); } else joinFut.onDone(e); @@ -468,15 +471,6 @@ public class ZookeeperDiscoveryImpl { /** * */ - class SegmentedWatcher implements AsyncCallback.VoidCallback { - @Override public void processResult(int rc, String path, Object ctx) { - notifySegmented(); - } - } - - /** - * - */ private void notifySegmented() { assert rtState.evtsData != null; @@ -514,6 +508,13 @@ public class ZookeeperDiscoveryImpl { } /** + * @return Exception. + */ + static IgniteClientDisconnectedCheckedException disconnectError() { + return new IgniteClientDisconnectedCheckedException(null, "Client node disconnected."); + } + + /** * @param nodeId Node ID. * @return {@code True} if node joined or joining topology. */ @@ -1096,7 +1097,7 @@ public class ZookeeperDiscoveryImpl { /** {@inheritDoc} */ @Override public void process0(WatchedEvent evt) { - if (rtState.closing || rtState.joined) + if (rtState.errForClose != null || rtState.joined) return; if (evt.getType() == Event.EventType.NodeDataChanged) @@ -1258,7 +1259,7 @@ public class ZookeeperDiscoveryImpl { catch (Exception e) { U.warn(log, "Local node authentication failed: " + e, e); - rtState.onCloseStart(); + rtState.onCloseStart(e); joinFut.onDone(e); @@ -1809,8 +1810,10 @@ public class ZookeeperDiscoveryImpl { final List<ClusterNode> topSnapshot = Collections.singletonList((ClusterNode)locNode); - if (connState == ConnectionState.DISCONNECTED) - connState = ConnectionState.STARTED; + synchronized (stateMux) { + if (connState == ConnectionState.DISCONNECTED) + connState = ConnectionState.STARTED; + } lsnr.onDiscovery(EventType.EVT_NODE_JOINED, 1L, @@ -2294,8 +2297,10 @@ public class ZookeeperDiscoveryImpl { final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot(); - if (connState == ConnectionState.DISCONNECTED) - connState = ConnectionState.STARTED; + synchronized (stateMux) { + if (connState == ConnectionState.DISCONNECTED) + connState = ConnectionState.STARTED; + } lsnr.onDiscovery(evtData.eventType(), evtData.topologyVersion(), @@ -2723,7 +2728,7 @@ public class ZookeeperDiscoveryImpl { client.deleteIfExistsAsync(zkPaths.aliveNodesDir); - rtState.onCloseStart(); + rtState.onCloseStart(new IgniteCheckedException("Simulate node failure error.")); rtState.zkClient.close(); } @@ -2815,8 +2820,6 @@ public class ZookeeperDiscoveryImpl { private Exception localNodeFail(String msg, boolean clientReconnect) { U.warn(log, msg); - rtState.onCloseStart(); - if (clientReconnect && clientReconnectEnabled) { assert locNode.isClient() : locNode; @@ -2827,6 +2830,8 @@ public class ZookeeperDiscoveryImpl { reconnect = true; connState = ConnectionState.DISCONNECTED; + + rtState.onCloseStart(disconnectError()); } } @@ -2841,8 +2846,11 @@ public class ZookeeperDiscoveryImpl { runInWorkerThread(new ReconnectClosure(newId)); } } - else + else { + rtState.errForClose = new IgniteCheckedException(msg); + notifySegmented(); + } // Stop any further processing. return new ZookeeperClientFailedException(msg); @@ -3102,11 +3110,13 @@ public class ZookeeperDiscoveryImpl { if (!stop.compareAndSet(false, true)) return; + IgniteCheckedException err = new IgniteCheckedException("Node stopped."); + synchronized (stateMux) { connState = ConnectionState.STOPPED; - } - rtState.onCloseStart(); + rtState.onCloseStart(err); + } busyLock.block(); @@ -3119,7 +3129,7 @@ public class ZookeeperDiscoveryImpl { if (zkClient != null) zkClient.close(); - finishFutures(new IgniteCheckedException("Node stopped.")); + finishFutures(err); IgniteUtils.shutdownNow(ZookeeperDiscoveryImpl.class, utilityPool, log); } @@ -3217,9 +3227,7 @@ public class ZookeeperDiscoveryImpl { /** {@inheritDoc} */ @Override public void run() { - finishFutures(new IgniteClientDisconnectedCheckedException(null, "Client node disconnected.")); - - rtState.closing = true; + finishFutures(disconnectError()); busyLock.block(); @@ -3240,8 +3248,11 @@ public class ZookeeperDiscoveryImpl { @Override public void run() { if (clientReconnectEnabled) { synchronized (stateMux) { - if (connState == ConnectionState.STARTED) + if (connState == ConnectionState.STARTED) { connState = ConnectionState.DISCONNECTED; + + rtState.onCloseStart(disconnectError()); + } else return; } @@ -3533,6 +3544,10 @@ public class ZookeeperDiscoveryImpl { } } } + + @Override void onStartFailed() { + onDone(rtState.errForClose); + } }); } } @@ -3552,6 +3567,17 @@ public class ZookeeperDiscoveryImpl { * @return {@code False} if future was completed. */ boolean checkNodeAndState() { + if (isDone()) + return false; + + Exception err = rtState.errForClose; + + if (err != null) { + onDone(err); + + return false; + } + ConnectionState connState = ZookeeperDiscoveryImpl.this.connState; if (connState == ConnectionState.DISCONNECTED) { http://git-wip-us.apache.org/repos/asf/ignite/blob/f6aae4d9/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java index 676a641..fa8670c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java @@ -210,7 +210,7 @@ public abstract class IgniteClientReconnectFailoverAbstractTest extends IgniteCl } if (err != null) { - log.error(err); + log.error("Test error:" + err); U.dumpThreads(log);
