Repository: ignite Updated Branches: refs/heads/ignite-zk 1f82a5311 -> 18527db9b
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/18527db9 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/18527db9 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/18527db9 Branch: refs/heads/ignite-zk Commit: 18527db9ba5d13b0964ec9c87c8b155295921c9a Parents: 1f82a53 Author: sboikov <sboi...@gridgain.com> Authored: Tue Nov 21 16:54:38 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Nov 21 18:34:44 2017 +0300 ---------------------------------------------------------------------- .../discovery/zk/internal/ZkAliveNodeData.java | 37 ++ .../zk/internal/ZkDiscoveryEventData.java | 38 ++ .../zk/internal/ZkDiscoveryEventsData.java | 5 +- .../discovery/zk/internal/ZkEventAckFuture.java | 7 +- .../discovery/zk/internal/ZkIgnitePaths.java | 128 +++++ .../spi/discovery/zk/internal/ZkPaths.java | 116 ---- .../discovery/zk/internal/ZookeeperClient.java | 43 +- .../zk/internal/ZookeeperDiscoveryImpl.java | 534 +++++++++++++------ .../zk/ZookeeperDiscoverySpiBasicTest.java | 49 ++ .../zk/internal/ZookeeperClientTest.java | 23 + 10 files changed, 694 insertions(+), 286 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/18527db9/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java new file mode 100644 index 0000000..45f453f --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.io.Serializable; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * + */ +public class ZkAliveNodeData implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + long lastProcEvt = -1; + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ZkAliveNodeData.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/18527db9/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java index 3982c90..9f18f4f 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java @@ -18,6 +18,9 @@ package org.apache.ignite.spi.discovery.zk.internal; import java.io.Serializable; +import java.util.Collection; +import java.util.Set; +import org.apache.ignite.internal.util.typedef.internal.U; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; @@ -39,6 +42,9 @@ abstract class ZkDiscoveryEventData implements Serializable { /** */ private final long topVer; + /** */ + private transient Set<Integer> remainingAcks; + /** * @param evtType Event type. * @param topVer Topology version. @@ -51,6 +57,38 @@ abstract class ZkDiscoveryEventData implements Serializable { this.topVer = topVer; } + void remainingAcks(Collection<ZookeeperClusterNode> nodes) { + assert remainingAcks == null : this; + + remainingAcks = U.newHashSet(nodes.size()); + + for (ZookeeperClusterNode node : nodes) { + if (!node.isLocal() && node.order() <= topVer) + remainingAcks.add(node.internalId()); + } + } + + boolean allAcksReceived() { + return remainingAcks.isEmpty(); + } + + boolean onAckReceived(Integer nodeInternalId, long ackEvtId) { + assert remainingAcks != null; + + if (ackEvtId >= evtId) + remainingAcks.remove(nodeInternalId); + + return remainingAcks.isEmpty(); + } + + boolean onNodeFail(ZookeeperClusterNode node) { + assert remainingAcks != null : this; + + remainingAcks.remove(node.internalId()); + + return remainingAcks.isEmpty(); + } + long eventId() { return evtId; } http://git-wip-us.apache.org/repos/asf/ignite/blob/18527db9/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java index d3f07ae..ce21a06 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java @@ -18,6 +18,7 @@ package org.apache.ignite.spi.discovery.zk.internal; import java.io.Serializable; +import java.util.Collection; import java.util.TreeMap; /** @@ -56,9 +57,11 @@ class ZkDiscoveryEventsData implements Serializable { /** * @param evt Event. */ - void addEvent(ZkDiscoveryEventData evt) { + void addEvent(Collection<ZookeeperClusterNode> nodes, ZkDiscoveryEventData evt) { Object old = evts.put(evt.eventId(), evt); assert old == null : old; + + evt.remainingAcks(nodes); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/18527db9/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java index fa0da99..c89b586 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java @@ -51,6 +51,11 @@ public class ZkEventAckFuture extends GridFutureAdapter<Void> implements Watcher /** */ private final Set<Integer> remaininAcks; + /** + * @param impl + * @param evtPath + * @param evtId + */ ZkEventAckFuture(ZookeeperDiscoveryImpl impl, String evtPath, Long evtId) { this.impl = impl; this.log = impl.log(); @@ -94,8 +99,6 @@ public class ZkEventAckFuture extends GridFutureAdapter<Void> implements Watcher /** {@inheritDoc} */ @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) { if (super.onDone(res, err)) { - impl.removeAckFuture(this); - return true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/18527db9/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java new file mode 100644 index 0000000..ad35c05 --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.UUID; + +/** + * + */ +class ZkIgnitePaths { + /** */ + private static final int UUID_LEN = 36; + + /** */ + private static final String JOIN_DATA_DIR = "joinData"; + + /** */ + private static final String CUSTOM_EVTS_DIR = "customEvts"; + + /** */ + private static final String ALIVE_NODES_DIR = "alive"; + + /** */ + private static final String DISCO_EVENTS_PATH = "events"; + + /** */ + final String basePath; + + /** */ + private final String clusterName; + + /** */ + final String clusterDir; + + /** */ + final String aliveNodesDir; + + /** */ + final String joinDataDir; + + /** */ + final String evtsPath; + + /** */ + final String customEvtsDir; + + /** + * @param basePath Base directory. + * @param clusterName Cluster name. + */ + ZkIgnitePaths(String basePath, String clusterName) { + this.basePath = basePath; + this.clusterName = clusterName; + + clusterDir = basePath + "/" + clusterName; + + aliveNodesDir = zkPath(ALIVE_NODES_DIR); + joinDataDir = zkPath(JOIN_DATA_DIR); + evtsPath = zkPath(DISCO_EVENTS_PATH); + customEvtsDir = zkPath(CUSTOM_EVTS_DIR); + } + + /** + * @param path Relative path. + * @return Full path. + */ + String zkPath(String path) { + return basePath + "/" + clusterName + "/" + path; + } + + static int aliveInternalId(String path) { + int idx = path.lastIndexOf('|'); + + return Integer.parseInt(path.substring(idx + 1)); + } + + static UUID aliveNodeId(String path) { + String idStr = path.substring(0, ZkIgnitePaths.UUID_LEN); + + return UUID.fromString(idStr); + } + + static int aliveJoinSequence(String path) { + int idx1 = path.indexOf('|'); + int idx2 = path.lastIndexOf('|'); + + return Integer.parseInt(path.substring(idx1 + 1, idx2)); + } + + static int customEventSequence(String path) { + int idx = path.lastIndexOf('|'); + + return Integer.parseInt(path.substring(idx + 1)); + } + + static UUID customEventSendNodeId(String path) { + String idStr = path.substring(0, ZkIgnitePaths.UUID_LEN); + + return UUID.fromString(idStr); + } + + String joinEventDataPath(long evtId) { + return evtsPath + "/" + evtId; + } + + String joinEventDataPathForJoined(long evtId) { + return evtsPath + "/joined-" + evtId; + } + + String customEventDataPath(String child) { + return customEvtsDir + "/" + child; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/18527db9/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkPaths.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkPaths.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkPaths.java deleted file mode 100644 index 643e10d..0000000 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkPaths.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spi.discovery.zk.internal; - -import java.util.UUID; - -/** - * - */ -class ZkPaths { - /** */ - private static final int UUID_LEN = 36; - - /** */ - private static final String JOIN_DATA_DIR = "joinData"; - - /** */ - private static final String CUSTOM_EVTS_DIR = "customEvts"; - - /** */ - private static final String ALIVE_NODES_DIR = "alive"; - - /** */ - private static final String DISCO_EVENTS_PATH = "events"; - - /** */ - final String basePath; - - /** */ - private final String clusterName; - - /** */ - final String clusterDir; - - /** */ - final String aliveNodesDir; - - /** */ - final String joinDataDir; - - /** */ - final String evtsPath; - - /** */ - final String customEvtsDir; - - /** - * @param basePath Base directory. - * @param clusterName Cluster name. - */ - ZkPaths(String basePath, String clusterName) { - this.basePath = basePath; - this.clusterName = clusterName; - - clusterDir = basePath + "/" + clusterName; - - aliveNodesDir = zkPath(ALIVE_NODES_DIR); - joinDataDir = zkPath(JOIN_DATA_DIR); - evtsPath = zkPath(DISCO_EVENTS_PATH); - customEvtsDir = zkPath(CUSTOM_EVTS_DIR); - } - - /** - * @param path Relative path. - * @return Full path. - */ - String zkPath(String path) { - return basePath + "/" + clusterName + "/" + path; - } - - static int aliveInternalId(String path) { - int idx = path.lastIndexOf('|'); - - return Integer.parseInt(path.substring(idx + 1)); - } - - static UUID aliveNodeId(String path) { - String idStr = path.substring(0, ZkPaths.UUID_LEN); - - return UUID.fromString(idStr); - } - - static int aliveJoinSequence(String path) { - int idx1 = path.indexOf('|'); - int idx2 = path.lastIndexOf('|'); - - return Integer.parseInt(path.substring(idx1 + 1, idx2)); - } - - static int customEventSequence(String path) { - int idx = path.lastIndexOf('|'); - - return Integer.parseInt(path.substring(idx + 1)); - } - - static UUID customEventSendNodeId(String path) { - String idStr = path.substring(0, ZkPaths.UUID_LEN); - - return UUID.fromString(idStr); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/18527db9/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java index 4fdc9fc..626b235 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java @@ -29,12 +29,14 @@ import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Op; +import org.apache.zookeeper.OpResult; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; +import org.jetbrains.annotations.Nullable; /** * @@ -217,8 +219,33 @@ public class ZookeeperClient implements Watcher { } } + void createAllIfNeeded(List<String> paths, CreateMode createMode) + throws ZookeeperClientFailedException, InterruptedException + { + // TODO ZK: need check for max size? + List<Op> ops = new ArrayList<>(paths.size()); + + for (String path : paths) + ops.add(Op.create(path, EMPTY_BYTES, ZK_ACL, createMode)); + + for (;;) { + long connStartTime = this.connStartTime; + + try { + zk.multi(ops); + + return; + } + catch (Exception e) { + onZookeeperError(connStartTime, e); + } + } + + } + String createIfNeeded(String path, byte[] data, CreateMode createMode) - throws ZookeeperClientFailedException, InterruptedException { + throws ZookeeperClientFailedException, InterruptedException + { if (data == null) data = EMPTY_BYTES; @@ -266,14 +293,20 @@ public class ZookeeperClient implements Watcher { } - void deleteAllIfExists(String parent, List<String> paths, int ver) - throws ZookeeperClientFailedException, InterruptedException + void deleteAll(@Nullable String parent, List<String> paths, int ver) + throws KeeperException.NoNodeException, ZookeeperClientFailedException, InterruptedException { + if (paths.isEmpty()) + return; + // TODO ZK: need check for max size? List<Op> ops = new ArrayList<>(paths.size()); - for (String path : paths) - ops.add(Op.delete(parent + "/" + path, ver)); + for (String path : paths) { + String path0 = parent != null ? parent + "/" + path : path; + + ops.add(Op.delete(path0, ver)); + } for (;;) { long connStartTime = this.connStartTime; http://git-wip-us.apache.org/repos/asf/ignite/blob/18527db9/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 6f8c07f..8246e19 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -22,17 +22,18 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import org.apache.curator.utils.PathUtils; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.EventType; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; @@ -63,10 +64,13 @@ import static org.apache.zookeeper.CreateMode.PERSISTENT; */ public class ZookeeperDiscoveryImpl { /** */ + public static final String IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD = "IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD"; + + /** */ private final JdkMarshaller marsh = new JdkMarshaller(); /** */ - private final ZkPaths zkPaths; + private final ZkIgnitePaths zkPaths; /** */ private final IgniteLogger log; @@ -102,19 +106,22 @@ public class ZookeeperDiscoveryImpl { private long gridStartTime; /** */ - private long lastProcEvt = -1; - - /** */ private boolean joined; /** */ - private ZkDiscoveryEventsData evts; + private ZkDiscoveryEventsData evtsData; /** */ private boolean crd; /** */ - private Map<Long, ZkEventAckFuture> ackFuts = new ConcurrentHashMap<>(); + private String locNodeZkPath; + + /** */ + private ZkAliveNodeData locNodeInfo = new ZkAliveNodeData(); + + /** */ + private final int evtsAckThreshold; /** * @param log @@ -137,7 +144,7 @@ public class ZookeeperDiscoveryImpl { PathUtils.validatePath(basePath); - zkPaths = new ZkPaths(basePath, clusterName); + zkPaths = new ZkIgnitePaths(basePath, clusterName); this.log = log.getLogger(getClass()); this.locNode = locNode; @@ -147,6 +154,13 @@ public class ZookeeperDiscoveryImpl { watcher = new ZkWatcher(); childrenCallback = new ZKChildrenCallback(); dataCallback = new ZkDataCallback(); + + int evtsAckThreshold = IgniteSystemProperties.getInteger(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD, 5); + + if (evtsAckThreshold <= 0) + evtsAckThreshold = 1; + + this.evtsAckThreshold = evtsAckThreshold; } IgniteLogger log() { @@ -184,7 +198,7 @@ public class ZookeeperDiscoveryImpl { List<String> children = zkClient.getChildren(zkPaths.aliveNodesDir); for (int i = 0; i < children.size(); i++) { - UUID id = ZkPaths.aliveNodeId(children.get(i)); + UUID id = ZkIgnitePaths.aliveNodeId(children.get(i)); if (nodeId.equals(id)) return true; @@ -300,17 +314,27 @@ public class ZookeeperDiscoveryImpl { if (zkClient.exists(zkPaths.aliveNodesDir)) return; // This path is created last, assume all others dirs are created. - zkClient.createIfNeeded(zkPaths.basePath, null, PERSISTENT); - - zkClient.createIfNeeded(zkPaths.clusterDir, null, PERSISTENT); - - zkClient.createIfNeeded(zkPaths.evtsPath, null, PERSISTENT); - - zkClient.createIfNeeded(zkPaths.joinDataDir, null, PERSISTENT); - - zkClient.createIfNeeded(zkPaths.customEvtsDir, null, PERSISTENT); - - zkClient.createIfNeeded(zkPaths.aliveNodesDir, null, PERSISTENT); + List<String> dirs = new ArrayList<>(); + + dirs.add(zkPaths.basePath); + dirs.add(zkPaths.clusterDir); + dirs.add(zkPaths.evtsPath); + dirs.add(zkPaths.joinDataDir); + dirs.add(zkPaths.customEvtsDir); + dirs.add(zkPaths.aliveNodesDir); + + zkClient.createAllIfNeeded(dirs, PERSISTENT); +// zkClient.createIfNeeded(zkPaths.basePath, null, PERSISTENT); +// +// zkClient.createIfNeeded(zkPaths.clusterDir, null, PERSISTENT); +// +// zkClient.createIfNeeded(zkPaths.evtsPath, null, PERSISTENT); +// +// zkClient.createIfNeeded(zkPaths.joinDataDir, null, PERSISTENT); +// +// zkClient.createIfNeeded(zkPaths.customEvtsDir, null, PERSISTENT); +// +// zkClient.createIfNeeded(zkPaths.aliveNodesDir, null, PERSISTENT); } catch (ZookeeperClientFailedException e) { throw new IgniteSpiException("Failed to initialize Zookeeper nodes", e); @@ -322,14 +346,16 @@ public class ZookeeperDiscoveryImpl { */ private void startJoin(byte[] joinDataBytes) throws InterruptedException { try { - zkClient.getDataAsync(zkPaths.evtsPath, watcher, dataCallback); - // TODO ZK: handle max size. - String path = zkClient.createIfNeeded(zkPaths.joinDataDir + "/" + locNode.id() + "|", joinDataBytes, EPHEMERAL_SEQUENTIAL); + String path = zkClient.createIfNeeded(zkPaths.joinDataDir + "/" + locNode.id() + "|", + joinDataBytes, + EPHEMERAL_SEQUENTIAL); int seqNum = Integer.parseInt(path.substring(path.lastIndexOf('|') + 1)); - zkClient.createIfNeeded(zkPaths.aliveNodesDir + "/" + locNode.id() + "|" + seqNum + "|", null, EPHEMERAL_SEQUENTIAL); + locNodeZkPath = zkClient.createIfNeeded(zkPaths.aliveNodesDir + "/" + locNode.id() + "|" + seqNum + "|", + null, + EPHEMERAL_SEQUENTIAL); zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new AsyncCallback.Children2Callback() { @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { @@ -337,6 +363,8 @@ public class ZookeeperDiscoveryImpl { } }); + zkClient.getDataAsync(zkPaths.evtsPath, watcher, dataCallback); + connStartLatch.countDown(); } catch (ZookeeperClientFailedException e) { @@ -366,7 +394,7 @@ public class ZookeeperDiscoveryImpl { checkIsCoordinator(rc, aliveNodes); } - private void checkIsCoordinator(int rc, List<String> aliveNodes) { + private void checkIsCoordinator(int rc, final List<String> aliveNodes) { try { assert rc == 0 : rc; @@ -375,12 +403,12 @@ public class ZookeeperDiscoveryImpl { Integer locInternalId = null; for (String aliveNodePath : aliveNodes) { - Integer internalId = ZkPaths.aliveInternalId(aliveNodePath); + Integer internalId = ZkIgnitePaths.aliveInternalId(aliveNodePath); alives.put(internalId, aliveNodePath); if (locInternalId == null) { - UUID nodeId = ZkPaths.aliveNodeId(aliveNodePath); + UUID nodeId = ZkIgnitePaths.aliveNodeId(aliveNodePath); if (locNode.id().equals(nodeId)) locInternalId = internalId; @@ -393,7 +421,7 @@ public class ZookeeperDiscoveryImpl { Map.Entry<Integer, String> crdE = alives.firstEntry(); if (locInternalId.equals(crdE.getKey())) - onBecomeCoordinator(locInternalId); + onBecomeCoordinator(aliveNodes, locInternalId); else { assert alives.size() > 1; @@ -412,7 +440,7 @@ public class ZookeeperDiscoveryImpl { @Override public void process(WatchedEvent evt) { if (evt.getType() == Event.EventType.NodeDeleted) { try { - onPreviousNodeFail(crdInternalId, locInternalId0); + onPreviousNodeFail(aliveNodes, crdInternalId, locInternalId0); } catch (Throwable e) { onFatalError(e); @@ -425,7 +453,7 @@ public class ZookeeperDiscoveryImpl { if (stat == null) { try { - onPreviousNodeFail(crdInternalId, locInternalId0); + onPreviousNodeFail(aliveNodes, crdInternalId, locInternalId0); } catch (Throwable e) { onFatalError(e); @@ -440,30 +468,29 @@ public class ZookeeperDiscoveryImpl { } } - private void onPreviousNodeFail(int crdInternalId, int locInternalId) throws Exception { - if (locInternalId == crdInternalId + 1) { - if (log.isInfoEnabled()) - log.info("Previous discovery coordinator failed [locId=" + locNode.id() + ']'); - - onBecomeCoordinator(locInternalId); - } - else { - if (log.isInfoEnabled()) - log.info("Previous node failed, check is node new coordinator [locId=" + locNode.id() + ']'); + private void onPreviousNodeFail(List<String> aliveNodes, int crdInternalId, int locInternalId) throws Exception { + // TODO ZK: +// if (locInternalId == crdInternalId + 1) { +// if (log.isInfoEnabled()) +// log.info("Previous discovery coordinator failed [locId=" + locNode.id() + ']'); +// +// onBecomeCoordinator(aliveNodes, locInternalId); +// } + if (log.isInfoEnabled()) + log.info("Previous node failed, check is node new coordinator [locId=" + locNode.id() + ']'); - zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new AsyncCallback.Children2Callback() { - @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { - checkIsCoordinator(rc, children); - } - }); - } + zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new AsyncCallback.Children2Callback() { + @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { + checkIsCoordinator(rc, children); + } + }); } /** * @param locInternalId Local node's internal ID. * @throws Exception If failed. */ - private void onBecomeCoordinator(int locInternalId) throws Exception { + private void onBecomeCoordinator(List<String> aliveNodes, int locInternalId) throws Exception { byte[] evtsData = zkClient.getData(zkPaths.evtsPath); if (evtsData.length > 0) @@ -476,7 +503,21 @@ public class ZookeeperDiscoveryImpl { log.info("Node is new discovery coordinator [locId=" + locNode.id() + ']'); assert locNode.order() > 0 : locNode; - assert evts != null; + assert this.evtsData != null; + + Iterator<ZkDiscoveryEventData> it = this.evtsData.evts.values().iterator(); + + while (it.hasNext()) { + ZkDiscoveryEventData evtData = it.next(); + + evtData.remainingAcks(top.nodesByOrder.values()); + + if (evtData.allAcksReceived()) { + processNodesAckEvent(evtData); + + it.remove(); + } + } } else { if (log.isInfoEnabled()) @@ -487,6 +528,79 @@ public class ZookeeperDiscoveryImpl { zkClient.getChildrenAsync(zkPaths.aliveNodesDir, watcher, childrenCallback); zkClient.getChildrenAsync(zkPaths.customEvtsDir, watcher, childrenCallback); + + for (String alivePath : aliveNodes) + watchAliveNodeData(alivePath); + } + + /** + * @param alivePath + */ + private void watchAliveNodeData(String alivePath) { + assert locNodeZkPath != null; + + String path = zkPaths.aliveNodesDir + "/" + alivePath; + + if (!path.equals(locNodeZkPath)) + zkClient.getDataAsync(path, aliveNodeDataWatcher, aliveNodeDataUpdateCallback); + } + + /** */ + private final AliveNodeDataWatcher aliveNodeDataWatcher = new AliveNodeDataWatcher(); + + /** */ + private AliveNodeDataUpdateCallback aliveNodeDataUpdateCallback = new AliveNodeDataUpdateCallback(); + + /** + * + */ + private class AliveNodeDataWatcher implements Watcher { + @Override public void process(WatchedEvent evt) { + if (evt.getType() == Event.EventType.NodeDataChanged) { + zkClient.getDataAsync(evt.getPath(), this, aliveNodeDataUpdateCallback); + } + } + } + + /** + * + */ + private class AliveNodeDataUpdateCallback implements AsyncCallback.DataCallback { + @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { + assert crd; + + if (rc == KeeperException.Code.NONODE.intValue()) { + if (log.isDebugEnabled()) + log.debug("Alive node callaback, no node: " + path); + + return; + } + + assert rc == 0 : rc; + + try { + if (data.length > 0) { + ZkAliveNodeData nodeData = unmarshal(data); + + Integer nodeInternalId = ZkIgnitePaths.aliveInternalId(path); + + Iterator<ZkDiscoveryEventData> it = evtsData.evts.values().iterator(); + + while (it.hasNext()) { + ZkDiscoveryEventData evtData = it.next(); + + if (evtData.onAckReceived(nodeInternalId, nodeData.lastProcEvt)) { + processNodesAckEvent(evtData); + + it.remove(); + } + } + } + } + catch (Throwable e) { + onFatalError(e); + } + } } /** @@ -503,35 +617,47 @@ public class ZookeeperDiscoveryImpl { TreeMap<Long, ZookeeperClusterNode> curTop = new TreeMap<>(top.nodesByOrder); - int evtCnt = evts.evts.size(); + boolean newEvts = false; for (String child : aliveNodes) { - Integer inernalId = ZkPaths.aliveInternalId(child); + Integer inernalId = ZkIgnitePaths.aliveInternalId(child); Object old = alives.put(inernalId, child); assert old == null; - if (!top.nodesByInternalId.containsKey(inernalId)) + if (!top.nodesByInternalId.containsKey(inernalId)) { generateNodeJoin(curTop, inernalId, child); + + watchAliveNodeData(child); + + newEvts = true; + } } for (Map.Entry<Integer, ZookeeperClusterNode> e : top.nodesByInternalId.entrySet()) { - if (!alives.containsKey(e.getKey())) - generateNodeFail(curTop, e.getValue()); + if (!alives.containsKey(e.getKey())) { + ZookeeperClusterNode failedNode = e.getValue(); + + processEventAcksOnNodeFail(failedNode); + + generateNodeFail(curTop, failedNode); + + newEvts = true; + } } - if (evts.evts.size() > evtCnt) { + if (newEvts) { long start = System.currentTimeMillis(); - zkClient.setData(zkPaths.evtsPath, marsh.marshal(evts), -1); + zkClient.setData(zkPaths.evtsPath, marsh.marshal(evtsData), -1); long time = System.currentTimeMillis() - start; if (log.isInfoEnabled()) - log.info("Discovery coordinator saved new topology events [topVer=" + evts.topVer + ", saveTime=" + time + ']'); + log.info("Discovery coordinator saved new topology events [topVer=" + evtsData.topVer + ", saveTime=" + time + ']'); - onEventsUpdate(evts); + onEventsUpdate(evtsData); } } @@ -544,14 +670,15 @@ public class ZookeeperDiscoveryImpl { assert rmvd != null; - evts.topVer++; - evts.evtIdGen++; + evtsData.topVer++; + evtsData.evtIdGen++; - ZkDiscoveryEventData evtData = new ZkDiscoveryNodeFailEventData(evts.evtIdGen, - evts.topVer, + ZkDiscoveryNodeFailEventData evtData = new ZkDiscoveryNodeFailEventData( + evtsData.evtIdGen, + evtsData.topVer, failedNode.internalId()); - evts.addEvent(evtData); + evtsData.addEvent(curTop.values(), evtData); if (log.isInfoEnabled()) { log.info("Generated NODE_FAILED event [topVer=" + evtData.topologyVersion() + @@ -564,8 +691,8 @@ public class ZookeeperDiscoveryImpl { String aliveNodePath) throws Exception { - UUID nodeId = ZkPaths.aliveNodeId(aliveNodePath); - int joinSeq = ZkPaths.aliveJoinSequence(aliveNodePath); + UUID nodeId = ZkIgnitePaths.aliveNodeId(aliveNodePath); + int joinSeq = ZkIgnitePaths.aliveJoinSequence(aliveNodePath); String joinDataPath = zkPaths.joinDataDir + '/' + nodeId.toString() + "|" + String.format("%010d", joinSeq); @@ -587,10 +714,10 @@ public class ZookeeperDiscoveryImpl { assert nodeId.equals(joinedNode.id()) : joiningNodeData.node(); - evts.topVer++; - evts.evtIdGen++; + evtsData.topVer++; + evtsData.evtIdGen++; - joinedNode.order(evts.topVer); + joinedNode.order(evtsData.topVer); joinedNode.internalId(internalId); DiscoveryDataBag joiningNodeBag = new DiscoveryDataBag(nodeId); @@ -614,21 +741,19 @@ public class ZookeeperDiscoveryImpl { assert old == null; ZkDiscoveryNodeJoinEventData evtData = new ZkDiscoveryNodeJoinEventData( - evts.evtIdGen, - evts.topVer, + evtsData.evtIdGen, + evtsData.topVer, joinedNode.id(), joinedNode.internalId()); evtData.joiningNodeData = joiningNodeData; - evts.addEvent(evtData); - - String evtDataPath = zkPaths.evtsPath + "/" + evtData.eventId(); + evtsData.addEvent(dataForJoined.topology(), evtData); long start = System.currentTimeMillis(); - zkClient.createIfNeeded(evtDataPath, joinData, PERSISTENT); - zkClient.createIfNeeded(evtDataPath + "/joined", marshal(dataForJoined), PERSISTENT); + zkClient.createIfNeeded(zkPaths.joinEventDataPath(evtData.eventId()), joinData, PERSISTENT); + zkClient.createIfNeeded(zkPaths.joinEventDataPathForJoined(evtData.eventId()), marshal(dataForJoined), PERSISTENT); long time = System.currentTimeMillis() - start; @@ -651,7 +776,7 @@ public class ZookeeperDiscoveryImpl { gridStartTime = U.currentTimeMillis(); - evts = new ZkDiscoveryEventsData(gridStartTime, 1L, new TreeMap<Long, ZkDiscoveryEventData>()); + evtsData = new ZkDiscoveryEventsData(gridStartTime, 1L, new TreeMap<Long, ZkDiscoveryEventData>()); locNode.internalId(locInternalId); locNode.order(1); @@ -666,10 +791,15 @@ public class ZookeeperDiscoveryImpl { null); joinFut.onDone(); + + // TODO ZK: remove join zk nodes } + /** + * @throws Exception If failed. + */ private void cleanupPreviousClusterData() throws Exception { - // TODO ZK: use multi. + // TODO ZK: use multi, better batching. zkClient.setData(zkPaths.evtsPath, null, -1); List<String> evtChildren = zkClient.getChildren(zkPaths.evtsPath); @@ -680,10 +810,11 @@ public class ZookeeperDiscoveryImpl { removeChildren(evtDir); } - zkClient.deleteAllIfExists(zkPaths.evtsPath, evtChildren, -1); + zkClient.deleteAll(zkPaths.evtsPath, evtChildren, -1); - for (String evtPath : zkClient.getChildren(zkPaths.customEvtsDir)) - zkClient.delete(zkPaths.customEvtsDir + "/" + evtPath, -1); + zkClient.deleteAll(zkPaths.customEvtsDir, + zkClient.getChildren(zkPaths.customEvtsDir), + -1); } /** @@ -691,11 +822,7 @@ public class ZookeeperDiscoveryImpl { * @throws Exception If failed. */ private void removeChildren(String path) throws Exception { - zkClient.deleteAllIfExists(path, zkClient.getChildren(path), -1); - } - - void removeAckFuture(ZkEventAckFuture fut) { - ackFuts.remove(fut.eventId()); + zkClient.deleteAll(path, zkClient.getChildren(path), -1); } ZkClusterNodes nodes() { @@ -718,9 +845,9 @@ public class ZookeeperDiscoveryImpl { for (int i = 0; i < customEvtNodes.size(); i++) { String evtPath = customEvtNodes.get(i); - int evtSeq = ZkPaths.customEventSequence(evtPath); + int evtSeq = ZkIgnitePaths.customEventSequence(evtPath); - if (evtSeq > evts.procCustEvt) { + if (evtSeq > evtsData.procCustEvt) { if (newEvts == null) newEvts = new TreeMap<>(); @@ -730,7 +857,7 @@ public class ZookeeperDiscoveryImpl { if (newEvts != null) { for (Map.Entry<Integer, String> evtE : newEvts.entrySet()) { - UUID sndNodeId = ZkPaths.customEventSendNodeId(evtE.getValue()); + UUID sndNodeId = ZkIgnitePaths.customEventSendNodeId(evtE.getValue()); ZookeeperClusterNode sndNode = top.nodesById.get(sndNodeId); @@ -744,17 +871,17 @@ public class ZookeeperDiscoveryImpl { try { msg = unmarshal(evtBytes); - evts.evtIdGen++; + evtsData.evtIdGen++; ZkDiscoveryCustomEventData evtData = new ZkDiscoveryCustomEventData( - evts.evtIdGen, - evts.topVer, + evtsData.evtIdGen, + evtsData.topVer, sndNodeId, evtE.getValue()); evtData.msg = msg; - evts.addEvent(evtData); + evtsData.addEvent(top.nodesByOrder.values(), evtData); if (log.isInfoEnabled()) log.info("Generated CUSTOM event [topVer=" + evtData.topologyVersion() + ", evt=" + msg + ']'); @@ -769,19 +896,19 @@ public class ZookeeperDiscoveryImpl { zkClient.deleteIfExists(evtDataPath, -1); } - evts.procCustEvt = evtE.getKey(); + evtsData.procCustEvt = evtE.getKey(); } long start = System.currentTimeMillis(); - zkClient.setData(zkPaths.evtsPath, marsh.marshal(evts), -1); + zkClient.setData(zkPaths.evtsPath, marsh.marshal(evtsData), -1); long time = System.currentTimeMillis() - start; if (log.isInfoEnabled()) - log.info("Discovery coordinator saved new topology events [topVer=" + evts.topVer + ", saveTime=" + time + ']'); + log.info("Discovery coordinator saved new topology events [topVer=" + evtsData.topVer + ", saveTime=" + time + ']'); - onEventsUpdate(evts); + onEventsUpdate(evtsData); } } @@ -799,18 +926,26 @@ public class ZookeeperDiscoveryImpl { onEventsUpdate(evtsData); - evts = evtsData; + this.evtsData = evtsData; } + /** */ + private int procEvtCnt; + /** * @param evtsData Events. * @throws Exception If failed. */ + @SuppressWarnings("unchecked") private void onEventsUpdate(ZkDiscoveryEventsData evtsData) throws Exception { TreeMap<Long, ZkDiscoveryEventData> evts = evtsData.evts; - for (Map.Entry<Long, ZkDiscoveryEventData> e : evts.tailMap(lastProcEvt, false).entrySet()) { - ZkDiscoveryEventData evtData = e.getValue(); + boolean updateNodeInfo = false; + + Iterator<ZkDiscoveryEventData> it = evts.tailMap(locNodeInfo.lastProcEvt, false).values().iterator(); + + while (it.hasNext()) { + ZkDiscoveryEventData evtData = it.next(); if (!joined) { if (evtData.eventType() != EventType.EVT_NODE_JOINED) @@ -818,50 +953,13 @@ public class ZookeeperDiscoveryImpl { ZkDiscoveryNodeJoinEventData evtData0 = (ZkDiscoveryNodeJoinEventData)evtData; - UUID joinedId = ((ZkDiscoveryNodeJoinEventData)evtData).nodeId; + UUID joinedId = evtData0.nodeId; boolean locJoin = evtData.eventType() == EventType.EVT_NODE_JOINED && locNode.id().equals(joinedId); - if (locJoin) { - if (log.isInfoEnabled()) - log.info("Local join event data: " + evtData + ']'); - - String path = zkPaths.evtsPath + "/" + evtData.eventId() + "/joined"; - - ZkJoinEventDataForJoined dataForJoined = unmarshal(zkClient.getData(path)); - - gridStartTime = evtsData.gridStartTime; - - locNode.internalId(evtData0.joinedInternalId); - locNode.order(evtData.topologyVersion()); - - DiscoveryDataBag dataBag = new DiscoveryDataBag(locNode.id()); - - dataBag.commonData(dataForJoined.discoveryData()); - - exchange.onExchange(dataBag); - - List<ZookeeperClusterNode> allNodes = dataForJoined.topology(); - - for (ZookeeperClusterNode node : allNodes) - top.addNode(node); - - top.addNode(locNode); - - List<ClusterNode> topSnapshot = new ArrayList<>((Collection)top.nodesByOrder.values()); - - lsnr.onDiscovery(evtData.eventType(), - evtData.topologyVersion(), - locNode, - topSnapshot, - Collections.<Long, Collection<ClusterNode>>emptyMap(), - null); - - joinFut.onDone(); - - joined = true; - } + if (locJoin) + processLocalJoin(evtsData, evtData0); } else { if (log.isInfoEnabled()) @@ -879,7 +977,7 @@ public class ZookeeperDiscoveryImpl { joiningData = evtData0.joiningNodeData; } else { - String path = zkPaths.evtsPath + "/" + evtData.eventId(); + String path = zkPaths.joinEventDataPath(evtData.eventId()); joiningData = unmarshal(zkClient.getData(path)); @@ -896,7 +994,7 @@ public class ZookeeperDiscoveryImpl { } case EventType.EVT_NODE_FAILED: { - notifyNodeFail((ZkDiscoveryNodeFailEventData)e.getValue()); + notifyNodeFail((ZkDiscoveryNodeFailEventData)evtData); break; } @@ -912,7 +1010,7 @@ public class ZookeeperDiscoveryImpl { msg = evtData0.msg; } else { - String path = zkPaths.customEvtsDir + "/" + evtData0.evtPath; + String path = zkPaths.customEventDataPath(evtData0.evtPath); msg = unmarshal(zkClient.getData(path)); } @@ -925,13 +1023,78 @@ public class ZookeeperDiscoveryImpl { default: assert false : "Invalid event: " + evtData; } + + if (crd) { + if (evtData.allAcksReceived()) { + processNodesAckEvent(evtData); + + it.remove(); + } + } + } + + if (joined) { + locNodeInfo.lastProcEvt = evtData.eventId(); + + procEvtCnt++; + + if (procEvtCnt % evtsAckThreshold == 0) + updateNodeInfo = true; } + } - if (joined) - lastProcEvt = e.getKey(); + if (!crd && updateNodeInfo) { + assert locNodeZkPath != null; + + zkClient.setData(locNodeZkPath, marshal(locNodeInfo), -1); } } + private void processLocalJoin(ZkDiscoveryEventsData evtsData, ZkDiscoveryNodeJoinEventData evtData) + throws Exception + { + if (log.isInfoEnabled()) + log.info("Local join event data: " + evtData + ']'); + + String path = zkPaths.joinEventDataPathForJoined(evtData.eventId()); + + ZkJoinEventDataForJoined dataForJoined = unmarshal(zkClient.getData(path)); + + gridStartTime = evtsData.gridStartTime; + + locNode.internalId(evtData.joinedInternalId); + locNode.order(evtData.topologyVersion()); + + DiscoveryDataBag dataBag = new DiscoveryDataBag(locNode.id()); + + dataBag.commonData(dataForJoined.discoveryData()); + + exchange.onExchange(dataBag); + + List<ZookeeperClusterNode> allNodes = dataForJoined.topology(); + + for (ZookeeperClusterNode node : allNodes) + top.addNode(node); + + top.addNode(locNode); + + List<ClusterNode> topSnapshot = new ArrayList<>((Collection)top.nodesByOrder.values()); + + lsnr.onDiscovery(evtData.eventType(), + evtData.topologyVersion(), + locNode, + topSnapshot, + Collections.<Long, Collection<ClusterNode>>emptyMap(), + null); + + joinFut.onDone(); + + joined = true; + + // TODO ZK: async + zkClient.deleteIfExists(path, -1); + } + /** * @param evtData Event data. * @param msg Custom message. @@ -953,19 +1116,6 @@ public class ZookeeperDiscoveryImpl { topSnapshot, Collections.<Long, Collection<ClusterNode>>emptyMap(), msg); - - if (crd) { - ZkEventAckFuture fut = new ZkEventAckFuture(this, - zkPaths.customEvtsDir + "/" + evtData.evtPath, - evtData.eventId()); - - ackFuts.put(evtData.eventId(), fut); - } - else { - String ackPath = zkPaths.customEvtsDir + "/" + evtData.evtPath + "/" + locNode.internalId(); - - zkClient.createAsync(ackPath, null, CreateMode.PERSISTENT, null); - } } /** @@ -995,7 +1145,7 @@ public class ZookeeperDiscoveryImpl { * @param evtData Event data. */ @SuppressWarnings("unchecked") - private void notifyNodeFail(ZkDiscoveryNodeFailEventData evtData) { + private void notifyNodeFail(ZkDiscoveryNodeFailEventData evtData) throws Exception { ZookeeperClusterNode failedNode = top.removeNode(evtData.failedNodeInternalId()); assert failedNode != null; @@ -1008,14 +1158,74 @@ public class ZookeeperDiscoveryImpl { topSnapshot, Collections.<Long, Collection<ClusterNode>>emptyMap(), null); + } + + /** + * @param evtData + * @throws Exception + */ + private void processNodesAckEvent(ZkDiscoveryEventData evtData) throws Exception { + switch (evtData.eventType()) { + case EventType.EVT_NODE_JOINED: { + processNodesAckJoinEvent((ZkDiscoveryNodeJoinEventData)evtData); + + break; + } + + case DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT: { + processNodesAckCustomEvent((ZkDiscoveryCustomEventData)evtData); + + break; + } + + case EventType.EVT_NODE_FAILED: { + log.info("All nodes processed node fail [evtId=" + evtData.eventId() + ']'); + + // Do not need cleanup. + break; + } + } + } - if (crd) { - for (ZkEventAckFuture ackFut : ackFuts.values()) - ackFut.onNodeFail(failedNode); + /** + * @param failedNode Failed node. + */ + private void processEventAcksOnNodeFail(ZookeeperClusterNode failedNode) throws Exception { + for (Iterator<Map.Entry<Long, ZkDiscoveryEventData>> it = evtsData.evts.entrySet().iterator(); it.hasNext();) { + Map.Entry<Long, ZkDiscoveryEventData> e = it.next(); + + ZkDiscoveryEventData evtData = e.getValue(); + + if (evtData.onNodeFail(failedNode)) { + processNodesAckEvent(evtData); + + it.remove(); + } } } /** + * @param evtData Event data. + * @throws Exception If failed. + */ + private void processNodesAckJoinEvent(ZkDiscoveryNodeJoinEventData evtData) throws Exception { + log.info("All nodes processed node join [evtId=" + evtData.eventId() + ']'); + + zkClient.deleteIfExists(zkPaths.joinEventDataPath(evtData.eventId()), -1); + zkClient.deleteIfExists(zkPaths.joinEventDataPathForJoined(evtData.eventId()), -1); + } + + /** + * @param evtData Event data. + * @throws Exception If failed. + */ + private void processNodesAckCustomEvent(ZkDiscoveryCustomEventData evtData) throws Exception { + log.info("All nodes processed custom event [evtId=" + evtData.eventId() + ']'); + + zkClient.deleteIfExists(zkPaths.customEventDataPath(evtData.evtPath), -1); + } + + /** * */ public void stop() { @@ -1070,10 +1280,10 @@ public class ZookeeperDiscoveryImpl { U.warn(log, "Zookeeper connection loss, local node is SEGMENTED"); if (joined) { - assert evts != null; + assert evtsData != null; lsnr.onDiscovery(EventType.EVT_NODE_SEGMENTED, - evts.topVer, + evtsData.topVer, locNode, Collections.<ClusterNode>emptyList(), Collections.<Long, Collection<ClusterNode>>emptyMap(), http://git-wip-us.apache.org/repos/asf/ignite/blob/18527db9/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java index b0df770..21f88c8 100644 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java @@ -43,6 +43,7 @@ import org.apache.ignite.internal.IgnitionEx; import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgniteInClosure; @@ -58,6 +59,7 @@ import org.apache.zookeeper.ZooKeeper; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl.IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD; import static org.apache.zookeeper.ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET; /** @@ -172,6 +174,20 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + System.setProperty(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD, "1"); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + System.clearProperty(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD); + + super.afterTestsStopped(); + } + + /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { super.beforeTest(); @@ -234,6 +250,8 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { Ignite srv0 = startGrid(0); srv0.createCache(new CacheConfiguration<>("c1")); + + waitForEventsAcks(srv0); } /** @@ -245,6 +263,8 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { srv0.createCache(new CacheConfiguration<>("c1")); awaitPartitionMapExchange(); + + waitForEventsAcks(srv0); } /** @@ -594,6 +614,10 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { waitForTopology(5); + awaitPartitionMapExchange(); + + waitForEventsAcks(ignite(0)); + stopGrid(0); waitForTopology(4); @@ -604,6 +628,31 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { startGrid(0); waitForTopology(5); + + awaitPartitionMapExchange(); + + waitForEventsAcks(grid(CU.oldest(ignite(1).cluster().nodes()))); + } + + /** + * @param node Node. + * @throws Exception If failed. + */ + private void waitForEventsAcks(final Ignite node) throws Exception { + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + Map<Object, Object> evts = GridTestUtils.getFieldValue(node.configuration().getDiscoverySpi(), + "impl", "evtsData", "evts"); + + if (!evts.isEmpty()) { + info("Unacked events: " + evts); + + return false; + } + + return true; + } + }, 10_000)); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/18527db9/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java index f85cf5a..8aac456 100644 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.spi.discovery.zk.internal; +import java.util.Arrays; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -51,6 +52,28 @@ public class ZookeeperClientTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testDeleteAll() throws Exception { + startZK(1); + + ZookeeperClient client = new ZookeeperClient(log, zkCluster.getConnectString(), 3000, null); + + client.createIfNeeded("/apacheIgnite", null, CreateMode.PERSISTENT); + client.createIfNeeded("/apacheIgnite/1", null, CreateMode.PERSISTENT); + client.createIfNeeded("/apacheIgnite/2", null, CreateMode.PERSISTENT); + + client.deleteAll("/apacheIgnite", Arrays.asList("1", "2"), -1); + + assertTrue(client.getChildren("/apacheIgnite").isEmpty()); + + client.createIfNeeded("/apacheIgnite/1", null, CreateMode.PERSISTENT); + client.deleteAll("/apacheIgnite", Arrays.asList("1"), -1); + + assertTrue(client.getChildren("/apacheIgnite").isEmpty()); + } + + /** + * @throws Exception If failed. + */ public void testConnectionLoss1() throws Exception { ZookeeperClient client = new ZookeeperClient(log, "localhost:2200", 3000, null);