Repository: ignite Updated Branches: refs/heads/ignite-zk be7ae489b -> e909027fa
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e909027f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e909027f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e909027f Branch: refs/heads/ignite-zk Commit: e909027fa4268c1e72f4d756b7b2da24bbf89ae3 Parents: be7ae48 Author: sboikov <[email protected]> Authored: Tue Dec 5 10:47:13 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue Dec 5 14:00:55 2017 +0300 ---------------------------------------------------------------------- .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 8 +- .../ZkInternalForceNodeFailMessage.java | 2 +- .../zk/internal/ZkInternalJoinErrorMessage.java | 8 +- .../zk/internal/ZkInternalMessage.java | 4 +- .../discovery/zk/internal/ZkRuntimeState.java | 10 +- .../discovery/zk/internal/ZookeeperClient.java | 5 +- .../zk/internal/ZookeeperDiscoveryImpl.java | 570 ++++++++++++------- .../ZookeeperDiscoverySpiBasicTest.java | 37 +- 8 files changed, 425 insertions(+), 219 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e909027f/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java index f62706e..bd7f427 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java @@ -193,6 +193,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery consistentId = ignite.configuration().getConsistentId(); if (consistentId == null) { + initAddresses(); + final List<String> sortedAddrs = new ArrayList<>(addrs.get1()); Collections.sort(sortedAddrs); @@ -344,15 +346,13 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery ", rootPath=" + zkRootPath + ']'); impl = new ZookeeperDiscoveryImpl( + this, igniteInstanceName, - zkConnectionString, - sesTimeout, log, zkRootPath, locNode, lsnr, - exchange, - locNode.isClient() && !clientReconnectDisabled); + exchange); try { impl.joinTopology(); http://git-wip-us.apache.org/repos/asf/ignite/blob/e909027f/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalForceNodeFailMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalForceNodeFailMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalForceNodeFailMessage.java index fafcafc..f2fb183 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalForceNodeFailMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalForceNodeFailMessage.java @@ -24,7 +24,7 @@ import org.jetbrains.annotations.Nullable; /** * */ -public class ZkInternalForceNodeFailMessage implements ZkInternalMessage { +public class ZkInternalForceNodeFailMessage implements DiscoverySpiCustomMessage, ZkInternalMessage { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/e909027f/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java index 7e06858..e724673 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java @@ -17,20 +17,18 @@ package org.apache.ignite.spi.discovery.zk.internal; -import java.io.Serializable; - /** * */ -class ZkInternalJoinErrorMessage implements Serializable { +class ZkInternalJoinErrorMessage implements ZkInternalMessage { /** */ private static final long serialVersionUID = 0L; /** */ - private final int nodeInternalId; + final int nodeInternalId; /** */ - private final String err; + final String err; /** * @param nodeInternalId Joining node internal ID. http://git-wip-us.apache.org/repos/asf/ignite/blob/e909027f/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java index e56bab0..c1d56f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java @@ -17,11 +17,11 @@ package org.apache.ignite.spi.discovery.zk.internal; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; +import java.io.Serializable; /** * */ -interface ZkInternalMessage extends DiscoverySpiCustomMessage { +interface ZkInternalMessage extends Serializable { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/e909027f/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java index d2d0372..660dc42 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java @@ -17,6 +17,8 @@ package org.apache.ignite.spi.discovery.zk.internal; +import org.apache.ignite.spi.IgniteSpiTimeoutObject; + /** * */ @@ -28,10 +30,16 @@ class ZkRuntimeState { ZookeeperClient zkClient; /** */ + int internalOrder; + + /** */ + IgniteSpiTimeoutObject joinTimeoutObj; + + /** */ long gridStartTime; /** */ - boolean joined; + volatile boolean joined; /** */ ZkDiscoveryEventsData evtsData; http://git-wip-us.apache.org/repos/asf/ignite/blob/e909027f/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java index 229e5c4..bc024f1 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java @@ -478,7 +478,7 @@ public class ZookeeperClient implements Watcher { * @throws InterruptedException If interrupted. */ void setData(String path, byte[] data, int ver) - throws ZookeeperClientFailedException, InterruptedException + throws ZookeeperClientFailedException, InterruptedException, KeeperException.NoNodeException { if (data == null) data = EMPTY_BYTES; @@ -491,6 +491,9 @@ public class ZookeeperClient implements Watcher { return; } + catch (KeeperException.NoNodeException e) { + throw e; + } catch (Exception e) { onZookeeperError(connStartTime, e); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e909027f/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index ef67ec4..6c9d53a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -29,6 +29,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; @@ -44,17 +45,21 @@ import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener; import org.apache.ignite.internal.util.GridSpinBusyLock; +import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteRunnable; +import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.MarshallerUtils; import org.apache.ignite.marshaller.jdk.JdkMarshaller; import org.apache.ignite.spi.IgniteSpiException; -import org.apache.ignite.spi.IgniteSpiThread; +import org.apache.ignite.spi.IgniteSpiTimeoutObject; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange; import org.apache.ignite.spi.discovery.DiscoverySpiListener; +import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi; +import org.apache.ignite.thread.IgniteThreadPoolExecutor; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -77,6 +82,9 @@ public class ZookeeperDiscoveryImpl { static final String IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD = "IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD"; /** */ + private final ZookeeperDiscoverySpi spi; + + /** */ private final String igniteInstanceName; /** */ @@ -128,7 +136,10 @@ public class ZookeeperDiscoveryImpl { private final int evtsAckThreshold; /** */ - private ZkRuntimeState state; + private IgniteThreadPoolExecutor utilityPool; + + /** */ + private ZkRuntimeState rtState; /** */ private volatile ConnectionState connState = ConnectionState.STARTED; @@ -150,15 +161,13 @@ public class ZookeeperDiscoveryImpl { * @param exchange Discovery data exchange. */ public ZookeeperDiscoveryImpl( + ZookeeperDiscoverySpi spi, String igniteInstanceName, - String connectString, - int sesTimeout, IgniteLogger log, String zkRootPath, ZookeeperClusterNode locNode, DiscoverySpiListener lsnr, - DiscoverySpiDataExchange exchange, - boolean clientReconnectEnabled) { + DiscoverySpiDataExchange exchange) { assert locNode.id() != null && locNode.isLocal() : locNode; MarshallerUtils.setNodeName(marsh, igniteInstanceName); @@ -167,14 +176,15 @@ public class ZookeeperDiscoveryImpl { zkPaths = new ZkIgnitePaths(zkRootPath); + this.spi = spi; this.igniteInstanceName = igniteInstanceName; - this.connectString = connectString; - this.sesTimeout = sesTimeout; + this.connectString = spi.getZkConnectionString(); + this.sesTimeout = spi.getSessionTimeout(); this.log = log.getLogger(getClass()); this.locNode = locNode; this.lsnr = lsnr; this.exchange = exchange; - this.clientReconnectEnabled = clientReconnectEnabled; + this.clientReconnectEnabled = locNode.isClient() && !spi.isClientReconnectDisabled(); watcher = new ZkWatcher(); childrenCallback = new ZKChildrenCallback(); @@ -209,7 +219,7 @@ public class ZookeeperDiscoveryImpl { @Nullable public ClusterNode node(UUID nodeId) { assert nodeId != null; - return state.top.nodesById.get(nodeId); + return rtState.top.nodesById.get(nodeId); } /** @@ -228,7 +238,7 @@ public class ZookeeperDiscoveryImpl { * @param warning Warning. */ public void failNode(UUID nodeId, @Nullable String warning) { - ZookeeperClusterNode node = state.top.nodesById.get(nodeId); + ZookeeperClusterNode node = rtState.top.nodesById.get(nodeId); if (node == null) { if (log.isDebugEnabled()) @@ -259,13 +269,13 @@ public class ZookeeperDiscoveryImpl { return; } - state.zkClient.onCloseStart(); + rtState.zkClient.onCloseStart(); busyLock.block(); busyLock.unblock(); - state.zkClient.close(); + rtState.zkClient.close(); UUID newId = UUID.randomUUID(); @@ -274,20 +284,20 @@ public class ZookeeperDiscoveryImpl { ", prevId=" + locNode.id() + ", locNode=" + locNode + ']'); - new ReconnectorThread(newId).start(); + runInWorkerThread(new ReconnectClosure(newId)); } /** * @param newId New ID. */ private void doReconnect(UUID newId) { - if (state.joined) { - assert state.evtsData != null; + if (rtState.joined) { + assert rtState.evtsData != null; lsnr.onDiscovery(EVT_CLIENT_NODE_DISCONNECTED, - state.evtsData.topVer, + rtState.evtsData.topVer, locNode, - state.top.topologySnapshot(), + rtState.top.topologySnapshot(), Collections.<Long, Collection<ClusterNode>>emptyMap(), null); } @@ -295,7 +305,7 @@ public class ZookeeperDiscoveryImpl { try { locNode.onClientDisconnected(newId); - joinTopology0(state.joined); + joinTopology0(rtState.joined); } catch (Exception e) { U.error(log, "Failed to reconnect: " + e, e); @@ -309,7 +319,7 @@ public class ZookeeperDiscoveryImpl { * @param e Error. */ private void onSegmented(Exception e) { - if (state.joined) { + if (rtState.joined) { synchronized (stateMux) { connState = ConnectionState.STOPPED; } @@ -329,13 +339,16 @@ public class ZookeeperDiscoveryImpl { } } + /** + * + */ private void notifySegmented() { - assert state.evtsData != null; + assert rtState.evtsData != null; lsnr.onDiscovery(EventType.EVT_NODE_SEGMENTED, - state.evtsData.topVer, + rtState.evtsData.topVer, locNode, - state.top.topologySnapshot(), + rtState.top.topologySnapshot(), Collections.<Long, Collection<ClusterNode>>emptyMap(), null); } @@ -346,7 +359,7 @@ public class ZookeeperDiscoveryImpl { public Collection<ClusterNode> remoteNodes() { checkState(); - return state.top.remoteNodes(); + return rtState.top.remoteNodes(); } /** @@ -374,7 +387,7 @@ public class ZookeeperDiscoveryImpl { checkState(); try { - List<String> children = state.zkClient.getChildren(zkPaths.aliveNodesDir); + List<String> children = rtState.zkClient.getChildren(zkPaths.aliveNodesDir); for (int i = 0; i < children.size(); i++) { UUID id = ZkIgnitePaths.aliveNodeId(children.get(i)); @@ -410,7 +423,7 @@ public class ZookeeperDiscoveryImpl { byte[] msgBytes; try { - msgBytes = U.zip(marshal(msg)); + msgBytes = marshalZip(msg); } catch (IgniteCheckedException e) { throw new IgniteSpiException("Failed to marshal custom message: " + msg, e); @@ -422,7 +435,7 @@ public class ZookeeperDiscoveryImpl { try { String prefix = UUID.randomUUID().toString(); - state.zkClient.createSequential(prefix, + rtState.zkClient.createSequential(prefix, zkPaths.customEvtsDir, prefix + ":" + locNode.id() + '|', msgBytes, @@ -448,7 +461,7 @@ public class ZookeeperDiscoveryImpl { * @return Cluster start time. */ public long gridStartTime() { - return state.gridStartTime; + return rtState.gridStartTime; } /** @@ -473,6 +486,8 @@ public class ZookeeperDiscoveryImpl { } /** + * @param prevJoined {@code True} if reconnect after already joined topology + * in this case (need produce EVT_CLIENT_NODE_RECONNECTED event). * @throws InterruptedException If interrupted. */ private void joinTopology0(boolean prevJoined) throws InterruptedException { @@ -481,7 +496,7 @@ public class ZookeeperDiscoveryImpl { if (internalLsnr != null) internalLsnr.beforeJoin(log); - state = new ZkRuntimeState(prevJoined); + rtState = new ZkRuntimeState(prevJoined); DiscoveryDataBag discoDataBag = new DiscoveryDataBag(locNode.id()); @@ -492,14 +507,14 @@ public class ZookeeperDiscoveryImpl { byte[] joinDataBytes; try { - joinDataBytes = U.zip(marshal(joinData)); + joinDataBytes = marshalZip(joinData); } catch (Exception e) { throw new IgniteSpiException("Failed to marshal joining node data", e); } try { - state.zkClient = new ZookeeperClient(igniteInstanceName, + rtState.zkClient = new ZookeeperClient(igniteInstanceName, log, connectString, sesTimeout, @@ -517,7 +532,7 @@ public class ZookeeperDiscoveryImpl { */ private void initZkNodes() throws InterruptedException { try { - if (state.zkClient.exists(zkPaths.aliveNodesDir)) + if (rtState.zkClient.exists(zkPaths.aliveNodesDir)) return; // This path is created last, assume all others dirs are created. List<String> dirs = new ArrayList<>(); @@ -532,14 +547,14 @@ public class ZookeeperDiscoveryImpl { dirs.add(zkPaths.aliveNodesDir); try { - state.zkClient.createAll(dirs, PERSISTENT); + rtState.zkClient.createAll(dirs, PERSISTENT); } catch (KeeperException.NodeExistsException e) { if (log.isDebugEnabled()) log.debug("Failed to create nodes using bulk operation: " + e); for (String dir : dirs) - state.zkClient.createIfNeeded(dir, null, PERSISTENT); + rtState.zkClient.createIfNeeded(dir, null, PERSISTENT); } } catch (ZookeeperClientFailedException e) { @@ -562,15 +577,18 @@ public class ZookeeperDiscoveryImpl { // TODO ZK: handle max size. - String path = state.zkClient.createSequential(prefix, + final ZkRuntimeState rtState = this.rtState; + + String joinDataPath = rtState.zkClient.createSequential(prefix, zkPaths.joinDataDir, prefix + ":" + locNode.id() + "|", joinDataBytes, EPHEMERAL_SEQUENTIAL); - int seqNum = Integer.parseInt(path.substring(path.lastIndexOf('|') + 1)); + // TODO ZK: no need to use sequential + int seqNum = Integer.parseInt(joinDataPath.substring(joinDataPath.lastIndexOf('|') + 1)); - state.locNodeZkPath = state.zkClient.createSequential( + rtState.locNodeZkPath = rtState.zkClient.createSequential( prefix, zkPaths.aliveNodesDir, prefix + ":" + locNode.id() + "|" + seqNum + "|", @@ -580,11 +598,24 @@ public class ZookeeperDiscoveryImpl { log.info("Node started join [nodeId=" + locNode.id() + ", instanceName=" + locNode.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME) + ", joinDataSize=" + joinDataBytes.length + - ", nodePath=" + state.locNodeZkPath + ']'); + ", nodePath=" + rtState.locNodeZkPath + ']'); - state.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new CheckCoordinatorCallback()); + rtState.internalOrder = ZkIgnitePaths.aliveInternalId(rtState.locNodeZkPath); - state.zkClient.getDataAsync(zkPaths.evtsPath, watcher, dataCallback); + /* + If node can not join due to some validation error this error is reported in join data, + As a minor optimization do not start watch this immediately, but only if do not receive + join event after timeout. + */ + rtState.joinTimeoutObj = new CheckJoinStateTimeoutObject( + joinDataPath, + rtState); + + spi.getSpiContext().addTimeoutObject(rtState.joinTimeoutObj); + + rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new CheckCoordinatorCallback()); + + rtState.zkClient.getDataAsync(zkPaths.evtsPath, watcher, dataCallback); } catch (ZookeeperClientFailedException e) { throw new IgniteSpiException("Failed to initialize Zookeeper nodes", e); @@ -596,6 +627,95 @@ public class ZookeeperDiscoveryImpl { } } + /** + * + */ + private class CheckJoinStateTimeoutObject implements IgniteSpiTimeoutObject, Watcher, AsyncCallback.DataCallback { + /** */ + private final IgniteUuid id = IgniteUuid.randomUuid(); + + /** */ + private final long endTime = System.currentTimeMillis() + 5000; + + /** */ + private final String joinDataPath; + + /** */ + private final ZkRuntimeState rtState; + + /** + * @param joinDataPath Node joined data path. + * @param rtState State. + */ + CheckJoinStateTimeoutObject(String joinDataPath, ZkRuntimeState rtState) { + this.joinDataPath = joinDataPath; + this.rtState = rtState; + } + + /** {@inheritDoc} */ + @Override public IgniteUuid id() { + return id; + } + + /** {@inheritDoc} */ + @Override public long endTime() { + return endTime; + } + + /** {@inheritDoc} */ + @Override public void onTimeout() { + if (rtState.joined) + return; + + synchronized (stateMux) { + if (connState != ConnectionState.STARTED) + return; + } + + rtState.zkClient.getDataAsync(joinDataPath, this, this); + } + + /** {@inheritDoc} */ + @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { + if (rc != 0) + return; + + if (!busyLock.enterBusy()) + return; + + try { + Object obj = unmarshalZip(data); + + if (obj instanceof ZkInternalJoinErrorMessage) { + ZkInternalJoinErrorMessage joinErr = (ZkInternalJoinErrorMessage)obj; + + onSegmented(new IgniteSpiException(joinErr.err)); + } + + busyLock.leaveBusy(); + } + catch (Throwable e) { + onFatalError(busyLock, e); + } + } + + /** {@inheritDoc} */ + @Override public void process(WatchedEvent evt) { + if (!busyLock.enterBusy()) + return; + + try { + if (evt.getType() == Event.EventType.NodeDataChanged) + rtState.zkClient.getDataAsync(evt.getPath(), this, this); + + busyLock.leaveBusy(); + } + catch (Throwable e) { + onFatalError(busyLock, e); + } + } + } + /** TODO ZK */ private final CountDownLatch connStartLatch = new CountDownLatch(1); @@ -618,7 +738,7 @@ public class ZookeeperDiscoveryImpl { TreeMap<Integer, String> alives = new TreeMap<>(); - int locInternalId = ZkIgnitePaths.aliveInternalId(state.locNodeZkPath); + int locInternalId = ZkIgnitePaths.aliveInternalId(rtState.locNodeZkPath); for (String aliveNodePath : aliveNodes) { Integer internalId = ZkIgnitePaths.aliveInternalId(aliveNodePath); @@ -645,7 +765,7 @@ public class ZookeeperDiscoveryImpl { PreviousNodeWatcher watcher = new PreviousNodeWatcher(); - state.zkClient.existsAsync(zkPaths.aliveNodesDir + "/" + prevE.getValue(), watcher, watcher); + rtState.zkClient.existsAsync(zkPaths.aliveNodesDir + "/" + prevE.getValue(), watcher, watcher); } } @@ -660,7 +780,7 @@ public class ZookeeperDiscoveryImpl { if (log.isInfoEnabled()) log.info("Previous node failed, check is node new coordinator [locId=" + locNode.id() + ']'); - state.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new CheckCoordinatorCallback()); + rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new CheckCoordinatorCallback()); } /** @@ -669,22 +789,22 @@ public class ZookeeperDiscoveryImpl { * @throws Exception If failed. */ private void onBecomeCoordinator(List<String> aliveNodes, int locInternalId) throws Exception { - byte[] evtsDataBytes = state.zkClient.getData(zkPaths.evtsPath); + byte[] evtsDataBytes = rtState.zkClient.getData(zkPaths.evtsPath); if (evtsDataBytes.length > 0) processNewEvents(evtsDataBytes); - state.crd = true; + rtState.crd = true; - if (state.joined) { + if (rtState.joined) { if (log.isInfoEnabled()) log.info("Node is new discovery coordinator [locId=" + locNode.id() + ']'); assert locNode.order() > 0 : locNode; - assert state.evtsData != null; + assert rtState.evtsData != null; - for (ZkDiscoveryEventData evtData : state.evtsData.evts.values()) - evtData.initRemainingAcks(state.top.nodesByOrder.values()); + for (ZkDiscoveryEventData evtData : rtState.evtsData.evts.values()) + evtData.initRemainingAcks(rtState.top.nodesByOrder.values()); handleProcessedEvents("crd"); } @@ -695,8 +815,8 @@ public class ZookeeperDiscoveryImpl { newClusterStarted(locInternalId); } - state.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, watcher, childrenCallback); - state.zkClient.getChildrenAsync(zkPaths.customEvtsDir, watcher, childrenCallback); + rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, watcher, childrenCallback); + rtState.zkClient.getChildrenAsync(zkPaths.customEvtsDir, watcher, childrenCallback); for (String alivePath : aliveNodes) watchAliveNodeData(alivePath); @@ -706,12 +826,12 @@ public class ZookeeperDiscoveryImpl { * @param alivePath */ private void watchAliveNodeData(String alivePath) { - assert state.locNodeZkPath != null; + assert rtState.locNodeZkPath != null; String path = zkPaths.aliveNodesDir + "/" + alivePath; - if (!path.equals(state.locNodeZkPath)) - state.zkClient.getDataAsync(path, aliveNodeDataWatcher, aliveNodeDataWatcher); + if (!path.equals(rtState.locNodeZkPath)) + rtState.zkClient.getDataAsync(path, aliveNodeDataWatcher, aliveNodeDataWatcher); } /** @@ -719,14 +839,14 @@ public class ZookeeperDiscoveryImpl { * @throws Exception If failed. */ private void generateTopologyEvents(List<String> aliveNodes) throws Exception { - assert state.crd; + assert rtState.crd; if (log.isInfoEnabled()) log.info("Process alive nodes change: " + aliveNodes.size()); TreeMap<Integer, String> alives = new TreeMap<>(); - TreeMap<Long, ZookeeperClusterNode> curTop = new TreeMap<>(state.top.nodesByOrder); + TreeMap<Long, ZookeeperClusterNode> curTop = new TreeMap<>(rtState.top.nodesByOrder); boolean newEvts = false; @@ -737,16 +857,15 @@ public class ZookeeperDiscoveryImpl { assert old == null; - if (!state.top.nodesByInternalId.containsKey(internalId)) { - processJoinOnCoordinator(curTop, internalId, child); - - newEvts = true; + if (!rtState.top.nodesByInternalId.containsKey(internalId)) { + if (processJoinOnCoordinator(curTop, internalId, child)) + newEvts = true; } } List<ZookeeperClusterNode> failedNodes = null; - for (Map.Entry<Integer, ZookeeperClusterNode> e : state.top.nodesByInternalId.entrySet()) { + for (Map.Entry<Integer, ZookeeperClusterNode> e : rtState.top.nodesByInternalId.entrySet()) { if (!alives.containsKey(e.getKey())) { ZookeeperClusterNode failedNode = e.getValue(); @@ -774,7 +893,7 @@ public class ZookeeperDiscoveryImpl { * @param aliveNodePath Joined node path. * @throws Exception If failed. */ - private void processJoinOnCoordinator(TreeMap<Long, ZookeeperClusterNode> curTop, + private boolean processJoinOnCoordinator(TreeMap<Long, ZookeeperClusterNode> curTop, int internalId, String aliveNodePath) throws Exception { UUID nodeId = ZkIgnitePaths.aliveNodeId(aliveNodePath); @@ -783,20 +902,29 @@ public class ZookeeperDiscoveryImpl { byte[] joinData; try { - joinData = state.zkClient.getData(joinDataPath); + joinData = rtState.zkClient.getData(joinDataPath); } catch (KeeperException.NoNodeException e) { U.warn(log, "Failed to read joining node data, node left before join process finished: " + nodeId); - return; + return false; } String err = null; - ZkJoiningNodeData joiningNodeData = null; + Object dataObj = null; try { - joiningNodeData = unmarshalZip(joinData); + dataObj = unmarshalZip(joinData); + + if (dataObj instanceof ZkInternalJoinErrorMessage) { + if (log.isInfoEnabled()) + log.info("Ignore join data, node was failed by previous coordinator: " + aliveNodePath); + + zkClient().deleteIfExists(zkPaths.aliveNodesDir + "/" + aliveNodePath, -1); + + return false; + } } catch (Exception e) { U.error(log, "Failed to unmarshal joining node data [nodePath=" + aliveNodePath + "']", e); @@ -804,11 +932,12 @@ public class ZookeeperDiscoveryImpl { err = "Failed to unmarshal join data: " + e; } - if (err == null) { - assert joiningNodeData != null; + assert dataObj instanceof ZkJoiningNodeData : dataObj; + ZkJoiningNodeData joiningNodeData = (ZkJoiningNodeData)dataObj; + + if (err == null) err = validateJoiningNode(joiningNodeData.node()); - } if (err == null) { ZookeeperClusterNode joinedNode = joiningNodeData.node(); @@ -818,20 +947,31 @@ public class ZookeeperDiscoveryImpl { generateNodeJoin(curTop, joinData, joiningNodeData, internalId); watchAliveNodeData(aliveNodePath); + + return true; } else { ZkInternalJoinErrorMessage msg = new ZkInternalJoinErrorMessage(internalId, err); - // IgniteNodeValidationResult err = spi.getSpiContext().validateNode(node); + try { + zkClient().setData(joinDataPath, marshalZip(msg), -1); + } + catch (KeeperException.NoNodeException e) { + // Ignore, node already failed. + } + + zkClient().deleteIfExists(zkPaths.aliveNodesDir + "/" + aliveNodePath, -1); + + return false; } } /** * @param node Joining node. - * @return + * @return Non null error message if validation failed. */ @Nullable private String validateJoiningNode(ZookeeperClusterNode node) { - ZookeeperClusterNode node0 = state.top.nodesById.get(node.id()); + ZookeeperClusterNode node0 = rtState.top.nodesById.get(node.id()); if (node0 != null) { U.error(log, "Failed to include node in cluster, node with the same ID already exists [joiningNode=" + node + @@ -849,21 +989,21 @@ public class ZookeeperDiscoveryImpl { private void saveAndProcessNewEvents() throws Exception { long start = System.currentTimeMillis(); - byte[] evtsBytes = U.zip(marshal(state.evtsData)); + byte[] evtsBytes = marshalZip(rtState.evtsData); - state.zkClient.setData(zkPaths.evtsPath, evtsBytes, -1); + rtState.zkClient.setData(zkPaths.evtsPath, evtsBytes, -1); long time = System.currentTimeMillis() - start; if (log.isInfoEnabled()) { - log.info("Discovery coordinator saved new topology events [topVer=" + state.evtsData.topVer + + log.info("Discovery coordinator saved new topology events [topVer=" + rtState.evtsData.topVer + ", size=" + evtsBytes.length + - ", evts=" + state.evtsData.evts.size() + - ", lastEvt=" + state.evtsData.evtIdGen + + ", evts=" + rtState.evtsData.evts.size() + + ", lastEvt=" + rtState.evtsData.evtIdGen + ", saveTime=" + time + ']'); } - processNewEvents(state.evtsData); + processNewEvents(rtState.evtsData); } /** @@ -875,15 +1015,15 @@ public class ZookeeperDiscoveryImpl { assert rmvd != null; - state.evtsData.topVer++; - state.evtsData.evtIdGen++; + rtState.evtsData.topVer++; + rtState.evtsData.evtIdGen++; ZkDiscoveryNodeFailEventData evtData = new ZkDiscoveryNodeFailEventData( - state.evtsData.evtIdGen, - state.evtsData.topVer, + rtState.evtsData.evtIdGen, + rtState.evtsData.topVer, failedNode.internalId()); - state.evtsData.addEvent(curTop.values(), evtData); + rtState.evtsData.addEvent(curTop.values(), evtData); if (log.isInfoEnabled()) log.info("Generated NODE_FAILED event [evt=" + evtData + ']'); @@ -905,10 +1045,10 @@ public class ZookeeperDiscoveryImpl { UUID nodeId = joinedNode.id(); - state.evtsData.topVer++; - state.evtsData.evtIdGen++; + rtState.evtsData.topVer++; + rtState.evtsData.evtIdGen++; - joinedNode.order(state.evtsData.topVer); + joinedNode.order(rtState.evtsData.topVer); joinedNode.internalId(internalId); DiscoveryDataBag joiningNodeBag = new DiscoveryDataBag(nodeId); @@ -932,23 +1072,23 @@ public class ZookeeperDiscoveryImpl { assert old == null; ZkDiscoveryNodeJoinEventData evtData = new ZkDiscoveryNodeJoinEventData( - state.evtsData.evtIdGen, - state.evtsData.topVer, + rtState.evtsData.evtIdGen, + rtState.evtsData.topVer, joinedNode.id(), joinedNode.internalId()); evtData.joiningNodeData = joiningNodeData; - state.evtsData.addEvent(dataForJoined.topology(), evtData); + rtState.evtsData.addEvent(dataForJoined.topology(), evtData); evtData.addRemainingAck(joinedNode); // Topology for joined node does not contain joined node. - byte[] dataForJoinedBytes = U.zip(marshal(dataForJoined)); + byte[] dataForJoinedBytes = marshalZip(dataForJoined); long start = System.currentTimeMillis(); - state.zkClient.createIfNeeded(zkPaths.joinEventDataPath(evtData.eventId()), joinData, PERSISTENT); - state.zkClient.createIfNeeded(zkPaths.joinEventDataPathForJoined(evtData.eventId()), dataForJoinedBytes, PERSISTENT); + rtState.zkClient.createIfNeeded(zkPaths.joinEventDataPath(evtData.eventId()), joinData, PERSISTENT); + rtState.zkClient.createIfNeeded(zkPaths.joinEventDataPathForJoined(evtData.eventId()), dataForJoinedBytes, PERSISTENT); long time = System.currentTimeMillis() - start; @@ -966,27 +1106,29 @@ public class ZookeeperDiscoveryImpl { */ @SuppressWarnings("unchecked") private void newClusterStarted(int locInternalId) throws Exception { + spi.getSpiContext().removeTimeoutObject(rtState.joinTimeoutObj); + cleanupPreviousClusterData(); - state.joined = true; + rtState.joined = true; - state.gridStartTime = U.currentTimeMillis(); + rtState.gridStartTime = U.currentTimeMillis(); - state.evtsData = new ZkDiscoveryEventsData(state.gridStartTime, 1L, new TreeMap<Long, ZkDiscoveryEventData>()); + rtState.evtsData = new ZkDiscoveryEventsData(rtState.gridStartTime, 1L, new TreeMap<Long, ZkDiscoveryEventData>()); locNode.internalId(locInternalId); locNode.order(1); - state.top.addNode(locNode); + rtState.top.addNode(locNode); - String path = state.locNodeZkPath.substring(state.locNodeZkPath.lastIndexOf('/') + 1); + String path = rtState.locNodeZkPath.substring(rtState.locNodeZkPath.lastIndexOf('/') + 1); String joinDataPath = zkPaths.joiningNodeDataPath(locNode.id(), path); if (log.isDebugEnabled()) log.debug("Delete join data: " + joinDataPath); - state.zkClient.deleteIfExistsAsync(joinDataPath); + rtState.zkClient.deleteIfExistsAsync(joinDataPath); final List<ClusterNode> topSnapshot = Collections.singletonList((ClusterNode)locNode); @@ -1000,7 +1142,7 @@ public class ZookeeperDiscoveryImpl { Collections.<Long, Collection<ClusterNode>>emptyMap(), null); - if (state.prevJoined) { + if (rtState.prevJoined) { lsnr.onDiscovery(EVT_CLIENT_NODE_RECONNECTED, 1L, locNode, @@ -1021,9 +1163,9 @@ public class ZookeeperDiscoveryImpl { long start = System.currentTimeMillis(); // TODO ZK: use multi, better batching. - state.zkClient.setData(zkPaths.evtsPath, null, -1); + rtState.zkClient.setData(zkPaths.evtsPath, null, -1); - List<String> evtChildren = state.zkClient.getChildren(zkPaths.evtsPath); + List<String> evtChildren = rtState.zkClient.getChildren(zkPaths.evtsPath); for (String evtPath : evtChildren) { String evtDir = zkPaths.evtsPath + "/" + evtPath; @@ -1031,14 +1173,14 @@ public class ZookeeperDiscoveryImpl { removeChildren(evtDir); } - state.zkClient.deleteAll(zkPaths.evtsPath, evtChildren, -1); + rtState.zkClient.deleteAll(zkPaths.evtsPath, evtChildren, -1); - state.zkClient.deleteAll(zkPaths.customEvtsDir, - state.zkClient.getChildren(zkPaths.customEvtsDir), + rtState.zkClient.deleteAll(zkPaths.customEvtsDir, + rtState.zkClient.getChildren(zkPaths.customEvtsDir), -1); - state.zkClient.deleteAll(zkPaths.customEvtsAcksDir, - state.zkClient.getChildren(zkPaths.customEvtsAcksDir), + rtState.zkClient.deleteAll(zkPaths.customEvtsAcksDir, + rtState.zkClient.getChildren(zkPaths.customEvtsAcksDir), -1); long time = System.currentTimeMillis() - start; @@ -1054,15 +1196,15 @@ public class ZookeeperDiscoveryImpl { * @throws Exception If failed. */ private void removeChildren(String path) throws Exception { - state.zkClient.deleteAll(path, state.zkClient.getChildren(path), -1); + rtState.zkClient.deleteAll(path, rtState.zkClient.getChildren(path), -1); } ZkClusterNodes nodes() { - return state.top; + return rtState.top; } ZookeeperClient zkClient() { - return state.zkClient; + return rtState.zkClient; } /** @@ -1070,7 +1212,7 @@ public class ZookeeperDiscoveryImpl { * @throws Exception If failed. */ private void generateCustomEvents(List<String> customEvtNodes) throws Exception { - assert state.crd; + assert rtState.crd; TreeMap<Integer, String> newEvts = null; @@ -1079,7 +1221,7 @@ public class ZookeeperDiscoveryImpl { int evtSeq = ZkIgnitePaths.customEventSequence(evtPath); - if (evtSeq > state.evtsData.procCustEvt) { + if (evtSeq > rtState.evtsData.procCustEvt) { if (newEvts == null) newEvts = new TreeMap<>(); @@ -1093,7 +1235,7 @@ public class ZookeeperDiscoveryImpl { for (Map.Entry<Integer, String> evtE : newEvts.entrySet()) { UUID sndNodeId = ZkIgnitePaths.customEventSendNodeId(evtE.getValue()); - ZookeeperClusterNode sndNode = state.top.nodesById.get(sndNodeId); + ZookeeperClusterNode sndNode = rtState.top.nodesById.get(sndNodeId); if (alives != null && !alives.contains(sndNode.id())) sndNode = null; @@ -1101,27 +1243,27 @@ public class ZookeeperDiscoveryImpl { String evtDataPath = zkPaths.customEvtsDir + "/" + evtE.getValue(); if (sndNode != null) { - byte[] evtBytes = state.zkClient.getData(zkPaths.customEvtsDir + "/" + evtE.getValue()); + byte[] evtBytes = rtState.zkClient.getData(zkPaths.customEvtsDir + "/" + evtE.getValue()); DiscoverySpiCustomMessage msg; try { msg = unmarshalZip(evtBytes); - state.evtsData.evtIdGen++; + rtState.evtsData.evtIdGen++; if (msg instanceof ZkInternalForceNodeFailMessage) { ZkInternalForceNodeFailMessage msg0 = (ZkInternalForceNodeFailMessage)msg; if (alives == null) - alives = new HashSet<>(state.top.nodesById.keySet()); + alives = new HashSet<>(rtState.top.nodesById.keySet()); if (alives.contains(msg0.nodeId)) { - state.evtsData.topVer++; + rtState.evtsData.topVer++; alives.remove(msg0.nodeId); - ZookeeperClusterNode node = state.top.nodesById.get(msg0.nodeId); + ZookeeperClusterNode node = rtState.top.nodesById.get(msg0.nodeId); assert node != null : msg0.nodeId; @@ -1140,15 +1282,15 @@ public class ZookeeperDiscoveryImpl { } ZkDiscoveryCustomEventData evtData = new ZkDiscoveryCustomEventData( - state.evtsData.evtIdGen, - state.evtsData.topVer, + rtState.evtsData.evtIdGen, + rtState.evtsData.topVer, sndNodeId, evtE.getValue(), false); evtData.msg = msg; - state.evtsData.addEvent(state.top.nodesByOrder.values(), evtData); + rtState.evtsData.addEvent(rtState.top.nodesByOrder.values(), evtData); if (log.isDebugEnabled()) log.debug("Generated CUSTOM event [evt=" + evtData + ", msg=" + msg + ']'); @@ -1160,10 +1302,10 @@ public class ZookeeperDiscoveryImpl { else { U.warn(log, "Ignore custom event from unknown node: " + sndNodeId); - state.zkClient.deleteIfExistsAsync(evtDataPath); + rtState.zkClient.deleteIfExistsAsync(evtDataPath); } - state.evtsData.procCustEvt = evtE.getKey(); + rtState.evtsData.procCustEvt = evtE.getKey(); } saveAndProcessNewEvents(); @@ -1178,13 +1320,13 @@ public class ZookeeperDiscoveryImpl { if (data.length == 0) return; - assert !state.crd; + assert !rtState.crd; ZkDiscoveryEventsData newEvts = unmarshalZip(data); // Need keep processed custom events since they contains message object. - if (state.evtsData != null) { - for (Map.Entry<Long, ZkDiscoveryEventData> e : state.evtsData.evts.entrySet()) { + if (rtState.evtsData != null) { + for (Map.Entry<Long, ZkDiscoveryEventData> e : rtState.evtsData.evts.entrySet()) { ZkDiscoveryEventData evtData = e.getValue(); if (evtData.eventType() == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) { @@ -1199,7 +1341,7 @@ public class ZookeeperDiscoveryImpl { processNewEvents(newEvts); - state.evtsData = newEvts; + rtState.evtsData = newEvts; } /** @@ -1212,8 +1354,8 @@ public class ZookeeperDiscoveryImpl { boolean updateNodeInfo = false; - for (ZkDiscoveryEventData evtData : evts.tailMap(state.locNodeInfo.lastProcEvt, false).values()) { - if (!state.joined) { + for (ZkDiscoveryEventData evtData : evts.tailMap(rtState.locNodeInfo.lastProcEvt, false).values()) { + if (!rtState.joined) { if (evtData.eventType() != EventType.EVT_NODE_JOINED) continue; @@ -1222,10 +1364,13 @@ public class ZookeeperDiscoveryImpl { UUID joinedId = evtData0.nodeId; boolean locJoin = evtData.eventType() == EventType.EVT_NODE_JOINED && - locNode.id().equals(joinedId); + evtData0.joinedInternalId == rtState.internalOrder; + + if (locJoin) { + assert locNode.id().equals(joinedId); - if (locJoin) processLocalJoin(evtsData, evtData0); + } } else { if (log.isDebugEnabled()) @@ -1237,7 +1382,7 @@ public class ZookeeperDiscoveryImpl { ZkJoiningNodeData joiningData; - if (state.crd) { + if (rtState.crd) { assert evtData0.joiningNodeData != null; joiningData = evtData0.joiningNodeData; @@ -1245,7 +1390,7 @@ public class ZookeeperDiscoveryImpl { else { String path = zkPaths.joinEventDataPath(evtData.eventId()); - joiningData = unmarshalZip(state.zkClient.getData(path)); + joiningData = unmarshalZip(rtState.zkClient.getData(path)); DiscoveryDataBag dataBag = new DiscoveryDataBag(evtData0.nodeId); @@ -1273,7 +1418,7 @@ public class ZookeeperDiscoveryImpl { DiscoverySpiCustomMessage msg; - if (state.crd) { + if (rtState.crd) { assert evtData0.msg != null : evtData0; msg = evtData0.msg; @@ -1286,7 +1431,7 @@ public class ZookeeperDiscoveryImpl { else path = zkPaths.customEventDataPath(false, evtData0.evtPath); - msg = unmarshalZip(state.zkClient.getData(path)); + msg = unmarshalZip(rtState.zkClient.getData(path)); evtData0.msg = msg; } @@ -1308,25 +1453,25 @@ public class ZookeeperDiscoveryImpl { } } - if (state.joined) { - state.locNodeInfo.lastProcEvt = evtData.eventId(); + if (rtState.joined) { + rtState.locNodeInfo.lastProcEvt = evtData.eventId(); - state.procEvtCnt++; + rtState.procEvtCnt++; - if (state.procEvtCnt % evtsAckThreshold == 0) + if (rtState.procEvtCnt % evtsAckThreshold == 0) updateNodeInfo = true; } } - if (state.crd) + if (rtState.crd) handleProcessedEvents("procEvt"); else if (updateNodeInfo) { - assert state.locNodeZkPath != null; + assert rtState.locNodeZkPath != null; if (log.isDebugEnabled()) - log.debug("Update processed events: " + state.locNodeInfo.lastProcEvt); + log.debug("Update processed events: " + rtState.locNodeInfo.lastProcEvt); - state.zkClient.setData(state.locNodeZkPath, marshal(state.locNodeInfo), -1); + rtState.zkClient.setData(rtState.locNodeZkPath, marshalZip(rtState.locNodeInfo), -1); } } @@ -1342,11 +1487,13 @@ public class ZookeeperDiscoveryImpl { if (log.isInfoEnabled()) log.info("Local join event data: " + evtData + ']'); + spi.getSpiContext().removeTimeoutObject(rtState.joinTimeoutObj); + String path = zkPaths.joinEventDataPathForJoined(evtData.eventId()); - ZkJoinEventDataForJoined dataForJoined = unmarshalZip(state.zkClient.getData(path)); + ZkJoinEventDataForJoined dataForJoined = unmarshalZip(rtState.zkClient.getData(path)); - state.gridStartTime = evtsData.gridStartTime; + rtState.gridStartTime = evtsData.gridStartTime; locNode.internalId(evtData.joinedInternalId); locNode.order(evtData.topologyVersion()); @@ -1364,12 +1511,12 @@ public class ZookeeperDiscoveryImpl { node.setMetrics(new ClusterMetricsSnapshot()); - state.top.addNode(node); + rtState.top.addNode(node); } - state.top.addNode(locNode); + rtState.top.addNode(locNode); - final List<ClusterNode> topSnapshot = state.top.topologySnapshot(); + final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot(); if (connState == ConnectionState.DISCONNECTED) connState = ConnectionState.STARTED; @@ -1381,7 +1528,7 @@ public class ZookeeperDiscoveryImpl { Collections.<Long, Collection<ClusterNode>>emptyMap(), null); - if (state.prevJoined) { + if (rtState.prevJoined) { lsnr.onDiscovery(EVT_CLIENT_NODE_RECONNECTED, evtData.topologyVersion(), locNode, @@ -1394,12 +1541,12 @@ public class ZookeeperDiscoveryImpl { joinFut.onDone(); - state.joined = true; + rtState.joined = true; if (log.isDebugEnabled()) log.debug("Delete data for joined: " + path); - state.zkClient.deleteIfExistsAsync(path); + rtState.zkClient.deleteIfExistsAsync(path); } /** @@ -1410,7 +1557,7 @@ public class ZookeeperDiscoveryImpl { if (msg instanceof ZkInternalForceNodeFailMessage) { ZkInternalForceNodeFailMessage msg0 = (ZkInternalForceNodeFailMessage)msg; - ClusterNode creatorNode = state.top.nodesById.get(evtData.sndNodeId); + ClusterNode creatorNode = rtState.top.nodesById.get(evtData.sndNodeId); if (msg0.warning != null) { U.warn(log, "Received EVT_NODE_FAILED event with warning [" + @@ -1424,7 +1571,7 @@ public class ZookeeperDiscoveryImpl { ", nodeId=" + msg0.nodeId + ']'); } - ZookeeperClusterNode node = state.top.nodesById.get(msg0.nodeId); + ZookeeperClusterNode node = rtState.top.nodesById.get(msg0.nodeId); assert node != null : msg0.nodeId; @@ -1452,11 +1599,11 @@ public class ZookeeperDiscoveryImpl { if (log.isDebugEnabled()) log.debug(" [topVer=" + evtData.topologyVersion() + ", msg=" + msg + ']'); - final ZookeeperClusterNode sndNode = state.top.nodesById.get(evtData.sndNodeId); + final ZookeeperClusterNode sndNode = rtState.top.nodesById.get(evtData.sndNodeId); assert sndNode != null : evtData; - final List<ClusterNode> topSnapshot = state.top.topologySnapshot(); + final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot(); lsnr.onDiscovery(evtData.eventType(), evtData.topologyVersion(), @@ -1479,9 +1626,9 @@ public class ZookeeperDiscoveryImpl { joinedNode.setMetrics(new ClusterMetricsSnapshot()); - state.top.addNode(joinedNode); + rtState.top.addNode(joinedNode); - final List<ClusterNode> topSnapshot = state.top.topologySnapshot(); + final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot(); lsnr.onDiscovery(evtData.eventType(), evtData.topologyVersion(), @@ -1493,18 +1640,19 @@ public class ZookeeperDiscoveryImpl { /** * @param evtData Event data. + * @throws Exception If failed. */ private void notifyNodeFail(final ZkDiscoveryNodeFailEventData evtData) throws Exception { processNodeFail(evtData.failedNodeInternalId(), evtData.topologyVersion()); } /** - * @param nodeInternalId - * @param topVer - * @throws Exception + * @param nodeInternalId Failed node internal ID. + * @param topVer Topology version. + * @throws Exception If failed. */ private void processNodeFail(int nodeInternalId, long topVer) throws Exception { - final ZookeeperClusterNode failedNode = state.top.removeNode(nodeInternalId); + final ZookeeperClusterNode failedNode = rtState.top.removeNode(nodeInternalId); assert failedNode != null; @@ -1532,7 +1680,7 @@ public class ZookeeperDiscoveryImpl { ", prevId=" + locNode.id() + ", locNode=" + locNode + ']'); - new ReconnectorThread(newId).start(); + runInWorkerThread(new ReconnectClosure(newId)); } } else @@ -1542,7 +1690,7 @@ public class ZookeeperDiscoveryImpl { throw new ZookeeperClientFailedException("Received node failed event for local node."); } else { - final List<ClusterNode> topSnapshot = state.top.topologySnapshot(); + final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot(); lsnr.onDiscovery(EVT_NODE_FAILED, topVer, @@ -1558,7 +1706,7 @@ public class ZookeeperDiscoveryImpl { * @throws Exception If failed. */ private void handleProcessedEvents(String ctx) throws Exception { - Iterator<ZkDiscoveryEventData> it = state.evtsData.evts.values().iterator(); + Iterator<ZkDiscoveryEventData> it = rtState.evtsData.evts.values().iterator(); List<ZkDiscoveryCustomEventData> newEvts = null; @@ -1590,11 +1738,11 @@ public class ZookeeperDiscoveryImpl { DiscoverySpiCustomMessage ack = handleProcessedCustomEvent(ctx, (ZkDiscoveryCustomEventData)evtData); if (ack != null) { - state.evtsData.evtIdGen++; + rtState.evtsData.evtIdGen++; - long evtId = state.evtsData.evtIdGen; + long evtId = rtState.evtsData.evtIdGen; - byte[] ackBytes = U.zip(marshal(ack)); + byte[] ackBytes = marshalZip(ack); String path = zkPaths.ackEventDataPath(evtId); @@ -1602,7 +1750,7 @@ public class ZookeeperDiscoveryImpl { log.debug("Create ack event: " + path); // TODO ZK: delete is previous exists? - state.zkClient.createIfNeeded( + rtState.zkClient.createIfNeeded( path, ackBytes, CreateMode.PERSISTENT); @@ -1647,10 +1795,10 @@ public class ZookeeperDiscoveryImpl { } if (newEvts != null) { - Collection<ZookeeperClusterNode> nodes = state.top.nodesByOrder.values(); + Collection<ZookeeperClusterNode> nodes = rtState.top.nodesByOrder.values(); for (int i = 0; i < newEvts.size(); i++) - state.evtsData.addEvent(nodes, newEvts.get(i)); + rtState.evtsData.addEvent(nodes, newEvts.get(i)); saveAndProcessNewEvents(); } @@ -1663,7 +1811,7 @@ public class ZookeeperDiscoveryImpl { private void handleProcessedEventsOnNodesFail(List<ZookeeperClusterNode> failedNodes) throws Exception { boolean processed = false; - for (Iterator<Map.Entry<Long, ZkDiscoveryEventData>> it = state.evtsData.evts.entrySet().iterator(); it.hasNext();) { + for (Iterator<Map.Entry<Long, ZkDiscoveryEventData>> it = rtState.evtsData.evts.entrySet().iterator(); it.hasNext();) { Map.Entry<Long, ZkDiscoveryEventData> e = it.next(); ZkDiscoveryEventData evtData = e.getValue(); @@ -1694,8 +1842,8 @@ public class ZookeeperDiscoveryImpl { if (log.isDebugEnabled()) log.debug("Delete processed event data [path1=" + evtDataPath + ", path2=" + dataForJoinedPath + ']'); - state.zkClient.deleteIfExistsAsync(evtDataPath); - state.zkClient.deleteIfExistsAsync(dataForJoinedPath); + rtState.zkClient.deleteIfExistsAsync(evtDataPath); + rtState.zkClient.deleteIfExistsAsync(dataForJoinedPath); } /** @@ -1715,7 +1863,7 @@ public class ZookeeperDiscoveryImpl { if (log.isDebugEnabled()) log.debug("Delete path: " + path); - state.zkClient.deleteIfExistsAsync(path); + rtState.zkClient.deleteIfExistsAsync(path); assert evtData.msg != null || locNode.order() > evtData.topologyVersion() : evtData; @@ -1728,13 +1876,38 @@ public class ZookeeperDiscoveryImpl { if (log.isDebugEnabled()) log.debug("Delete path: " + path); - state.zkClient.deleteIfExistsAsync(path); + rtState.zkClient.deleteIfExistsAsync(path); } return null; } /** + * @param c Closure to run. + */ + private void runInWorkerThread(Runnable c) { + IgniteThreadPoolExecutor pool; + + synchronized (stateMux) { + if (connState == ConnectionState.STOPPED) + return; + + if (utilityPool == null) { + utilityPool = new IgniteThreadPoolExecutor("zk-discovery-pool", + igniteInstanceName, + 0, + 1, + 2000, + new LinkedBlockingQueue<Runnable>()); + } + + pool = utilityPool; + } + + pool.submit(c); + } + + /** * */ public void onStop() { @@ -1745,7 +1918,7 @@ public class ZookeeperDiscoveryImpl { connState = ConnectionState.STOPPED; } - ZookeeperClient zkClient = state.zkClient; + ZookeeperClient zkClient = rtState.zkClient; if (zkClient != null) zkClient.onCloseStart(); @@ -1769,7 +1942,9 @@ public class ZookeeperDiscoveryImpl { joinFut.onDone(e); - ZookeeperClient zkClient = state.zkClient; + IgniteUtils.shutdownNow(ZookeeperDiscoveryImpl.class, utilityPool, log); + + ZookeeperClient zkClient = rtState.zkClient; if (zkClient != null) zkClient.close(); @@ -1811,12 +1986,6 @@ public class ZookeeperDiscoveryImpl { * @return Unmarshalled object. * @throws IgniteCheckedException If failed. */ - private <T> T unmarshal(byte[] bytes) throws IgniteCheckedException { - assert bytes != null && bytes.length > 0; - - return marsh.unmarshal(bytes, null); - } - private <T> T unmarshalZip(byte[] bytes) throws IgniteCheckedException { assert bytes != null && bytes.length > 0; @@ -1828,29 +1997,30 @@ public class ZookeeperDiscoveryImpl { * @return Bytes. * @throws IgniteCheckedException If failed. */ - private byte[] marshal(Object obj) throws IgniteCheckedException { + private byte[] marshalZip(Object obj) throws IgniteCheckedException { assert obj != null; - return marsh.marshal(obj); + return U.zip(marsh.marshal(obj)); } /** * */ - private class ReconnectorThread extends IgniteSpiThread { + private class ReconnectClosure implements Runnable { /** */ private final UUID newId; /** - * + * @param newId New ID. */ - ReconnectorThread(UUID newId) { - super(ZookeeperDiscoveryImpl.this.igniteInstanceName, "zk-reconnector", log); + ReconnectClosure(UUID newId) { + assert newId != null; this.newId = newId; } - @Override protected void body() throws InterruptedException { + /** {@inheritDoc} */ + @Override public void run() { busyLock.block(); busyLock.unblock(); @@ -1883,7 +2053,7 @@ public class ZookeeperDiscoveryImpl { ", prevId=" + locNode.id() + ", locNode=" + locNode + ']'); - new ReconnectorThread(newId).start(); + runInWorkerThread(new ReconnectClosure(newId)); } else { U.warn(log, "Connection to Zookeeper server is lost, local node SEGMENTED."); @@ -1905,17 +2075,17 @@ public class ZookeeperDiscoveryImpl { try { if (evt.getType() == Event.EventType.NodeDataChanged) { if (evt.getPath().equals(zkPaths.evtsPath)) { - if (!state.crd) - state.zkClient.getDataAsync(evt.getPath(), this, dataCallback); + if (!rtState.crd) + rtState.zkClient.getDataAsync(evt.getPath(), this, dataCallback); } else U.warn(log, "Received NodeDataChanged for unexpected path: " + evt.getPath()); } else if (evt.getType() == Event.EventType.NodeChildrenChanged) { if (evt.getPath().equals(zkPaths.aliveNodesDir)) - state.zkClient.getChildrenAsync(evt.getPath(), this, childrenCallback); + rtState.zkClient.getChildrenAsync(evt.getPath(), this, childrenCallback); else if (evt.getPath().equals(zkPaths.customEvtsDir)) - state.zkClient.getChildrenAsync(evt.getPath(), this, childrenCallback); + rtState.zkClient.getChildrenAsync(evt.getPath(), this, childrenCallback); else U.warn(log, "Received NodeChildrenChanged for unexpected path: " + evt.getPath()); } @@ -1968,7 +2138,7 @@ public class ZookeeperDiscoveryImpl { assert rc == 0 : KeeperException.Code.get(rc); if (path.equals(zkPaths.evtsPath)) { - if (!state.crd) + if (!rtState.crd) processNewEvents(data); } else @@ -1993,7 +2163,7 @@ public class ZookeeperDiscoveryImpl { try { if (evt.getType() == Event.EventType.NodeDataChanged) - state.zkClient.getDataAsync(evt.getPath(), this, this); + rtState.zkClient.getDataAsync(evt.getPath(), this, this); busyLock.leaveBusy(); } @@ -2008,7 +2178,7 @@ public class ZookeeperDiscoveryImpl { return; try { - assert state.crd; + assert rtState.crd; processResult0(rc, path, data); @@ -2036,11 +2206,11 @@ public class ZookeeperDiscoveryImpl { assert rc == 0 : KeeperException.Code.get(rc); if (data.length > 0) { - ZkAliveNodeData nodeData = unmarshal(data); + ZkAliveNodeData nodeData = unmarshalZip(data); Integer nodeInternalId = ZkIgnitePaths.aliveInternalId(path); - Iterator<ZkDiscoveryEventData> it = state.evtsData.evts.values().iterator(); + Iterator<ZkDiscoveryEventData> it = rtState.evtsData.evts.values().iterator(); boolean processed = false; @@ -2075,7 +2245,7 @@ public class ZookeeperDiscoveryImpl { log.info("Previous node watch event: " + evt); if (evt.getType() != Event.EventType.None) - state.zkClient.existsAsync(evt.getPath(), this, this); + rtState.zkClient.existsAsync(evt.getPath(), this, this); } busyLock.leaveBusy(); http://git-wip-us.apache.org/repos/asf/ignite/blob/e909027f/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java index 2c6890f..6c32a4e 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java @@ -38,6 +38,7 @@ import org.apache.curator.test.InstanceSpec; import org.apache.curator.test.TestingCluster; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheWriteSynchronizationMode; @@ -760,7 +761,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } private static String aliveZkNodePath(DiscoverySpi spi) { - String path = GridTestUtils.getFieldValue(spi, "impl", "state", "locNodeZkPath"); + String path = GridTestUtils.getFieldValue(spi, "impl", "rtState", "locNodeZkPath"); return path.substring(path.lastIndexOf('/') + 1); } @@ -1406,11 +1407,37 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testDuplicatedNodeId() throws Exception { - nodeId = UUID.randomUUID(); + UUID nodeId0 = nodeId = UUID.randomUUID(); startGrid(0); - startGrid(1); + int failingNodeIdx = 100; + + for (int i = 0; i < 5; i++) { + final int idx = failingNodeIdx++; + + nodeId = nodeId0; + + info("Start node with duplicated ID [iter=" + i + ", nodeId=" + nodeId + ']'); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + startGrid(idx); + + return null; + } + }, IgniteCheckedException.class, null); + + nodeId = null; + + info("Start node with unique ID [iter=" + i + ']'); + + Ignite ignite = startGrid(idx); + + nodeId0 = ignite.cluster().localNode().id(); + + waitForTopology(i + 2); + } } /** @@ -1650,7 +1677,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { Map<Object, Object> evts = GridTestUtils.getFieldValue(node.configuration().getDiscoverySpi(), - "impl", "state", "evtsData", "evts"); + "impl", "rtState", "evtsData", "evts"); if (!evts.isEmpty()) { info("Unacked events: " + evts); @@ -1809,7 +1836,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { * @param spi Spi instance. */ private static ZooKeeper zkClient(ZookeeperDiscoverySpi spi) { - return GridTestUtils.getFieldValue(spi, "impl", "state", "zkClient", "zk"); + return GridTestUtils.getFieldValue(spi, "impl", "rtState", "zkClient", "zk"); } /**
