Repository: ignite Updated Branches: refs/heads/ignite-zk beada20fa -> 01cd78940
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/01cd7894 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/01cd7894 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/01cd7894 Branch: refs/heads/ignite-zk Commit: 01cd78940f0b8546b360d58ab0dc5a623f3b6fa1 Parents: beada20 Author: sboikov <[email protected]> Authored: Mon Nov 27 17:12:07 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon Nov 27 17:12:07 2017 +0300 ---------------------------------------------------------------------- .../communication/tcp/TcpCommunicationSpi.java | 4 + .../zk/internal/ZookeeperDiscoveryImpl.java | 89 +++++++++++++++----- 2 files changed, 70 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/01cd7894/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 41600d5a..662a2b9 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -3101,11 +3101,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati int lastWaitingTimeout = 1; while (client == null) { // Reconnection on handshake timeout. + if (stopping) + throw new IgniteSpiException("Node is stopping."); + if (addr.getAddress().isLoopbackAddress() && addr.getPort() == boundTcpPort) { if (log.isDebugEnabled()) log.debug("Skipping local address [addr=" + addr + ", locAddrs=" + node.attribute(createSpiAttributeName(ATTR_ADDRS)) + ", node=" + node + ']'); + continue; } http://git-wip-us.apache.org/repos/asf/ignite/blob/01cd7894/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index f742ad0..76eb306 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -212,6 +212,8 @@ public class ZookeeperDiscoveryImpl { * @return Ping result. */ public boolean pingNode(UUID nodeId) { + checkState(); + // TODO ZK return node(nodeId) != null; } @@ -1378,7 +1380,7 @@ public class ZookeeperDiscoveryImpl { private void stop0(Throwable e) throws InterruptedException { log.info("Stop ZookeeperDiscovery [nodeId=" + locNode.id() + ", err=" + e + ']'); - connState = ConnectionState.DISCONNECTED; + connState = ConnectionState.STOPPED; ZookeeperClient zkClient = state.zkClient; @@ -1454,6 +1456,9 @@ public class ZookeeperDiscoveryImpl { */ private class ZkEventWorker extends IgniteSpiThread { /** */ + private final Runnable RECONNECT = new Runnable() {@Override public void run() {}}; + + /** */ private final Runnable CONNECTION_LOST = new Runnable() {@Override public void run() {}}; /** */ @@ -1475,6 +1480,8 @@ public class ZookeeperDiscoveryImpl { while (!isInterrupted()) { Runnable r = evtsQ.take(); + if (r == RECONNECT) + processReconnect(); if (r == CONNECTION_LOST) processConnectionLost(); else { @@ -1501,6 +1508,35 @@ public class ZookeeperDiscoveryImpl { /** * */ + void processReconnect() { + assert locNode.isClient() : locNode; + + if (connState == ConnectionState.DISCONNECTED) + return; + + connState = ConnectionState.DISCONNECTED; + + state.zkClient.onCloseStart(); + + busyLock.block(); + + busyLock.unblock(); + + state.zkClient.close(); + + UUID newId = UUID.randomUUID(); + + U.quietAndWarn(log, "Local node will try to reconnect to cluster with new id due to network problems [" + + "newId=" + newId + + ", prevId=" + locNode.id() + + ", locNode=" + locNode + ']'); + + reconnect(newId); + } + + /** + * + */ void processConnectionLost() { if (clientReconnectEnabled) { connState = ConnectionState.DISCONNECTED; @@ -1516,41 +1552,48 @@ public class ZookeeperDiscoveryImpl { ", prevId=" + locNode.id() + ", locNode=" + locNode + ']'); - locNode.onClientDisconnected(newId); + reconnect(newId); + } + else { + U.warn(log, "Connection to Zookeeper server is lost, local node SEGMENTED."); - if (state.joined) { - assert state.evtsData != null; + onSegmented(new IgniteSpiException("Zookeeper connection loss.")); + } + } - lsnr.onDiscovery(EVT_CLIENT_NODE_DISCONNECTED, - state.evtsData.topVer, - locNode, - state.top.topologySnapshot(), - Collections.<Long, Collection<ClusterNode>>emptyMap(), - null); - } + /** + * @param newId New ID. + */ + private void reconnect(UUID newId) { + locNode.onClientDisconnected(newId); - state = new ZkRuntimeState(state.joined); + if (state.joined) { + assert state.evtsData != null; - try { - joinTopology0(true); - } - catch (Exception e) { - U.error(log, "Failed to reconnect: " + e, e); + lsnr.onDiscovery(EVT_CLIENT_NODE_DISCONNECTED, + state.evtsData.topVer, + locNode, + state.top.topologySnapshot(), + Collections.<Long, Collection<ClusterNode>>emptyMap(), + null); + } - onSegemented(e); - } + state = new ZkRuntimeState(state.joined); + + try { + joinTopology0(true); } - else { - U.warn(log, "Connection to Zookeeper server is lost, local node SEGMENTED."); + catch (Exception e) { + U.error(log, "Failed to reconnect: " + e, e); - onSegemented(new IgniteSpiException("Zookeeper connection loss.")); + onSegmented(e); } } /** * @param e Error. */ - private void onSegemented(Exception e) { + private void onSegmented(Exception e) { if (state.joined) { assert state.evtsData != null;
