zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fb6bd0ac Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fb6bd0ac Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fb6bd0ac Branch: refs/heads/ignite-zk Commit: fb6bd0ac39f97db0d7e347aff6fa26edda10f940 Parents: fcee8c8 Author: sboikov <sboi...@gridgain.com> Authored: Tue Nov 21 13:43:15 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Nov 21 15:09:20 2017 +0300 ---------------------------------------------------------------------- .../discovery/zk/internal/ZkEventAckFuture.java | 139 +++++++++++++++++++ .../discovery/zk/internal/ZookeeperClient.java | 121 ++++++++++++++-- .../zk/internal/ZookeeperDiscoveryImpl.java | 55 +++++++- 3 files changed, 296 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/fb6bd0ac/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java new file mode 100644 index 0000000..fa0da99 --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.data.Stat; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class ZkEventAckFuture extends GridFutureAdapter<Void> implements Watcher, AsyncCallback.Children2Callback { + /** */ + private final IgniteLogger log; + + /** */ + private final ZookeeperDiscoveryImpl impl; + + /** */ + private final Long evtId; + + /** */ + private final String evtPath; + + /** */ + private final int expAcks; + + /** */ + private final Set<Integer> remaininAcks; + + ZkEventAckFuture(ZookeeperDiscoveryImpl impl, String evtPath, Long evtId) { + this.impl = impl; + this.log = impl.log(); + this.evtPath = evtPath; + this.evtId = evtId; + + ZkClusterNodes top = impl.nodes(); + + remaininAcks = U.newHashSet(top.nodesById.size()); + + for (ZookeeperClusterNode node : top.nodesByInternalId.values()) { + if (!node.isLocal()) + remaininAcks.add(node.internalId()); + } + + expAcks = remaininAcks.size(); + + if (expAcks == 0) + onDone(); + else + impl.zkClient().getChildrenAsync(evtPath, this, this); + } + + /** + * @return Event ID. + */ + Long eventId() { + return evtId; + } + + /** + * @param node Failed node. + */ + void onNodeFail(ZookeeperClusterNode node) { + assert !remaininAcks.isEmpty(); + + if (remaininAcks.remove(node.internalId()) && remaininAcks.isEmpty()) + onDone(); + } + + /** {@inheritDoc} */ + @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) { + if (super.onDone(res, err)) { + impl.removeAckFuture(this); + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public void process(WatchedEvent evt) { + if (isDone()) + return; + + if (evt.getType() == Event.EventType.NodeChildrenChanged) { + if (evtPath.equals(evt.getPath())) + impl.zkClient().getChildrenAsync(evtPath, this, this); + else + U.warn(log, "Received event for unknown path: " + evt.getPath()); + } + } + + /** {@inheritDoc} */ + @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { + assert rc == 0 : rc; + + if (isDone()) + return; + + if (expAcks == stat.getCversion()) { + log.info("Received expected number of acks [expCnt=" + expAcks + ", cVer=" + stat.getCversion() + ']'); + + onDone(); + } + else { + for (int i = 0; i < children.size(); i++) { + Integer nodeInternalId = Integer.parseInt(children.get(i)); + + if (remaininAcks.remove(nodeInternalId) && remaininAcks.size() == 0) + onDone(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/fb6bd0ac/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java index 6393b90..4fdc9fc 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java @@ -18,6 +18,7 @@ package org.apache.ignite.spi.discovery.zk.internal; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.List; import java.util.Timer; import java.util.TimerTask; @@ -27,6 +28,7 @@ import org.apache.ignite.lang.IgniteRunnable; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Op; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; @@ -264,6 +266,29 @@ public class ZookeeperClient implements Watcher { } + void deleteAllIfExists(String parent, List<String> paths, int ver) + throws ZookeeperClientFailedException, InterruptedException + { + // TODO ZK: need check for max size? + List<Op> ops = new ArrayList<>(paths.size()); + + for (String path : paths) + ops.add(Op.delete(parent + "/" + path, ver)); + + for (;;) { + long connStartTime = this.connStartTime; + + try { + zk.multi(ops); + + return; + } + catch (Exception e) { + onZookeeperError(connStartTime, e); + } + } + } + void delete(String path, int ver) throws KeeperException.NoNodeException, ZookeeperClientFailedException, InterruptedException { @@ -331,7 +356,7 @@ public class ZookeeperClient implements Watcher { void getChildrenAsync(String path, Watcher watcher, AsyncCallback.Children2Callback cb) { GetChildrenOperation op = new GetChildrenOperation(path, watcher, cb); - zk.getChildren(path, watcher, new ChildreCallbackWrapper(op), null); + zk.getChildren(path, watcher, new ChildrenCallbackWrapper(op), null); } void getDataAsync(String path, Watcher watcher, AsyncCallback.DataCallback cb) { @@ -340,6 +365,15 @@ public class ZookeeperClient implements Watcher { zk.getData(path, watcher, new DataCallbackWrapper(op), null); } + void createAsync(String path, byte[] data, CreateMode createMode, AsyncCallback.StringCallback cb) { + if (data == null) + data = EMPTY_BYTES; + + CreateOperation op = new CreateOperation(path, data, createMode, cb); + + zk.create(path, data, ZK_ACL, createMode, new CreateCallbackWrapper(op), null); + } + /** * */ @@ -437,6 +471,29 @@ public class ZookeeperClient implements Watcher { /** * */ + private void closeClient() { + try { + zk.close(); + } + catch (Exception closeErr) { + U.warn(log, "Failed to close zookeeper client: " + closeErr, closeErr); + } + + connTimer.cancel(); + } + + /** + * + */ + private void scheduleConnectionCheck() { + assert state == ConnectionState.Disconnected : state; + + connTimer.schedule(new ConnectionTimeoutTask(connStartTime), connLossTimeout); + } + + /** + * + */ interface ZkAsyncOperation { /** * @@ -532,37 +589,75 @@ public class ZookeeperClient implements Watcher { /** * */ - private void closeClient() { - try { - zk.close(); - } - catch (Exception closeErr) { - U.warn(log, "Failed to close zookeeper client: " + closeErr, closeErr); + class CreateOperation implements ZkAsyncOperation { + /** */ + private final String path; + + /** */ + private final byte[] data; + + /** */ + private final CreateMode createMode; + + /** */ + private final AsyncCallback.StringCallback cb; + + CreateOperation(String path, byte[] data, CreateMode createMode, AsyncCallback.StringCallback cb) { + this.path = path; + this.data = data; + this.createMode = createMode; + this.cb = cb; } - connTimer.cancel(); + /** {@inheritDoc} */ + @Override public void execute() { + createAsync(path, data, createMode, cb); + } } /** * */ - private void scheduleConnectionCheck() { - assert state == ConnectionState.Disconnected : state; + class CreateCallbackWrapper implements AsyncCallback.StringCallback { + /** */ + final CreateOperation op; - connTimer.schedule(new ConnectionTimeoutTask(connStartTime), connLossTimeout); + /** + * @param op Operation. + */ + CreateCallbackWrapper(CreateOperation op) { + this.op = op; + } + + @Override public void processResult(int rc, String path, Object ctx, String name) { + if (rc == KeeperException.Code.NODEEXISTS.intValue()) + return; + + if (needRetry(rc)) { + U.warn(log, "Failed to execute async operation, connection lost. Will retry after connection restore [path=" + path + ']'); + + retryQ.add(op); + } + else if (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) + U.warn(log, "Failed to execute async operation, connection lost [path=" + path + ']'); + else { + if (op.cb != null) + op.cb.processResult(rc, path, ctx, name); + } + } } /** * */ - class ChildreCallbackWrapper implements AsyncCallback.Children2Callback { + class ChildrenCallbackWrapper implements AsyncCallback.Children2Callback { /** */ private final GetChildrenOperation op; /** * @param op Operation. */ - private ChildreCallbackWrapper(GetChildrenOperation op) { + private ChildrenCallbackWrapper(GetChildrenOperation op) { this.op = op; } http://git-wip-us.apache.org/repos/asf/ignite/blob/fb6bd0ac/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index ece71f9..6f8c07f 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import org.apache.curator.utils.PathUtils; import org.apache.ignite.IgniteCheckedException; @@ -112,6 +113,9 @@ public class ZookeeperDiscoveryImpl { /** */ private boolean crd; + /** */ + private Map<Long, ZkEventAckFuture> ackFuts = new ConcurrentHashMap<>(); + /** * @param log * @param basePath @@ -145,6 +149,10 @@ public class ZookeeperDiscoveryImpl { dataCallback = new ZkDataCallback(); } + IgniteLogger log() { + return log; + } + public ClusterNode localNode() { return locNode; } @@ -664,21 +672,38 @@ public class ZookeeperDiscoveryImpl { // TODO ZK: use multi. zkClient.setData(zkPaths.evtsPath, null, -1); - for (String evtPath : zkClient.getChildren(zkPaths.evtsPath)) { + List<String> evtChildren = zkClient.getChildren(zkPaths.evtsPath); + + for (String evtPath : evtChildren) { String evtDir = zkPaths.evtsPath + "/" + evtPath; removeChildren(evtDir); - - zkClient.delete(evtDir, -1); } + zkClient.deleteAllIfExists(zkPaths.evtsPath, evtChildren, -1); + for (String evtPath : zkClient.getChildren(zkPaths.customEvtsDir)) zkClient.delete(zkPaths.customEvtsDir + "/" + evtPath, -1); } + /** + * @param path Path. + * @throws Exception If failed. + */ private void removeChildren(String path) throws Exception { - for (String childPath : zkClient.getChildren(path)) - zkClient.delete(path + "/" + childPath, -1); + zkClient.deleteAllIfExists(path, zkClient.getChildren(path), -1); + } + + void removeAckFuture(ZkEventAckFuture fut) { + ackFuts.remove(fut.eventId()); + } + + ZkClusterNodes nodes() { + return top; + } + + ZookeeperClient zkClient() { + return zkClient; } /** @@ -914,7 +939,7 @@ public class ZookeeperDiscoveryImpl { @SuppressWarnings("unchecked") private void notifyCustomEvent(ZkDiscoveryCustomEventData evtData, DiscoverySpiCustomMessage msg) { if (log.isInfoEnabled()) - log.info(" [topVer=" + evtData.topologyVersion() + ", msg=" + msg.getClass().getSimpleName() + ']'); + log.info(" [topVer=" + evtData.topologyVersion() + ", msg=" + msg + ']'); ZookeeperClusterNode sndNode = top.nodesById.get(evtData.sndNodeId); @@ -928,6 +953,19 @@ public class ZookeeperDiscoveryImpl { topSnapshot, Collections.<Long, Collection<ClusterNode>>emptyMap(), msg); + + if (crd) { + ZkEventAckFuture fut = new ZkEventAckFuture(this, + zkPaths.customEvtsDir + "/" + evtData.evtPath, + evtData.eventId()); + + ackFuts.put(evtData.eventId(), fut); + } + else { + String ackPath = zkPaths.customEvtsDir + "/" + evtData.evtPath + "/" + locNode.internalId(); + + zkClient.createAsync(ackPath, null, CreateMode.PERSISTENT, null); + } } /** @@ -970,6 +1008,11 @@ public class ZookeeperDiscoveryImpl { topSnapshot, Collections.<Long, Collection<ClusterNode>>emptyMap(), null); + + if (crd) { + for (ZkEventAckFuture ackFut : ackFuts.values()) + ackFut.onNodeFail(failedNode); + } } /**