zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1ccbac03 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1ccbac03 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1ccbac03 Branch: refs/heads/ignite-zk Commit: 1ccbac03cfa94123adc9be7270a76311415b7389 Parents: 96aa846 Author: sboikov <[email protected]> Authored: Mon Nov 27 11:35:13 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon Nov 27 16:27:45 2017 +0300 ---------------------------------------------------------------------- .../processors/cluster/ClusterNodeMetrics.java | 7 +- .../processors/cluster/ClusterProcessor.java | 9 +- .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 23 +- .../discovery/zk/internal/ZkClusterNodes.java | 8 + .../discovery/zk/internal/ZkIgnitePaths.java | 14 +- .../discovery/zk/internal/ZkRuntimeState.java | 60 ++ .../discovery/zk/internal/ZookeeperClient.java | 50 +- .../ZookeeperClientFailedException.java | 7 + .../zk/internal/ZookeeperClusterNode.java | 15 +- .../zk/internal/ZookeeperDiscoveryImpl.java | 999 ++++++++++++------- .../internal/ClusterNodeMetricsUpdateTest.java | 98 +- .../ZookeeperDiscoverySpiBasicTest.java | 143 ++- .../testframework/junits/GridAbstractTest.java | 8 + .../testsuites/IgniteComputeGridTestSuite.java | 2 + 14 files changed, 1041 insertions(+), 402 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1ccbac03/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java index 4a7dd77..75a83a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java @@ -22,13 +22,14 @@ import java.util.Collections; import java.util.Map; import org.apache.ignite.cache.CacheMetrics; import org.apache.ignite.cluster.ClusterMetrics; +import org.apache.ignite.internal.ClusterMetricsSnapshot; /** * */ class ClusterNodeMetrics implements Serializable { /** */ - private final ClusterMetrics metrics; + private final byte[] metrics; /** */ private final Map<Integer, CacheMetrics> cacheMetrics; @@ -38,14 +39,14 @@ class ClusterNodeMetrics implements Serializable { * @param cacheMetrics Cache metrics. */ ClusterNodeMetrics(ClusterMetrics metrics, Map<Integer, CacheMetrics> cacheMetrics) { - this.metrics = metrics; + this.metrics = ClusterMetricsSnapshot.serialize(metrics); this.cacheMetrics = cacheMetrics; } /** * @return Metrics. */ - ClusterMetrics metrics() { + byte[] metrics() { return metrics; } http://git-wip-us.apache.org/repos/asf/ignite/blob/1ccbac03/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java index 8812161..36c4c0f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java @@ -33,6 +33,7 @@ import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; +import org.apache.ignite.internal.ClusterMetricsSnapshot; import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteDiagnosticInfo; @@ -387,6 +388,7 @@ public class ClusterProcessor extends GridProcessorAdapter { } /** + * @param sndNodeId Sender node ID. * @param msg Message. */ private void processMetricsUpdateMessage(UUID sndNodeId, ClusterMetricsUpdateMessage msg) { @@ -413,6 +415,11 @@ public class ClusterProcessor extends GridProcessorAdapter { } } + /** + * @param discoCache Discovery data cache. + * @param nodeId Node ID. + * @param metricsBytes Marshalled metrics. + */ private void updateNodeMetrics(DiscoCache discoCache, UUID nodeId, byte[] metricsBytes) { ClusterNode node = discoCache.node(nodeId); @@ -426,7 +433,7 @@ public class ClusterProcessor extends GridProcessorAdapter { IgniteClusterNode node0 = (IgniteClusterNode)node; - node0.setMetrics(metrics.metrics()); + node0.setMetrics(ClusterMetricsSnapshot.deserialize(metrics.metrics(), 0)); node0.setCacheMetrics(metrics.cacheMetrics()); ctx.discovery().metricsUpdateEvent(discoCache, node0); http://git-wip-us.apache.org/repos/asf/ignite/blob/1ccbac03/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java index 3c3ffa7..e4450d0 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java @@ -283,15 +283,20 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery ", basePath=" + basePath + ", clusterName=" + clusterName + ']'); - impl = new ZookeeperDiscoveryImpl(log, + impl = new ZookeeperDiscoveryImpl( + igniteInstanceName, + zkConnectionString, + sesTimeout, + log, basePath, clusterName, locNode, lsnr, - exchange); + exchange, + locNode.isClient() && !clientReconnectDisabled); try { - impl.joinTopology(igniteInstanceName, zkConnectionString, sesTimeout); + impl.joinTopology(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -302,8 +307,16 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery /** {@inheritDoc} */ @Override public void spiStop() throws IgniteSpiException { - if (impl != null) - impl.stop(); + if (impl != null) { + try { + impl.stop(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IgniteSpiException(e); + } + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/1ccbac03/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java index 7ca1360..4c114a3 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java @@ -53,6 +53,14 @@ public class ZkClusterNodes { } /** + * @return Current nodes in topology. + */ + @SuppressWarnings("unchecked") + List<ClusterNode> topologySnapshot() { + return new ArrayList<>((Collection)nodesByOrder.values()); + } + + /** * @param node New node. */ void addNode(ZookeeperClusterNode node) { http://git-wip-us.apache.org/repos/asf/ignite/blob/1ccbac03/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java index a98ea8d..cd2ff0b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java @@ -175,6 +175,16 @@ class ZkIgnitePaths { return basePath + "/" + clusterName + "/" + path; } + String joiningNodeDataPath(UUID nodeId, String aliveNodePath) { + int joinSeq = ZkIgnitePaths.aliveJoinDataSequence(aliveNodePath); + + return joinDataDir + '/' + + ZkIgnitePaths.aliveNodePrefixId(aliveNodePath) + ":" + + nodeId.toString() + + "|" + + String.format("%010d", joinSeq); + } + /** * @param path Alive node zk path. * @return Node internal ID. @@ -210,9 +220,9 @@ class ZkIgnitePaths { * @param path Alive node zk path. * @return Joined node sequence. */ - static int aliveJoinDataSequence(String path) { - int idx1 = path.indexOf('|'); + private static int aliveJoinDataSequence(String path) { int idx2 = path.lastIndexOf('|'); + int idx1 = path.lastIndexOf('|', idx2 - 1); return Integer.parseInt(path.substring(idx1 + 1, idx2)); } http://git-wip-us.apache.org/repos/asf/ignite/blob/1ccbac03/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java new file mode 100644 index 0000000..d2d0372 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +/** + * + */ +class ZkRuntimeState { + /** */ + final boolean prevJoined; + + /** */ + ZookeeperClient zkClient; + + /** */ + long gridStartTime; + + /** */ + boolean joined; + + /** */ + ZkDiscoveryEventsData evtsData; + + /** */ + boolean crd; + + /** */ + String locNodeZkPath; + + /** */ + ZkAliveNodeData locNodeInfo = new ZkAliveNodeData(); + + /** */ + int procEvtCnt; + + /** */ + final ZkClusterNodes top = new ZkClusterNodes(); + + /** + * @param prevJoined {@code True} if joined topology before reconnect attempt. + */ + ZkRuntimeState(boolean prevJoined) { + this.prevJoined = prevJoined; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1ccbac03/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java index fb6a697..0f39da6 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java @@ -77,6 +77,9 @@ public class ZookeeperClient implements Watcher { /** */ private final ArrayDeque<ZkAsyncOperation> retryQ = new ArrayDeque<>(); + /** */ + private volatile boolean closing; + /** * @param log Logger. * @param connectString ZK connection string. @@ -122,13 +125,16 @@ public class ZookeeperClient implements Watcher { Thread.currentThread().setName(threadName); } - connTimer = new Timer("zk-timer-" + igniteInstanceName); + connTimer = new Timer("zk-client-timer-" + igniteInstanceName); scheduleConnectionCheck(); } /** {@inheritDoc} */ @Override public void process(WatchedEvent evt) { + if (closing) + return; + if (evt.getType() == Event.EventType.None) { ConnectionState newState; @@ -179,18 +185,18 @@ public class ZookeeperClient implements Watcher { } else if (newState == ConnectionState.Connected) stateMux.notifyAll(); - else { + else assert state == ConnectionState.Lost : state; - - closeClient(); - } } else return; } - if (newState == ConnectionState.Lost) + if (newState == ConnectionState.Lost) { + closeClient(); + notifyConnectionLost(); + } else if (newState == ConnectionState.Connected) { for (ZkAsyncOperation op : retryQ) op.execute(); @@ -557,6 +563,17 @@ public class ZookeeperClient implements Watcher { /** * */ + void onCloseStart() { + closing = true; + + synchronized (stateMux) { + stateMux.notifyAll(); + } + } + + /** + * + */ public void close() { closeClient(); } @@ -573,6 +590,9 @@ public class ZookeeperClient implements Watcher { ZookeeperClientFailedException err = null; synchronized (stateMux) { + if (closing) + throw new ZookeeperClientFailedException("Zookeeper client is closed."); + U.warn(log, "Failed to execute zookeeper operation [err=" + e + ", state=" + state + ']'); if (zk.getState() == ZooKeeper.States.CLOSED) @@ -609,8 +629,6 @@ public class ZookeeperClient implements Watcher { U.warn(log, "Failed to establish zookeeper connection, close client " + "[timeout=" + connLossTimeout + ']'); - closeClient(); - err = new ZookeeperClientFailedException(e); } } @@ -623,20 +641,23 @@ public class ZookeeperClient implements Watcher { ", remainingWaitTime=" + remainingTime + ']'); stateMux.wait(RETRY_TIMEOUT); + + if (closing) + throw new ZookeeperClientFailedException("Zookeeper client is closed."); } } else { U.error(log, "Operation failed with unexpected error, close client: " + e, e); - closeClient(); - state = ConnectionState.Lost; - throw new ZookeeperClientFailedException(e); + err = new ZookeeperClientFailedException(e); } } if (err != null) { + closeClient(); + notifyConnectionLost(); throw err; @@ -955,13 +976,14 @@ public class ZookeeperClient implements Watcher { "[timeout=" + connLossTimeout + ']'); connLoss = true; - - closeClient(); } } - if (connLoss) + if (connLoss) { + closeClient(); + notifyConnectionLost(); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/1ccbac03/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java index a61f478..01d011b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java @@ -25,6 +25,13 @@ class ZookeeperClientFailedException extends Exception { private static final long serialVersionUID = 0L; /** + * @param msg Message. + */ + ZookeeperClientFailedException(String msg) { + super(msg); + } + + /** * @param cause Cause. */ ZookeeperClientFailedException(Throwable cause) { http://git-wip-us.apache.org/repos/asf/ignite/blob/1ccbac03/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java index 855d7cc..aa90c67 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java @@ -168,9 +168,6 @@ public class ZookeeperClusterNode implements IgniteClusterNode, Serializable { return metrics0; } - if (metrics == null) - System.out.println(); - return metrics; } @@ -227,7 +224,7 @@ public class ZookeeperClusterNode implements IgniteClusterNode, Serializable { /** * @return Internal ID corresponds to Zookeeper sequential node. */ - public int internalId() { + int internalId() { return internalId; } @@ -238,12 +235,22 @@ public class ZookeeperClusterNode implements IgniteClusterNode, Serializable { this.internalId = internalId; } + /** + * @param order Node order. + */ void order(long order) { assert order > 0 : order; this.order = order; } + /** + * @param newId New node ID. + */ + public void onClientDisconnected(UUID newId) { + id = newId; + } + /** {@inheritDoc} */ @Override public IgniteProductVersion version() { return ver; http://git-wip-us.apache.org/repos/asf/ignite/blob/1ccbac03/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index c7b9224..a08e879 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -27,8 +27,11 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLogger; @@ -39,12 +42,14 @@ import org.apache.ignite.internal.ClusterMetricsSnapshot; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.events.DiscoveryCustomEvent; +import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.marshaller.jdk.JdkMarshaller; import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.IgniteSpiThread; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange; @@ -57,6 +62,8 @@ import org.apache.zookeeper.Watcher; import org.apache.zookeeper.data.Stat; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED; +import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED; import static org.apache.zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL; import static org.apache.zookeeper.CreateMode.PERSISTENT; @@ -68,6 +75,15 @@ public class ZookeeperDiscoveryImpl { static final String IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD = "IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD"; /** */ + private final String igniteInstanceName; + + /** */ + private final String connectString; + + /** */ + private final int sesTimeout; + + /** */ private final JdkMarshaller marsh = new JdkMarshaller(); /** */ @@ -77,6 +93,9 @@ public class ZookeeperDiscoveryImpl { private final IgniteLogger log; /** */ + private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); + + /** */ private final ZookeeperClusterNode locNode; /** */ @@ -86,7 +105,7 @@ public class ZookeeperDiscoveryImpl { private final DiscoverySpiDataExchange exchange; /** */ - private ZookeeperClient zkClient; + private final boolean clientReconnectEnabled; /** */ private final GridFutureAdapter<Void> joinFut = new GridFutureAdapter<>(); @@ -104,31 +123,16 @@ public class ZookeeperDiscoveryImpl { private final ZkDataCallback dataCallback; /** */ - private final ZkClusterNodes top = new ZkClusterNodes(); - - /** */ - private long gridStartTime; - - /** */ - private boolean joined; - - /** */ - private ZkDiscoveryEventsData evtsData; - - /** */ - private boolean crd; + private final int evtsAckThreshold; /** */ - private String locNodeZkPath; + private ZkRuntimeState state = new ZkRuntimeState(false); /** */ - private ZkAliveNodeData locNodeInfo = new ZkAliveNodeData(); + private volatile ConnectionState connState = ConnectionState.STARTED; /** */ - private final int evtsAckThreshold; - - /** */ - private int procEvtCnt; + private ZkEventWorker evtWorker; /** * @param log Logger. @@ -138,12 +142,17 @@ public class ZookeeperDiscoveryImpl { * @param lsnr Discovery events listener. * @param exchange Discovery data exchange. */ - public ZookeeperDiscoveryImpl(IgniteLogger log, + public ZookeeperDiscoveryImpl( + String igniteInstanceName, + String connectString, + int sesTimeout, + IgniteLogger log, String basePath, String clusterName, ZookeeperClusterNode locNode, DiscoverySpiListener lsnr, - DiscoverySpiDataExchange exchange) { + DiscoverySpiDataExchange exchange, + boolean clientReconnectEnabled) { assert locNode.id() != null && locNode.isLocal() : locNode; if (F.isEmpty(clusterName)) @@ -153,10 +162,14 @@ public class ZookeeperDiscoveryImpl { zkPaths = new ZkIgnitePaths(basePath, clusterName); + this.igniteInstanceName = igniteInstanceName; + this.connectString = connectString; + this.sesTimeout = sesTimeout; this.log = log.getLogger(getClass()); this.locNode = locNode; this.lsnr = lsnr; this.exchange = exchange; + this.clientReconnectEnabled = clientReconnectEnabled; watcher = new ZkWatcher(); childrenCallback = new ZKChildrenCallback(); @@ -191,7 +204,7 @@ public class ZookeeperDiscoveryImpl { @Nullable public ClusterNode node(UUID nodeId) { assert nodeId != null; - return top.nodesById.get(nodeId); + return state.top.nodesById.get(nodeId); } /** @@ -207,7 +220,25 @@ public class ZookeeperDiscoveryImpl { * @return Remote nodes. */ public Collection<ClusterNode> remoteNodes() { - return top.remoteNodes(); + checkState(); + + return state.top.remoteNodes(); + } + + /** + * + */ + private void checkState() { + switch (connState) { + case STARTED: + break; + + case STOPPED: + throw new IgniteSpiException("Zookeeper client closed."); + + case DISCONNECTED: + throw new IgniteClientDisconnectedException(null, "Client is disconnected."); + } } /** @@ -215,8 +246,16 @@ public class ZookeeperDiscoveryImpl { * @return {@code True} if node joined or joining topology. */ public boolean knownNode(UUID nodeId) { + checkState(); + + if (!busyLock.enterBusy()) { + checkState(); + + throw new IgniteSpiException("Zookeeper client closed."); + } + try { - List<String> children = zkClient.getChildren(zkPaths.aliveNodesDir); + List<String> children = state.zkClient.getChildren(zkPaths.aliveNodesDir); for (int i = 0; i < children.size(); i++) { UUID id = ZkIgnitePaths.aliveNodeId(children.get(i)); @@ -235,12 +274,17 @@ public class ZookeeperDiscoveryImpl { throw new IgniteInterruptedException(e); } + finally { + busyLock.leaveBusy(); + } } /** * @param msg Message. */ public void sendCustomMessage(DiscoverySpiCustomMessage msg) { + checkState(); + assert msg != null; byte[] msgBytes; @@ -252,10 +296,16 @@ public class ZookeeperDiscoveryImpl { throw new IgniteSpiException("Failed to marshal custom message: " + msg, e); } + if (!busyLock.enterBusy()) { + checkState(); + + throw new IgniteSpiException("Zookeeper client closed."); + } + try { String prefix = UUID.randomUUID().toString(); - zkClient.createSequential(prefix, + state.zkClient.createSequential(prefix, zkPaths.customEvtsDir, prefix + ":" + locNode.id() + '|', msgBytes, @@ -269,24 +319,43 @@ public class ZookeeperDiscoveryImpl { throw new IgniteInterruptedException(e); } + finally { + busyLock.leaveBusy(); + } } /** * @return Cluster start time. */ public long gridStartTime() { - return gridStartTime; + return state.gridStartTime; } /** - * @param igniteInstanceName Ignite instance name. - * @param connectString Zookeeper connect string. - * @param sesTimeout Zookeeper session timeout. * @throws InterruptedException If interrupted. */ - public void joinTopology(String igniteInstanceName, String connectString, int sesTimeout) - throws InterruptedException - { + public void joinTopology() throws InterruptedException { + joinTopology0(false); + + for (;;) { + try { + joinFut.get(10_000); + + break; + } + catch (IgniteFutureTimeoutCheckedException e) { + U.warn(log, "Waiting for local join event [nodeId=" + locNode.id() + ", name=" + igniteInstanceName + ']'); + } + catch (Exception e) { + throw new IgniteSpiException("Failed to join cluster", e); + } + } + } + + /** + * @throws InterruptedException If interrupted. + */ + private void joinTopology0(boolean reconnect) throws InterruptedException { DiscoveryDataBag discoDataBag = new DiscoveryDataBag(locNode.id()); exchange.collect(discoDataBag); @@ -303,7 +372,7 @@ public class ZookeeperDiscoveryImpl { } try { - zkClient = new ZookeeperClient(igniteInstanceName, + state.zkClient = new ZookeeperClient(igniteInstanceName, log, connectString, sesTimeout, @@ -313,23 +382,15 @@ public class ZookeeperDiscoveryImpl { throw new IgniteSpiException("Failed to create Zookeeper client", e); } - initZkNodes(); - - startJoin(joinDataBytes); - - for (;;) { - try { - joinFut.get(10_000); + if (!reconnect) { + evtWorker = new ZkEventWorker(igniteInstanceName, "zookeeper-disco-evt-worker", log); - break; - } - catch (IgniteFutureTimeoutCheckedException e) { - U.warn(log, "Waiting for local join event [nodeId=" + locNode.id() + ", name=" + igniteInstanceName + ']'); - } - catch (Exception e) { - throw new IgniteSpiException("Failed to join cluster", e); - } + evtWorker.start(); } + else + assert evtWorker != null; + + startJoin(joinDataBytes); } /** @@ -337,7 +398,7 @@ public class ZookeeperDiscoveryImpl { */ private void initZkNodes() throws InterruptedException { try { - if (zkClient.exists(zkPaths.aliveNodesDir)) + if (state.zkClient.exists(zkPaths.aliveNodesDir)) return; // This path is created last, assume all others dirs are created. List<String> dirs = new ArrayList<>(); @@ -351,14 +412,14 @@ public class ZookeeperDiscoveryImpl { dirs.add(zkPaths.aliveNodesDir); try { - zkClient.createAll(dirs, PERSISTENT); + state.zkClient.createAll(dirs, PERSISTENT); } catch (KeeperException.NodeExistsException e) { if (log.isDebugEnabled()) log.debug("Failed to create nodes using bulk operation: " + e); for (String dir : dirs) - zkClient.createIfNeeded(dir, null, PERSISTENT); + state.zkClient.createIfNeeded(dir, null, PERSISTENT); } } catch (ZookeeperClientFailedException e) { @@ -371,12 +432,17 @@ public class ZookeeperDiscoveryImpl { * @throws InterruptedException If interrupted. */ private void startJoin(byte[] joinDataBytes) throws InterruptedException { + if (!busyLock.enterBusy()) + return; + try { + initZkNodes(); + String prefix = UUID.randomUUID().toString(); // TODO ZK: handle max size. - String path = zkClient.createSequential(prefix, + String path = state.zkClient.createSequential(prefix, zkPaths.joinDataDir, prefix + ":" + locNode.id() + "|", joinDataBytes, @@ -384,7 +450,7 @@ public class ZookeeperDiscoveryImpl { int seqNum = Integer.parseInt(path.substring(path.lastIndexOf('|') + 1)); - locNodeZkPath = zkClient.createSequential( + state.locNodeZkPath = state.zkClient.createSequential( prefix, zkPaths.aliveNodesDir, prefix + ":" + locNode.id() + "|" + seqNum + "|", @@ -393,21 +459,20 @@ public class ZookeeperDiscoveryImpl { log.info("Node started join [nodeId=" + locNode.id() + ", instanceName=" + locNode.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME) + - ", nodePath=" + locNodeZkPath + ']'); + ", nodePath=" + state.locNodeZkPath + ']'); - zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new AsyncCallback.Children2Callback() { - @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { - onConnected(rc, children); - } - }); + state.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new CheckCoordinatorCallback()); - zkClient.getDataAsync(zkPaths.evtsPath, watcher, dataCallback); + state.zkClient.getDataAsync(zkPaths.evtsPath, watcher, dataCallback); connStartLatch.countDown(); } catch (ZookeeperClientFailedException e) { throw new IgniteSpiException("Failed to initialize Zookeeper nodes", e); } + finally { + busyLock.leaveBusy(); + } } /** TODO ZK */ @@ -423,103 +488,51 @@ public class ZookeeperDiscoveryImpl { } /** - * @param rc Async callback result. - * @param aliveNodes Alive nodes. - */ - private void onConnected(int rc, List<String> aliveNodes) { - assert !joined; - - checkIsCoordinator(rc, aliveNodes); - } - - /** * @param rc Callback result code. * @param aliveNodes Alive nodes. + * @throws Exception If failed. */ - private void checkIsCoordinator(int rc, final List<String> aliveNodes) { - try { - assert rc == 0 : KeeperException.Code.get(rc); + private void checkIsCoordinator(int rc, final List<String> aliveNodes) throws Exception { + assert rc == 0 : KeeperException.Code.get(rc); - TreeMap<Integer, String> alives = new TreeMap<>(); + TreeMap<Integer, String> alives = new TreeMap<>(); - Integer locInternalId = null; + Integer locInternalId = null; - for (String aliveNodePath : aliveNodes) { - Integer internalId = ZkIgnitePaths.aliveInternalId(aliveNodePath); + for (String aliveNodePath : aliveNodes) { + Integer internalId = ZkIgnitePaths.aliveInternalId(aliveNodePath); - alives.put(internalId, aliveNodePath); + alives.put(internalId, aliveNodePath); - if (locInternalId == null) { - UUID nodeId = ZkIgnitePaths.aliveNodeId(aliveNodePath); + if (locInternalId == null) { + UUID nodeId = ZkIgnitePaths.aliveNodeId(aliveNodePath); - if (locNode.id().equals(nodeId)) - locInternalId = internalId; - } + if (locNode.id().equals(nodeId)) + locInternalId = internalId; } + } - assert !alives.isEmpty(); - assert locInternalId != null; - - Map.Entry<Integer, String> crdE = alives.firstEntry(); - - if (locInternalId.equals(crdE.getKey())) - onBecomeCoordinator(aliveNodes, locInternalId); - else { - assert alives.size() > 1; - - Map.Entry<Integer, String> prevE = alives.floorEntry(locInternalId - 1); - - assert prevE != null; - - log.info("Discovery coordinator already exists, watch for previous node [" + - "locId=" + locNode.id() + - ", prevPath=" + prevE.getValue() + ']'); + assert !alives.isEmpty(); + assert locInternalId != null; - PreviousNodeWatcher watcher = new PreviousNodeWatcher(); + Map.Entry<Integer, String> crdE = alives.firstEntry(); - zkClient.existsAsync(zkPaths.aliveNodesDir + "/" + prevE.getValue(), watcher, watcher); - } - } - catch (Throwable e) { - onFatalError(e); - } - } + if (locInternalId.equals(crdE.getKey())) + onBecomeCoordinator(aliveNodes, locInternalId); + else { + assert alives.size() > 1; - /** - * - */ - private class PreviousNodeWatcher implements Watcher, AsyncCallback.StatCallback { - @Override public void process(WatchedEvent evt) { - if (evt.getType() == Event.EventType.NodeDeleted) { - try { - onPreviousNodeFail(); - } - catch (Throwable e) { - onFatalError(e); - } - } - else { - if (log.isInfoEnabled()) - log.info("Previous node watch event: " + evt); + Map.Entry<Integer, String> prevE = alives.floorEntry(locInternalId - 1); - if (evt.getType() != Event.EventType.None) - zkClient.existsAsync(evt.getPath(), this, this); - } - } + assert prevE != null; - @Override public void processResult(int rc, String path, Object ctx, Stat stat) { - log.info("Previous node stat callback [rc=" + rc + ", path=" + path + ", stat=" + stat + ']'); + log.info("Discovery coordinator already exists, watch for previous node [" + + "locId=" + locNode.id() + + ", prevPath=" + prevE.getValue() + ']'); - assert rc == 0 || rc == KeeperException.Code.NONODE.intValue() : KeeperException.Code.get(rc); + PreviousNodeWatcher watcher = new PreviousNodeWatcher(); - if (rc == KeeperException.Code.NONODE.intValue() || stat == null) { - try { - onPreviousNodeFail(); - } - catch (Throwable e) { - onFatalError(e); - } - } + state.zkClient.existsAsync(zkPaths.aliveNodesDir + "/" + prevE.getValue(), watcher, watcher); } } @@ -534,11 +547,7 @@ public class ZookeeperDiscoveryImpl { if (log.isInfoEnabled()) log.info("Previous node failed, check is node new coordinator [locId=" + locNode.id() + ']'); - zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new AsyncCallback.Children2Callback() { - @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { - checkIsCoordinator(rc, children); - } - }); + state.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new CheckCoordinatorCallback()); } /** @@ -547,22 +556,22 @@ public class ZookeeperDiscoveryImpl { * @throws Exception If failed. */ private void onBecomeCoordinator(List<String> aliveNodes, int locInternalId) throws Exception { - byte[] evtsDataBytes = zkClient.getData(zkPaths.evtsPath); + byte[] evtsDataBytes = state.zkClient.getData(zkPaths.evtsPath); if (evtsDataBytes.length > 0) processNewEvents(evtsDataBytes); - crd = true; + state.crd = true; - if (joined) { + if (state.joined) { if (log.isInfoEnabled()) log.info("Node is new discovery coordinator [locId=" + locNode.id() + ']'); assert locNode.order() > 0 : locNode; - assert this.evtsData != null; + assert state.evtsData != null; - for (ZkDiscoveryEventData evtData : evtsData.evts.values()) - evtData.initRemainingAcks(top.nodesByOrder.values()); + for (ZkDiscoveryEventData evtData : state.evtsData.evts.values()) + evtData.initRemainingAcks(state.top.nodesByOrder.values()); handleProcessedEvents(); } @@ -573,8 +582,8 @@ public class ZookeeperDiscoveryImpl { newClusterStarted(locInternalId); } - zkClient.getChildrenAsync(zkPaths.aliveNodesDir, watcher, childrenCallback); - zkClient.getChildrenAsync(zkPaths.customEvtsDir, watcher, childrenCallback); + state.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, watcher, childrenCallback); + state.zkClient.getChildrenAsync(zkPaths.customEvtsDir, watcher, childrenCallback); for (String alivePath : aliveNodes) watchAliveNodeData(alivePath); @@ -584,12 +593,12 @@ public class ZookeeperDiscoveryImpl { * @param alivePath */ private void watchAliveNodeData(String alivePath) { - assert locNodeZkPath != null; + assert state.locNodeZkPath != null; String path = zkPaths.aliveNodesDir + "/" + alivePath; - if (!path.equals(locNodeZkPath)) - zkClient.getDataAsync(path, aliveNodeDataWatcher, aliveNodeDataWatcher); + if (!path.equals(state.locNodeZkPath)) + state.zkClient.getDataAsync(path, aliveNodeDataWatcher, aliveNodeDataWatcher); } /** @@ -597,14 +606,14 @@ public class ZookeeperDiscoveryImpl { * @throws Exception If failed. */ private void generateTopologyEvents(List<String> aliveNodes) throws Exception { - assert crd; + assert state.crd; if (log.isInfoEnabled()) log.info("Process alive nodes change: " + aliveNodes); TreeMap<Integer, String> alives = new TreeMap<>(); - TreeMap<Long, ZookeeperClusterNode> curTop = new TreeMap<>(top.nodesByOrder); + TreeMap<Long, ZookeeperClusterNode> curTop = new TreeMap<>(state.top.nodesByOrder); boolean newEvts = false; @@ -615,7 +624,7 @@ public class ZookeeperDiscoveryImpl { assert old == null; - if (!top.nodesByInternalId.containsKey(internalId)) { + if (!state.top.nodesByInternalId.containsKey(internalId)) { generateNodeJoin(curTop, internalId, child); watchAliveNodeData(child); @@ -624,7 +633,7 @@ public class ZookeeperDiscoveryImpl { } } - for (Map.Entry<Integer, ZookeeperClusterNode> e : top.nodesByInternalId.entrySet()) { + for (Map.Entry<Integer, ZookeeperClusterNode> e : state.top.nodesByInternalId.entrySet()) { if (!alives.containsKey(e.getKey())) { ZookeeperClusterNode failedNode = e.getValue(); @@ -646,14 +655,14 @@ public class ZookeeperDiscoveryImpl { private void saveAndProcessNewEvents() throws Exception { long start = System.currentTimeMillis(); - zkClient.setData(zkPaths.evtsPath, marsh.marshal(evtsData), -1); + state.zkClient.setData(zkPaths.evtsPath, marsh.marshal(state.evtsData), -1); long time = System.currentTimeMillis() - start; if (log.isInfoEnabled()) - log.info("Discovery coordinator saved new topology events [topVer=" + evtsData.topVer + ", saveTime=" + time + ']'); + log.info("Discovery coordinator saved new topology events [topVer=" + state.evtsData.topVer + ", saveTime=" + time + ']'); - processNewEvents(evtsData); + processNewEvents(state.evtsData); } /** @@ -665,15 +674,15 @@ public class ZookeeperDiscoveryImpl { assert rmvd != null; - evtsData.topVer++; - evtsData.evtIdGen++; + state.evtsData.topVer++; + state.evtsData.evtIdGen++; ZkDiscoveryNodeFailEventData evtData = new ZkDiscoveryNodeFailEventData( - evtsData.evtIdGen, - evtsData.topVer, + state.evtsData.evtIdGen, + state.evtsData.topVer, failedNode.internalId()); - evtsData.addEvent(curTop.values(), evtData); + state.evtsData.addEvent(curTop.values(), evtData); if (log.isInfoEnabled()) log.info("Generated NODE_FAILED event [evt=" + evtData + ']'); @@ -691,18 +700,12 @@ public class ZookeeperDiscoveryImpl { throws Exception { UUID nodeId = ZkIgnitePaths.aliveNodeId(aliveNodePath); - int joinSeq = ZkIgnitePaths.aliveJoinDataSequence(aliveNodePath); - - String joinDataPath = zkPaths.joinDataDir + '/' + - ZkIgnitePaths.aliveNodePrefixId(aliveNodePath) + ":" + - nodeId.toString() + - "|" + - String.format("%010d", joinSeq); + String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, aliveNodePath); byte[] joinData; try { - joinData = zkClient.getData(joinDataPath); + joinData = state.zkClient.getData(joinDataPath); } catch (KeeperException.NoNodeException e) { U.warn(log, "Failed to read joining node data, node left before join process finished: " + nodeId); @@ -717,10 +720,10 @@ public class ZookeeperDiscoveryImpl { assert nodeId.equals(joinedNode.id()) : joiningNodeData.node(); - evtsData.topVer++; - evtsData.evtIdGen++; + state.evtsData.topVer++; + state.evtsData.evtIdGen++; - joinedNode.order(evtsData.topVer); + joinedNode.order(state.evtsData.topVer); joinedNode.internalId(internalId); DiscoveryDataBag joiningNodeBag = new DiscoveryDataBag(nodeId); @@ -744,21 +747,21 @@ public class ZookeeperDiscoveryImpl { assert old == null; ZkDiscoveryNodeJoinEventData evtData = new ZkDiscoveryNodeJoinEventData( - evtsData.evtIdGen, - evtsData.topVer, + state.evtsData.evtIdGen, + state.evtsData.topVer, joinedNode.id(), joinedNode.internalId()); evtData.joiningNodeData = joiningNodeData; - evtsData.addEvent(dataForJoined.topology(), evtData); + state.evtsData.addEvent(dataForJoined.topology(), evtData); evtData.addRemainingAck(joinedNode); // Topology for joined node does not contain joined node. long start = System.currentTimeMillis(); - zkClient.createIfNeeded(zkPaths.joinEventDataPath(evtData.eventId()), joinData, PERSISTENT); - zkClient.createIfNeeded(zkPaths.joinEventDataPathForJoined(evtData.eventId()), marshal(dataForJoined), PERSISTENT); + state.zkClient.createIfNeeded(zkPaths.joinEventDataPath(evtData.eventId()), joinData, PERSISTENT); + state.zkClient.createIfNeeded(zkPaths.joinEventDataPathForJoined(evtData.eventId()), marshal(dataForJoined), PERSISTENT); long time = System.currentTimeMillis() - start; @@ -774,27 +777,38 @@ public class ZookeeperDiscoveryImpl { private void newClusterStarted(int locInternalId) throws Exception { cleanupPreviousClusterData(); - joined = true; + state.joined = true; - gridStartTime = U.currentTimeMillis(); + state.gridStartTime = U.currentTimeMillis(); - evtsData = new ZkDiscoveryEventsData(gridStartTime, 1L, new TreeMap<Long, ZkDiscoveryEventData>()); + state.evtsData = new ZkDiscoveryEventsData(state.gridStartTime, 1L, new TreeMap<Long, ZkDiscoveryEventData>()); locNode.internalId(locInternalId); locNode.order(1); - top.addNode(locNode); + state.top.addNode(locNode); - lsnr.onDiscovery(EventType.EVT_NODE_JOINED, - 1L, - locNode, - (Collection)top.nodesByOrder.values(), - Collections.<Long, Collection<ClusterNode>>emptyMap(), - null); + String path = state.locNodeZkPath.substring(state.locNodeZkPath.lastIndexOf('/') + 1); - joinFut.onDone(); + String joinDataPath = zkPaths.joiningNodeDataPath(locNode.id(), path); - // TODO ZK: remove join zk nodes + // TODO ZK async + state.zkClient.deleteIfExists(joinDataPath, -1); + + final List<ClusterNode> topSnapshot = Collections.singletonList((ClusterNode)locNode); + + evtWorker.evtsQ.add(new Runnable() { + @Override public void run() { + lsnr.onDiscovery(EventType.EVT_NODE_JOINED, + 1L, + locNode, + topSnapshot, + Collections.<Long, Collection<ClusterNode>>emptyMap(), + null); + + joinFut.onDone(); + } + }); } /** @@ -802,9 +816,9 @@ public class ZookeeperDiscoveryImpl { */ private void cleanupPreviousClusterData() throws Exception { // TODO ZK: use multi, better batching. - zkClient.setData(zkPaths.evtsPath, null, -1); + state.zkClient.setData(zkPaths.evtsPath, null, -1); - List<String> evtChildren = zkClient.getChildren(zkPaths.evtsPath); + List<String> evtChildren = state.zkClient.getChildren(zkPaths.evtsPath); for (String evtPath : evtChildren) { String evtDir = zkPaths.evtsPath + "/" + evtPath; @@ -812,14 +826,14 @@ public class ZookeeperDiscoveryImpl { removeChildren(evtDir); } - zkClient.deleteAll(zkPaths.evtsPath, evtChildren, -1); + state.zkClient.deleteAll(zkPaths.evtsPath, evtChildren, -1); - zkClient.deleteAll(zkPaths.customEvtsDir, - zkClient.getChildren(zkPaths.customEvtsDir), + state.zkClient.deleteAll(zkPaths.customEvtsDir, + state.zkClient.getChildren(zkPaths.customEvtsDir), -1); - zkClient.deleteAll(zkPaths.customEvtsAcksDir, - zkClient.getChildren(zkPaths.customEvtsAcksDir), + state.zkClient.deleteAll(zkPaths.customEvtsAcksDir, + state.zkClient.getChildren(zkPaths.customEvtsAcksDir), -1); } @@ -828,15 +842,15 @@ public class ZookeeperDiscoveryImpl { * @throws Exception If failed. */ private void removeChildren(String path) throws Exception { - zkClient.deleteAll(path, zkClient.getChildren(path), -1); + state.zkClient.deleteAll(path, state.zkClient.getChildren(path), -1); } ZkClusterNodes nodes() { - return top; + return state.top; } ZookeeperClient zkClient() { - return zkClient; + return state.zkClient; } /** @@ -844,7 +858,7 @@ public class ZookeeperDiscoveryImpl { * @throws Exception If failed. */ private void generateCustomEvents(List<String> customEvtNodes) throws Exception { - assert crd; + assert state.crd; TreeMap<Integer, String> newEvts = null; @@ -853,7 +867,7 @@ public class ZookeeperDiscoveryImpl { int evtSeq = ZkIgnitePaths.customEventSequence(evtPath); - if (evtSeq > evtsData.procCustEvt) { + if (evtSeq > state.evtsData.procCustEvt) { if (newEvts == null) newEvts = new TreeMap<>(); @@ -865,30 +879,30 @@ public class ZookeeperDiscoveryImpl { for (Map.Entry<Integer, String> evtE : newEvts.entrySet()) { UUID sndNodeId = ZkIgnitePaths.customEventSendNodeId(evtE.getValue()); - ZookeeperClusterNode sndNode = top.nodesById.get(sndNodeId); + ZookeeperClusterNode sndNode = state.top.nodesById.get(sndNodeId); String evtDataPath = zkPaths.customEvtsDir + "/" + evtE.getValue(); if (sndNode != null) { - byte[] evtBytes = zkClient.getData(zkPaths.customEvtsDir + "/" + evtE.getValue()); + byte[] evtBytes = state.zkClient.getData(zkPaths.customEvtsDir + "/" + evtE.getValue()); DiscoverySpiCustomMessage msg; try { msg = unmarshal(evtBytes); - evtsData.evtIdGen++; + state.evtsData.evtIdGen++; ZkDiscoveryCustomEventData evtData = new ZkDiscoveryCustomEventData( - evtsData.evtIdGen, - evtsData.topVer, + state.evtsData.evtIdGen, + state.evtsData.topVer, sndNodeId, evtE.getValue(), false); evtData.msg = msg; - evtsData.addEvent(top.nodesByOrder.values(), evtData); + state.evtsData.addEvent(state.top.nodesByOrder.values(), evtData); if (log.isInfoEnabled()) log.info("Generated CUSTOM event [evt=" + evtData + ", msg=" + msg + ']'); @@ -900,10 +914,10 @@ public class ZookeeperDiscoveryImpl { else { U.warn(log, "Ignore custom event from unknown node: " + sndNodeId); - zkClient.deleteIfExists(evtDataPath, -1); + state.zkClient.deleteIfExists(evtDataPath, -1); } - evtsData.procCustEvt = evtE.getKey(); + state.evtsData.procCustEvt = evtE.getKey(); } saveAndProcessNewEvents(); @@ -918,17 +932,17 @@ public class ZookeeperDiscoveryImpl { if (data.length == 0) return; - assert !crd; + assert !state.crd; ZkDiscoveryEventsData newEvtsData = unmarshal(data); // Need keep processed custom events since they contains message object. - if (evtsData != null) - newEvtsData.evts.putAll(evtsData.evts); + if (state.evtsData != null) + newEvtsData.evts.putAll(state.evtsData.evts); processNewEvents(newEvtsData); - this.evtsData = newEvtsData; + state.evtsData = newEvtsData; } /** @@ -941,8 +955,8 @@ public class ZookeeperDiscoveryImpl { boolean updateNodeInfo = false; - for (ZkDiscoveryEventData evtData : evts.tailMap(locNodeInfo.lastProcEvt, false).values()) { - if (!joined) { + for (ZkDiscoveryEventData evtData : evts.tailMap(state.locNodeInfo.lastProcEvt, false).values()) { + if (!state.joined) { if (evtData.eventType() != EventType.EVT_NODE_JOINED) continue; @@ -966,7 +980,7 @@ public class ZookeeperDiscoveryImpl { ZkJoiningNodeData joiningData; - if (crd) { + if (state.crd) { assert evtData0.joiningNodeData != null; joiningData = evtData0.joiningNodeData; @@ -974,7 +988,7 @@ public class ZookeeperDiscoveryImpl { else { String path = zkPaths.joinEventDataPath(evtData.eventId()); - joiningData = unmarshal(zkClient.getData(path)); + joiningData = unmarshal(state.zkClient.getData(path)); DiscoveryDataBag dataBag = new DiscoveryDataBag(evtData0.nodeId); @@ -1002,7 +1016,7 @@ public class ZookeeperDiscoveryImpl { DiscoverySpiCustomMessage msg; - if (crd) { + if (state.crd) { assert evtData0.msg != null : evtData0; msg = evtData0.msg; @@ -1011,7 +1025,7 @@ public class ZookeeperDiscoveryImpl { String path = zkPaths.customEventDataPath(evtData0.ackEvent(), evtData0.evtPath); - msg = unmarshal(zkClient.getData(path)); + msg = unmarshal(state.zkClient.getData(path)); evtData0.msg = msg; } @@ -1029,23 +1043,23 @@ public class ZookeeperDiscoveryImpl { } } - if (joined) { - locNodeInfo.lastProcEvt = evtData.eventId(); + if (state.joined) { + state.locNodeInfo.lastProcEvt = evtData.eventId(); - procEvtCnt++; + state.procEvtCnt++; - if (procEvtCnt % evtsAckThreshold == 0) + if (state.procEvtCnt % evtsAckThreshold == 0) updateNodeInfo = true; } } - if (crd) { + if (state.crd) { handleProcessedEvents(); } else if (updateNodeInfo) { - assert locNodeZkPath != null; + assert state.locNodeZkPath != null; - zkClient.setData(locNodeZkPath, marshal(locNodeInfo), -1); + state.zkClient.setData(state.locNodeZkPath, marshal(state.locNodeInfo), -1); } } @@ -1055,7 +1069,7 @@ public class ZookeeperDiscoveryImpl { * @throws Exception If failed. */ @SuppressWarnings("unchecked") - private void processLocalJoin(ZkDiscoveryEventsData evtsData, ZkDiscoveryNodeJoinEventData evtData) + private void processLocalJoin(ZkDiscoveryEventsData evtsData, final ZkDiscoveryNodeJoinEventData evtData) throws Exception { if (log.isInfoEnabled()) @@ -1063,9 +1077,9 @@ public class ZookeeperDiscoveryImpl { String path = zkPaths.joinEventDataPathForJoined(evtData.eventId()); - ZkJoinEventDataForJoined dataForJoined = unmarshal(zkClient.getData(path)); + ZkJoinEventDataForJoined dataForJoined = unmarshal(state.zkClient.getData(path)); - gridStartTime = evtsData.gridStartTime; + state.gridStartTime = evtsData.gridStartTime; locNode.internalId(evtData.joinedInternalId); locNode.order(evtData.topologyVersion()); @@ -1083,26 +1097,44 @@ public class ZookeeperDiscoveryImpl { node.setMetrics(new ClusterMetricsSnapshot()); - top.addNode(node); + state.top.addNode(node); } - top.addNode(locNode); + state.top.addNode(locNode); - List<ClusterNode> topSnapshot = new ArrayList<>((Collection)top.nodesByOrder.values()); + final List<ClusterNode> topSnapshot = state.top.topologySnapshot(); - lsnr.onDiscovery(evtData.eventType(), - evtData.topologyVersion(), - locNode, - topSnapshot, - Collections.<Long, Collection<ClusterNode>>emptyMap(), - null); + evtWorker.evtsQ.add(new Runnable() { + @Override public void run() { + if (connState == ConnectionState.DISCONNECTED) + connState = ConnectionState.STARTED; - joinFut.onDone(); + lsnr.onDiscovery(evtData.eventType(), + evtData.topologyVersion(), + locNode, + topSnapshot, + Collections.<Long, Collection<ClusterNode>>emptyMap(), + null); + + if (state.prevJoined) { + lsnr.onDiscovery(EVT_CLIENT_NODE_RECONNECTED, + evtData.topologyVersion(), + locNode, + topSnapshot, + Collections.<Long, Collection<ClusterNode>>emptyMap(), + null); + + U.quietAndWarn(log, "Client node was reconnected after it was already considered failed."); + } - joined = true; + joinFut.onDone(); + } + }); + + state.joined = true; // TODO ZK: async - zkClient.deleteIfExists(path, -1); + state.zkClient.deleteIfExists(path, -1); } /** @@ -1110,22 +1142,26 @@ public class ZookeeperDiscoveryImpl { * @param msg Custom message. */ @SuppressWarnings("unchecked") - private void notifyCustomEvent(ZkDiscoveryCustomEventData evtData, DiscoverySpiCustomMessage msg) { + private void notifyCustomEvent(final ZkDiscoveryCustomEventData evtData, final DiscoverySpiCustomMessage msg) { if (log.isInfoEnabled()) log.info(" [topVer=" + evtData.topologyVersion() + ", msg=" + msg + ']'); - ZookeeperClusterNode sndNode = top.nodesById.get(evtData.sndNodeId); + final ZookeeperClusterNode sndNode = state.top.nodesById.get(evtData.sndNodeId); assert sndNode != null : evtData; - List<ClusterNode> topSnapshot = new ArrayList<>((Collection)top.nodesByOrder.values()); + final List<ClusterNode> topSnapshot = state.top.topologySnapshot(); - lsnr.onDiscovery(evtData.eventType(), - evtData.topologyVersion(), - sndNode, - topSnapshot, - Collections.<Long, Collection<ClusterNode>>emptyMap(), - msg); + evtWorker.evtsQ.add(new Runnable() { + @Override public void run() { + lsnr.onDiscovery(evtData.eventType(), + evtData.topologyVersion(), + sndNode, + topSnapshot, + Collections.<Long, Collection<ClusterNode>>emptyMap(), + msg); + } + }); } /** @@ -1133,50 +1169,58 @@ public class ZookeeperDiscoveryImpl { * @param joiningData Joining node data. */ @SuppressWarnings("unchecked") - private void notifyNodeJoin(ZkDiscoveryNodeJoinEventData evtData, ZkJoiningNodeData joiningData) { - ZookeeperClusterNode joinedNode = joiningData.node(); + private void notifyNodeJoin(final ZkDiscoveryNodeJoinEventData evtData, ZkJoiningNodeData joiningData) { + final ZookeeperClusterNode joinedNode = joiningData.node(); joinedNode.order(evtData.topologyVersion()); joinedNode.internalId(evtData.joinedInternalId); joinedNode.setMetrics(new ClusterMetricsSnapshot()); - top.addNode(joinedNode); + state.top.addNode(joinedNode); - List<ClusterNode> topSnapshot = new ArrayList<>((Collection)top.nodesByOrder.values()); + final List<ClusterNode> topSnapshot = state.top.topologySnapshot(); - lsnr.onDiscovery(evtData.eventType(), - evtData.topologyVersion(), - joinedNode, - topSnapshot, - Collections.<Long, Collection<ClusterNode>>emptyMap(), - null); + evtWorker.evtsQ.add(new Runnable() { + @Override public void run() { + lsnr.onDiscovery(evtData.eventType(), + evtData.topologyVersion(), + joinedNode, + topSnapshot, + Collections.<Long, Collection<ClusterNode>>emptyMap(), + null); + } + }); } /** * @param evtData Event data. */ @SuppressWarnings("unchecked") - private void notifyNodeFail(ZkDiscoveryNodeFailEventData evtData) { - ZookeeperClusterNode failedNode = top.removeNode(evtData.failedNodeInternalId()); + private void notifyNodeFail(final ZkDiscoveryNodeFailEventData evtData) { + final ZookeeperClusterNode failedNode = state.top.removeNode(evtData.failedNodeInternalId()); assert failedNode != null; - List<ClusterNode> topSnapshot = new ArrayList<>((Collection)top.nodesByOrder.values()); + final List<ClusterNode> topSnapshot = state.top.topologySnapshot(); - lsnr.onDiscovery(evtData.eventType(), - evtData.topologyVersion(), - failedNode, - topSnapshot, - Collections.<Long, Collection<ClusterNode>>emptyMap(), - null); + evtWorker.evtsQ.add(new Runnable() { + @Override public void run() { + lsnr.onDiscovery(evtData.eventType(), + evtData.topologyVersion(), + failedNode, + topSnapshot, + Collections.<Long, Collection<ClusterNode>>emptyMap(), + null); + } + }); } /** * @throws Exception If failed. */ private void handleProcessedEvents() throws Exception { - Iterator<ZkDiscoveryEventData> it = this.evtsData.evts.values().iterator(); + Iterator<ZkDiscoveryEventData> it = state.evtsData.evts.values().iterator(); List<ZkDiscoveryCustomEventData> newEvts = null; @@ -1195,15 +1239,15 @@ public class ZookeeperDiscoveryImpl { DiscoverySpiCustomMessage ack = handleProcessedCustomEvent((ZkDiscoveryCustomEventData)evtData); if (ack != null) { - evtsData.evtIdGen++; + state.evtsData.evtIdGen++; - long evtId = evtsData.evtIdGen; + long evtId = state.evtsData.evtIdGen; byte[] ackBytes = marshal(ack); String evtChildPath = String.valueOf(evtId); - zkClient.createIfNeeded( + state.zkClient.createIfNeeded( zkPaths.customEventDataPath(true, evtChildPath), ackBytes, CreateMode.PERSISTENT); @@ -1243,7 +1287,7 @@ public class ZookeeperDiscoveryImpl { if (newEvts != null) { for (int i = 0; i < newEvts.size(); i++) - evtsData.addEvent(top.nodesByOrder.values(), newEvts.get(i)); + state.evtsData.addEvent(state.top.nodesByOrder.values(), newEvts.get(i)); saveAndProcessNewEvents(); } @@ -1256,7 +1300,7 @@ public class ZookeeperDiscoveryImpl { private void handleProcessedEventsOnNodeFail(ZookeeperClusterNode failedNode) throws Exception { boolean processed = false; - for (Iterator<Map.Entry<Long, ZkDiscoveryEventData>> it = evtsData.evts.entrySet().iterator(); it.hasNext();) { + for (Iterator<Map.Entry<Long, ZkDiscoveryEventData>> it = state.evtsData.evts.entrySet().iterator(); it.hasNext();) { Map.Entry<Long, ZkDiscoveryEventData> e = it.next(); ZkDiscoveryEventData evtData = e.getValue(); @@ -1274,10 +1318,11 @@ public class ZookeeperDiscoveryImpl { * @throws Exception If failed. */ private void handleProcessedJoinEvent(ZkDiscoveryNodeJoinEventData evtData) throws Exception { - log.info("All nodes processed node join [evtData=" + evtData + ']'); + if (log.isInfoEnabled()) + log.info("All nodes processed node join [evtData=" + evtData + ']'); - zkClient.deleteIfExists(zkPaths.joinEventDataPath(evtData.eventId()), -1); - zkClient.deleteIfExists(zkPaths.joinEventDataPathForJoined(evtData.eventId()), -1); + state.zkClient.deleteIfExists(zkPaths.joinEventDataPath(evtData.eventId()), -1); + state.zkClient.deleteIfExists(zkPaths.joinEventDataPathForJoined(evtData.eventId()), -1); } /** @@ -1288,10 +1333,11 @@ public class ZookeeperDiscoveryImpl { @Nullable private DiscoverySpiCustomMessage handleProcessedCustomEvent(ZkDiscoveryCustomEventData evtData) throws Exception { - log.info("All nodes processed custom event [evtData=" + evtData + ']'); + if (log.isInfoEnabled()) + log.info("All nodes processed custom event [evtData=" + evtData + ']'); if (!evtData.ackEvent()) { - zkClient.deleteIfExists(zkPaths.customEventDataPath(false, evtData.evtPath), -1); + state.zkClient.deleteIfExists(zkPaths.customEventDataPath(false, evtData.evtPath), -1); assert evtData.msg != null || locNode.order() > evtData.topologyVersion() : evtData; @@ -1299,32 +1345,72 @@ public class ZookeeperDiscoveryImpl { return evtData.msg.ackMessage(); } else - zkClient.deleteIfExists(zkPaths.customEventDataPath(true, evtData.evtPath), -1); + state.zkClient.deleteIfExists(zkPaths.customEventDataPath(true, evtData.evtPath), -1); return null; } /** - * + * @throws InterruptedException If interrupted. + */ + public void stop() throws InterruptedException { + stop0(new IgniteSpiException("Node stopped")); + } + + /** + * @param e Error. + * @throws InterruptedException If interrupted. */ - public void stop() { + private void stop0(Throwable e) throws InterruptedException { + log.info("Stop ZookeeperDiscovery [nodeId=" + locNode.id() + ", err=" + e + ']'); + + connState = ConnectionState.DISCONNECTED; + + ZookeeperClient zkClient = state.zkClient; + + if (zkClient != null) + zkClient.onCloseStart(); + + busyLock.block(); + + joinFut.onDone(e); + if (zkClient != null) zkClient.close(); - joinFut.onDone(new IgniteSpiException("Node stopped")); + ZkEventWorker evtWorker = this.evtWorker; + + if (evtWorker != null) { + evtWorker.interrupt(); + + evtWorker.join(); + } } /** - * @param e Error. + * @param busyLock Busy lock. + * @param err Error. */ - private void onFatalError(Throwable e) { + private void onFatalError(GridSpinBusyLock busyLock, Throwable err) { + busyLock.leaveBusy(); + + if (connState == ConnectionState.STOPPED) + return; + // TODO ZK - U.error(log, "Failed to process discovery data. Stopping the node in order to prevent cluster wide instability.", e); + U.error(log, "Fatal error in ZookeeperDiscovery.", err); - joinFut.onDone(e); + try { + stop0(err); + } + catch (InterruptedException e) { + U.warn(log, "Failed to finish stop procedure, thread was interrupted."); + + Thread.currentThread().interrupt(); + } - if (e instanceof Error) - throw (Error)e; + if (err instanceof Error) + throw (Error)err; } /** @@ -1352,27 +1438,130 @@ public class ZookeeperDiscoveryImpl { /** * */ - private class ConnectionLossListener implements IgniteRunnable { + private class ZkEventWorker extends IgniteSpiThread { /** */ - private static final long serialVersionUID = 0L; + private final Runnable CONNECTION_LOST = new Runnable() {@Override public void run() {}}; + + /** */ + private final BlockingQueue<Runnable> evtsQ; + + /** + * @param igniteInstanceName Ignite instance name. + * @param name Thread name. + * @param log Logger. + */ + ZkEventWorker(String igniteInstanceName, String name, IgniteLogger log) { + super(igniteInstanceName, name, log); + + evtsQ = new LinkedBlockingQueue<>(); + } /** {@inheritDoc} */ - @Override public void run() { - // TODO ZK, can be called from any thread. - U.warn(log, "Zookeeper connection loss, local node is SEGMENTED"); + @Override protected void body() throws InterruptedException { + while (!isInterrupted()) { + Runnable r = evtsQ.take(); + + if (r == CONNECTION_LOST) + processConnectionLost(); + else { + if (!busyLock.enterBusy()) + return; + + try { + r.run(); + } + finally { + busyLock.leaveBusy(); + } + } + } + } + + /** + * + */ + void onConnectionLoss() { + evtsQ.add(CONNECTION_LOST); + } + + /** + * + */ + void processConnectionLost() { + if (clientReconnectEnabled) { + connState = ConnectionState.DISCONNECTED; - if (joined) { - assert evtsData != null; + busyLock.block(); + + busyLock.unblock(); + + UUID newId = UUID.randomUUID(); + + U.quietAndWarn(log, "Connection to Zookeeper server is lost, local node will try to reconnect with new id [" + + "newId=" + newId + + ", prevId=" + locNode.id() + + ", locNode=" + locNode + ']'); + + locNode.onClientDisconnected(newId); + + if (state.joined) { + assert state.evtsData != null; + + lsnr.onDiscovery(EVT_CLIENT_NODE_DISCONNECTED, + state.evtsData.topVer, + locNode, + state.top.topologySnapshot(), + Collections.<Long, Collection<ClusterNode>>emptyMap(), + null); + } + + state = new ZkRuntimeState(state.joined); + + try { + joinTopology0(true); + } + catch (Exception e) { + U.error(log, "Failed to reconnect: " + e, e); + + onSegemented(e); + } + } + else { + U.warn(log, "Connection to Zookeeper server is lost, local node SEGMENTED."); + + onSegemented(new IgniteSpiException("Zookeeper connection loss.")); + } + } + + /** + * @param e Error. + */ + private void onSegemented(Exception e) { + if (state.joined) { + assert state.evtsData != null; lsnr.onDiscovery(EventType.EVT_NODE_SEGMENTED, - evtsData.topVer, + state.evtsData.topVer, locNode, - Collections.<ClusterNode>emptyList(), + state.top.topologySnapshot(), Collections.<Long, Collection<ClusterNode>>emptyMap(), null); } else - joinFut.onDone(new IgniteSpiException("Local node SEGMENTED")); + joinFut.onDone(e); + } + } + + /** + * + */ + private class ConnectionLossListener implements IgniteRunnable { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public void run() { + evtWorker.onConnectionLoss(); } } @@ -1382,21 +1571,31 @@ public class ZookeeperDiscoveryImpl { private class ZkWatcher implements Watcher { /** {@inheritDoc} */ @Override public void process(WatchedEvent evt) { - if (evt.getType() == Event.EventType.NodeDataChanged) { - if (evt.getPath().equals(zkPaths.evtsPath)) { - if (!crd) - zkClient.getDataAsync(evt.getPath(), this, dataCallback); + if (!busyLock.enterBusy()) + return; + + try { + if (evt.getType() == Event.EventType.NodeDataChanged) { + if (evt.getPath().equals(zkPaths.evtsPath)) { + if (!state.crd) + state.zkClient.getDataAsync(evt.getPath(), this, dataCallback); + } + else + U.warn(log, "Received NodeDataChanged for unexpected path: " + evt.getPath()); } - else - U.warn(log, "Received NodeDataChanged for unexpected path: " + evt.getPath()); + else if (evt.getType() == Event.EventType.NodeChildrenChanged) { + if (evt.getPath().equals(zkPaths.aliveNodesDir)) + state.zkClient.getChildrenAsync(evt.getPath(), this, childrenCallback); + else if (evt.getPath().equals(zkPaths.customEvtsDir)) + state.zkClient.getChildrenAsync(evt.getPath(), this, childrenCallback); + else + U.warn(log, "Received NodeChildrenChanged for unexpected path: " + evt.getPath()); + } + + busyLock.leaveBusy(); } - else if (evt.getType() == Event.EventType.NodeChildrenChanged) { - if (evt.getPath().equals(zkPaths.aliveNodesDir)) - zkClient.getChildrenAsync(evt.getPath(), this, childrenCallback); - else if (evt.getPath().equals(zkPaths.customEvtsDir)) - zkClient.getChildrenAsync(evt.getPath(), this, childrenCallback); - else - U.warn(log, "Received NodeChildrenChanged for unexpected path: " + evt.getPath()); + catch (Throwable e) { + onFatalError(busyLock, e); } } } @@ -1407,6 +1606,9 @@ public class ZookeeperDiscoveryImpl { private class ZKChildrenCallback implements AsyncCallback.Children2Callback { /** {@inheritDoc} */ @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { + if (!busyLock.enterBusy()) + return; + try { assert rc == 0 : KeeperException.Code.get(rc); @@ -1416,9 +1618,11 @@ public class ZookeeperDiscoveryImpl { generateCustomEvents(children); else U.warn(log, "Children callback for unexpected path: " + path); + + busyLock.leaveBusy(); } catch (Throwable e) { - onFatalError(e); + onFatalError(busyLock, e); } } } @@ -1429,18 +1633,23 @@ public class ZookeeperDiscoveryImpl { private class ZkDataCallback implements AsyncCallback.DataCallback { /** {@inheritDoc} */ @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { + if (!busyLock.enterBusy()) + return; + try { assert rc == 0 : KeeperException.Code.get(rc); if (path.equals(zkPaths.evtsPath)) { - if (!crd) + if (!state.crd) processNewEvents(data); } else U.warn(log, "Data callback for unknown path: " + path); + + busyLock.leaveBusy(); } catch (Throwable e) { - onFatalError(e); + onFatalError(busyLock, e); } } } @@ -1449,14 +1658,46 @@ public class ZookeeperDiscoveryImpl { * */ private class AliveNodeDataWatcher implements Watcher, AsyncCallback.DataCallback { + /** {@inheritDoc} */ @Override public void process(WatchedEvent evt) { - if (evt.getType() == Event.EventType.NodeDataChanged) - zkClient.getDataAsync(evt.getPath(), this, this); + if (!busyLock.enterBusy()) + return; + + try { + if (evt.getType() == Event.EventType.NodeDataChanged) + state.zkClient.getDataAsync(evt.getPath(), this, this); + + busyLock.leaveBusy(); + } + catch (Throwable e) { + onFatalError(busyLock, e); + } } + /** {@inheritDoc} */ @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { - assert crd; + assert state.crd; + + if (!busyLock.enterBusy()) + return; + + try { + processResult0(rc, path, data); + + busyLock.leaveBusy(); + } + catch (Throwable e) { + onFatalError(busyLock, e); + } + } + /** + * @param rc Result code. + * @param path Path. + * @param data Data. + * @throws Exception If failed. + */ + private void processResult0(int rc, String path, byte[] data) throws Exception { if (rc == KeeperException.Code.NONODE.intValue()) { if (log.isDebugEnabled()) log.debug("Alive node callaback, no node: " + path); @@ -1466,30 +1707,108 @@ public class ZookeeperDiscoveryImpl { assert rc == 0 : KeeperException.Code.get(rc); - try { - if (data.length > 0) { - ZkAliveNodeData nodeData = unmarshal(data); + if (data.length > 0) { + ZkAliveNodeData nodeData = unmarshal(data); - Integer nodeInternalId = ZkIgnitePaths.aliveInternalId(path); + Integer nodeInternalId = ZkIgnitePaths.aliveInternalId(path); - Iterator<ZkDiscoveryEventData> it = evtsData.evts.values().iterator(); + Iterator<ZkDiscoveryEventData> it = state.evtsData.evts.values().iterator(); - boolean processed = false; + boolean processed = false; - while (it.hasNext()) { - ZkDiscoveryEventData evtData = it.next(); + while (it.hasNext()) { + ZkDiscoveryEventData evtData = it.next(); - if (evtData.onAckReceived(nodeInternalId, nodeData.lastProcEvt)) - processed = true; - } + if (evtData.onAckReceived(nodeInternalId, nodeData.lastProcEvt)) + processed = true; + } - if (processed) - handleProcessedEvents(); + if (processed) + handleProcessedEvents(); + } + } + } + + /** + * + */ + private class PreviousNodeWatcher implements Watcher, AsyncCallback.StatCallback { + /** {@inheritDoc} */ + @Override public void process(WatchedEvent evt) { + if (!busyLock.enterBusy()) + return; + + try { + if (evt.getType() == Event.EventType.NodeDeleted) { + onPreviousNodeFail(); } + else { + if (log.isInfoEnabled()) + log.info("Previous node watch event: " + evt); + + if (evt.getType() != Event.EventType.None) + state.zkClient.existsAsync(evt.getPath(), this, this); + } + + busyLock.leaveBusy(); } catch (Throwable e) { - onFatalError(e); + onFatalError(busyLock, e); } } + + /** {@inheritDoc} */ + @Override public void processResult(int rc, String path, Object ctx, Stat stat) { + if (!busyLock.enterBusy()) + return; + + try { + log.info("Previous node stat callback [rc=" + rc + ", path=" + path + ", stat=" + stat + ']'); + + assert rc == 0 || rc == KeeperException.Code.NONODE.intValue() : KeeperException.Code.get(rc); + + if (rc == KeeperException.Code.NONODE.intValue() || stat == null) + onPreviousNodeFail(); + + busyLock.leaveBusy(); + } + catch (Throwable e) { + onFatalError(busyLock, e); + } + } + } + + /** + * + */ + class CheckCoordinatorCallback implements AsyncCallback.Children2Callback { + /** {@inheritDoc} */ + @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { + if (!busyLock.enterBusy()) + return; + + try { + assert rc == 0 : rc; + + checkIsCoordinator(rc, children); + + busyLock.leaveBusy(); + } + catch (Throwable e) { + onFatalError(busyLock, e); + } + } + } + + /** + * + */ + enum ConnectionState { + /** */ + STARTED, + /** */ + DISCONNECTED, + /** */ + STOPPED } }
