zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7e8f85ff Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7e8f85ff Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7e8f85ff Branch: refs/heads/ignite-zk Commit: 7e8f85ff81f1cfc0f163f86d623f7e558177aa2c Parents: 376a484 Author: sboikov <[email protected]> Authored: Thu Dec 14 11:55:56 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu Dec 14 13:56:54 2017 +0300 ---------------------------------------------------------------------- .../zk/internal/ZkAbstractChildrenCallback.java | 8 + .../zk/internal/ZkCollectDistributedFuture.java | 171 --------- .../ZkCommunicationErrorProcessFuture.java | 114 ++++-- ...kCommunicationErrorResolveFinishMessage.java | 23 +- .../ZkCommunicationErrorResolveResult.java | 37 ++ ...ZkCommunicationErrorResolveStartMessage.java | 6 + .../zk/internal/ZkDiscoveryCustomEventData.java | 19 +- .../ZkDistributedCollectDataFuture.java | 223 ++++++++++++ .../discovery/zk/internal/ZkIgnitePaths.java | 20 +- .../discovery/zk/internal/ZookeeperClient.java | 5 + .../zk/internal/ZookeeperDiscoveryImpl.java | 360 +++++++++++-------- .../ZookeeperDiscoverySpiBasicTest.java | 176 ++++++++- 12 files changed, 808 insertions(+), 354 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7e8f85ff/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java index 5679993..2292e35 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java @@ -48,6 +48,14 @@ abstract class ZkAbstractChildrenCallback extends ZkAbstractCallabck implements } } + /** + * @param rc + * @param path + * @param ctx + * @param children + * @param stat + * @throws Exception If failed. + */ abstract void processResult0(int rc, String path, Object ctx, List<String> children, Stat stat) throws Exception; } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e8f85ff/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCollectDistributedFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCollectDistributedFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCollectDistributedFuture.java deleted file mode 100644 index fa529cf..0000000 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCollectDistributedFuture.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spi.discovery.zk.internal; - -import java.util.List; -import java.util.Set; -import java.util.concurrent.Callable; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.data.Stat; -import org.jetbrains.annotations.Nullable; - -/** - * - */ -class ZkCollectDistributedFuture extends GridFutureAdapter<Void> { - /** */ - private final IgniteLogger log; - - /** */ - private final String futPath; - - /** */ - private final ZookeeperDiscoveryImpl impl; - - /** */ - private final Set<Long> remainingNodes; - - /** */ - private final Callable<Void> lsnr; - - /** - * @param impl - * @param rtState - * @param futPath - */ - ZkCollectDistributedFuture(ZookeeperDiscoveryImpl impl, ZkRuntimeState rtState, String futPath, Callable<Void> lsnr) throws Exception { - this.impl = impl; - this.log = impl.log(); - this.futPath = futPath; - this.lsnr = lsnr; - - ZkClusterNodes top = impl.nodes(); - - remainingNodes = U.newHashSet(top.nodesByOrder.size()); - - for (ZookeeperClusterNode node : top.nodesByInternalId.values()) - remainingNodes.add(node.order()); - - NodeResultsWatcher watcher = new NodeResultsWatcher(rtState, impl); - - if (remainingNodes.isEmpty()) - completeAndNotifyListener(); - else - rtState.zkClient.getChildrenAsync(futPath, watcher, watcher); - } - - /** - * @throws Exception If listener call failed. - */ - private void completeAndNotifyListener() throws Exception { - if (super.onDone()) - lsnr.call(); - } - - /** - * @param futPath - * @param client - * @param nodeOrder - * @param data - * @throws Exception If failed. - */ - static void saveNodeResult(String futPath, ZookeeperClient client, long nodeOrder, byte[] data) throws Exception { - client.createIfNeeded(futPath + "/" + nodeOrder, data, CreateMode.PERSISTENT); - } - - /** - * @param node Failed node. - */ - void onNodeFail(ZookeeperClusterNode node) throws Exception { - long nodeOrder = node.order(); - - if (remainingNodes.remove(nodeOrder)) { - int remaining = remainingNodes.size(); - - if (log.isInfoEnabled()) { - log.info("ZkCollectDistributedFuture removed remaining failed node [node=" + nodeOrder + - ", remaining=" + remaining + - ", futPath=" + futPath + ']'); - } - - if (remaining == 0) - completeAndNotifyListener(); - } - } - - /** - * - */ - class NodeResultsWatcher extends ZkAbstractWatcher implements AsyncCallback.Children2Callback { - /** - * @param rtState Runtime state. - * @param impl Discovery impl. - */ - NodeResultsWatcher(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) { - super(rtState, impl); - } - - /** {@inheritDoc} */ - @Override protected void process0(WatchedEvent evt) { - if (evt.getType() == Watcher.Event.EventType.NodeChildrenChanged) - impl.zkClient().getChildrenAsync(evt.getPath(), this, this); - } - - /** {@inheritDoc} */ - @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { - if (!onProcessStart()) - return; - - try { - assert rc == 0 : KeeperException.Code.get(rc); - - if (isDone()) - return; - - for (int i = 0; i < children.size(); i++) { - Long nodeOrder = Long.parseLong(children.get(i)); - - if (remainingNodes.remove(nodeOrder)) { - int remaining = remainingNodes.size(); - - if (log.isInfoEnabled()) { - log.info("ZkCollectDistributedFuture added new result [node=" + nodeOrder + - ", remaining=" + remaining + - ", futPath=" + path + ']'); - } - - if (remaining == 0) - completeAndNotifyListener(); - } - } - - onProcessEnd(); - } - catch (Throwable e) { - onProcessError(e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/7e8f85ff/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java index d87f500..15744a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java @@ -18,11 +18,10 @@ package org.apache.ignite.spi.discovery.zk.internal; import java.util.Collection; -import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.Callable; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; @@ -31,6 +30,7 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.spi.IgniteSpiTimeoutObject; +import org.jboss.netty.util.internal.ConcurrentHashMap; import org.jetbrains.annotations.Nullable; /** @@ -41,7 +41,10 @@ class ZkCommunicationErrorProcessFuture extends GridFutureAdapter<Void> implemen private final ZookeeperDiscoveryImpl impl; /** */ - private final Map<Long, GridFutureAdapter<Boolean>> nodeFuts = new HashMap<>(); + private final IgniteLogger log; + + /** */ + private final Map<Long, GridFutureAdapter<Boolean>> nodeFuts = new ConcurrentHashMap<>(); /** */ private final long endTime; @@ -59,7 +62,10 @@ class ZkCommunicationErrorProcessFuture extends GridFutureAdapter<Void> implemen private Set<Long> resFailedNodes; /** */ - private ZkCollectDistributedFuture nodeResFut; + private Exception resErr; + + /** */ + private ZkDistributedCollectDataFuture collectResFut; /** * @param impl Discovery impl. @@ -87,6 +93,7 @@ class ZkCommunicationErrorProcessFuture extends GridFutureAdapter<Void> implemen assert state != State.DONE; this.impl = impl; + this.log = impl.log(); if (state == State.WAIT_TIMEOUT) { assert timeout > 0 : timeout; @@ -102,15 +109,44 @@ class ZkCommunicationErrorProcessFuture extends GridFutureAdapter<Void> implemen this.state = state; } - void nodeResultCollectFuture(ZkCollectDistributedFuture nodeResFut) { - assert nodeResFut == null : nodeResFut; + /** {@inheritDoc} */ + @Nullable @Override public IgniteLogger logger() { + return log; + } - this.nodeResFut = nodeResFut; + /** + * @param collectResFut Collect nodes' communication status future. + */ + void nodeResultCollectFuture(ZkDistributedCollectDataFuture collectResFut) { + assert this.collectResFut == null : collectResFut; + + this.collectResFut = collectResFut; } + /** + * @param top Topology. + * @throws Exception If failed. + */ + void onTopologyChange(ZkClusterNodes top) throws Exception { + for (Map.Entry<Long, GridFutureAdapter<Boolean>> e : nodeFuts.entrySet()) { + if (!top.nodesByOrder.containsKey(e.getKey())) + e.getValue().onDone(false); + } + + if (collectResFut != null) + collectResFut.onTopologyChange(top); + } + + /** + * @param locNodeOrder Local node order. + * @param rtState Runtime state. + * @param futPath Future path. + * @param nodes Nodes to ping. + * @throws Exception If failed. + */ void pingNodesAndNotifyFuture(long locNodeOrder, ZkRuntimeState rtState, String futPath, Collection<ClusterNode> nodes) throws Exception { - ZkCollectDistributedFuture.saveNodeResult(futPath, rtState.zkClient, locNodeOrder, null); + ZkDistributedCollectDataFuture.saveNodeResult(futPath, rtState.zkClient, locNodeOrder, null); } /** @@ -146,6 +182,37 @@ class ZkCommunicationErrorProcessFuture extends GridFutureAdapter<Void> implemen } /** + * @param failedNodes Node failed as result of resolve process. + */ + void onFinishResolve(Set<Long> failedNodes) { + Map<Long, GridFutureAdapter<Boolean>> futs; + + synchronized (this) { + if (state == State.DONE) { + assert resErr != null; + + return; + } + + assert state == State.RESOLVE_STARTED : state; + + state = State.DONE; + + resFailedNodes = failedNodes; + + futs = nodeFuts; // nodeFuts should not be modified after state changed to DONE. + } + + for (Map.Entry<Long, GridFutureAdapter<Boolean>> e : futs.entrySet()) { + Boolean res = !F.contains(resFailedNodes, e.getKey()); + + e.getValue().onDone(res); + } + + onDone(); + } + + /** * @param node Node. * @return Future finished when communication error resolve is done or {@code null} if another * resolve process should be started. @@ -176,27 +243,19 @@ class ZkCommunicationErrorProcessFuture extends GridFutureAdapter<Void> implemen return fut; } - /** - * @param node Failed node. - */ - void onNodeFailed(ClusterNode node) { - GridFutureAdapter<Boolean> fut = null; - - synchronized (this) { - if (state == State.WAIT_TIMEOUT) - fut = nodeFuts.get(node.order()); - } - - if (fut != null) - fut.onDone(false); - } - /** {@inheritDoc} */ @Override public void run() { // Run from zk discovery worker pool after timeout. - if (processTimeout()) { + if (needProcessTimeout()) { try { - impl.sendCustomMessage(new ZkCommunicationErrorResolveStartMessage(UUID.randomUUID())); + UUID reqId = UUID.randomUUID(); + + if (log.isInfoEnabled()) { + log.info("Initiate cluster-wide communication error resolve process [reqId=" + reqId + + ", errNodes=" + nodeFuts.size() + ']'); + } + + impl.sendCustomMessage(new ZkCommunicationErrorResolveStartMessage(reqId)); } catch (Exception e) { Collection<GridFutureAdapter<Boolean>> futs; @@ -206,6 +265,7 @@ class ZkCommunicationErrorProcessFuture extends GridFutureAdapter<Void> implemen return; state = State.DONE; + resErr = e; futs = nodeFuts.values(); // nodeFuts should not be modified after state changed to DONE. } @@ -221,7 +281,7 @@ class ZkCommunicationErrorProcessFuture extends GridFutureAdapter<Void> implemen /** * @return {@code True} if need initiate resolve process after timeout expired. */ - private boolean processTimeout() { + private boolean needProcessTimeout() { synchronized (this) { if (state != State.WAIT_TIMEOUT) return false; @@ -251,7 +311,7 @@ class ZkCommunicationErrorProcessFuture extends GridFutureAdapter<Void> implemen /** {@inheritDoc} */ @Override public void onTimeout() { - if (processTimeout()) + if (needProcessTimeout()) impl.runInWorkerThread(this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e8f85ff/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java index 144a5bf..20aeddf 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java @@ -18,21 +18,42 @@ package org.apache.ignite.spi.discovery.zk.internal; import java.util.UUID; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; +import org.jetbrains.annotations.Nullable; /** * */ -class ZkCommunicationErrorResolveFinishMessage implements ZkInternalMessage { +class ZkCommunicationErrorResolveFinishMessage implements DiscoverySpiCustomMessage, ZkInternalMessage { /** */ private static final long serialVersionUID = 0L; /** */ final UUID futId; + /** */ + transient ZkCommunicationErrorResolveResult res; + /** * @param futId Future ID. */ ZkCommunicationErrorResolveFinishMessage(UUID futId) { this.futId = futId; } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoverySpiCustomMessage ackMessage() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isMutable() { + return false; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ZkCommunicationErrorResolveFinishMessage.class, this); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e8f85ff/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java new file mode 100644 index 0000000..745496b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.io.Serializable; +import org.apache.ignite.internal.util.GridLongList; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +class ZkCommunicationErrorResolveResult implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + final GridLongList failedNodes; + + ZkCommunicationErrorResolveResult(@Nullable GridLongList failedNodes) { + this.failedNodes = failedNodes; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7e8f85ff/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java index e619d7b..e85277b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java @@ -18,6 +18,7 @@ package org.apache.ignite.spi.discovery.zk.internal; import java.util.UUID; +import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.jetbrains.annotations.Nullable; @@ -47,4 +48,9 @@ public class ZkCommunicationErrorResolveStartMessage implements DiscoverySpiCust @Override public boolean isMutable() { return false; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ZkCommunicationErrorResolveStartMessage.class, this); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e8f85ff/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java index 0d2288c..6375bc7 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java @@ -38,22 +38,33 @@ class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData { /** */ final String evtPath; - /** */ - transient DiscoverySpiCustomMessage msg; + /** Message instance (can be marshalled as part of ZkDiscoveryCustomEventData or stored in separate znode. */ + DiscoverySpiCustomMessage msg; + + /** Unmarshalled message. */ + transient DiscoverySpiCustomMessage resolvedMsg; /** * @param evtId Event ID. * @param topVer Topology version. * @param sndNodeId Sender node ID. + * @param msg Message instance. * @param evtPath Event path. * @param ack Acknowledge event flag. */ - ZkDiscoveryCustomEventData(long evtId, long topVer, UUID sndNodeId, String evtPath, boolean ack) { + ZkDiscoveryCustomEventData(long evtId, + long topVer, + UUID sndNodeId, + DiscoverySpiCustomMessage msg, + String evtPath, + boolean ack) + { super(evtId, DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT, topVer); assert sndNodeId != null; - assert ack || !F.isEmpty(evtPath); + assert msg != null || ack || !F.isEmpty(evtPath); + this.msg = msg; this.sndNodeId = sndNodeId; this.evtPath = evtPath; http://git-wip-us.apache.org/repos/asf/ignite/blob/7e8f85ff/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java new file mode 100644 index 0000000..d33001b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Callable; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.data.Stat; + +/** + * + */ +class ZkDistributedCollectDataFuture extends GridFutureAdapter<Void> { + /** */ + private final IgniteLogger log; + + /** */ + private final String futPath; + + /** */ + private final Set<Long> remainingNodes; + + /** */ + private final Callable<Void> lsnr; + + /** + * @param impl Disovery impl + * @param rtState Runtime state. + * @param futPath Future path. + * @param lsnr Future listener. + * @throws Exception If listener call failed. + */ + ZkDistributedCollectDataFuture( + ZookeeperDiscoveryImpl impl, + ZkRuntimeState rtState, + String futPath, + Callable<Void> lsnr) + throws Exception + { + this.log = impl.log(); + this.futPath = futPath; + this.lsnr = lsnr; + + ZkClusterNodes top = rtState.top; + + remainingNodes = U.newHashSet(top.nodesByOrder.size()); + + for (ZookeeperClusterNode node : top.nodesByInternalId.values()) + remainingNodes.add(node.order()); + + NodeResultsWatcher watcher = new NodeResultsWatcher(rtState, impl); + + if (remainingNodes.isEmpty()) + completeAndNotifyListener(); + else { + if (log.isInfoEnabled()) + log.info("Initialize data collect future [futPath=" + futPath + ", nodes=" + remainingNodes.size() + ']'); + + rtState.zkClient.getChildrenAsync(futPath, watcher, watcher); + } + } + + /** + * @throws Exception If listener call failed. + */ + private void completeAndNotifyListener() throws Exception { + if (super.onDone()) + lsnr.call(); + } + + /** + * @param futPath + * @param client + * @param nodeOrder + * @param data + * @throws Exception If failed. + */ + static void saveNodeResult(String futPath, ZookeeperClient client, long nodeOrder, byte[] data) throws Exception { + client.createIfNeeded(futPath + "/" + nodeOrder, data, CreateMode.PERSISTENT); + } + + /** + * @param futResPath Result path. + * @param client Client. + * @param data Result data. + * @throws Exception If failed. + */ + static void saveResult(String futResPath, ZookeeperClient client, byte[] data) throws Exception { + client.createIfNeeded(futResPath, data, CreateMode.PERSISTENT); + } + + static byte[] readResult(ZookeeperClient client, ZkIgnitePaths paths, UUID futId) throws Exception { + return client.getData(paths.distributedFutureResultPath(futId)); + } + + /** + * @param client Client. + * @param paths Paths utils. + * @param futId Future ID. + * @throws Exception If failed. + */ + static void deleteFutureData(ZookeeperClient client, ZkIgnitePaths paths, UUID futId) throws Exception { + // TODO ZK: use multi, better batching + max-size safe + NoNodeException safe. + String evtDir = paths.distributedFutureBasePath(futId); + + client.deleteAll(evtDir, + client.getChildren(evtDir), + -1); + + client.deleteIfExists(evtDir, -1); + + client.deleteIfExists(paths.distributedFutureResultPath(futId), -1); + } + + /** + * @param top Current topology. + * @throws Exception If listener call failed. + */ + void onTopologyChange(ZkClusterNodes top) throws Exception { + if (remainingNodes.isEmpty()) + return; + + for (Iterator<Long> it = remainingNodes.iterator(); it.hasNext();) { + Long nodeOrder = it.next(); + + if (!top.nodesByOrder.containsKey(nodeOrder)) { + it.remove(); + + int remaining = remainingNodes.size(); + + if (log.isInfoEnabled()) { + log.info("ZkDistributedCollectDataFuture removed remaining failed node [node=" + nodeOrder + + ", remaining=" + remaining + + ", futPath=" + futPath + ']'); + } + + if (remaining == 0) { + completeAndNotifyListener(); + + break; + } + } + } + } + + /** + * + */ + class NodeResultsWatcher extends ZkAbstractWatcher implements AsyncCallback.Children2Callback { + /** + * @param rtState Runtime state. + * @param impl Discovery impl. + */ + NodeResultsWatcher(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) { + super(rtState, impl); + } + + /** {@inheritDoc} */ + @Override protected void process0(WatchedEvent evt) { + if (evt.getType() == Watcher.Event.EventType.NodeChildrenChanged) + rtState.zkClient.getChildrenAsync(evt.getPath(), this, this); + } + + /** {@inheritDoc} */ + @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { + if (!onProcessStart()) + return; + + try { + if (!isDone()) { + assert rc == 0 : KeeperException.Code.get(rc); + + for (int i = 0; i < children.size(); i++) { + Long nodeOrder = Long.parseLong(children.get(i)); + + if (remainingNodes.remove(nodeOrder)) { + int remaining = remainingNodes.size(); + + if (log.isInfoEnabled()) { + log.info("ZkDistributedCollectDataFuture added new result [node=" + nodeOrder + + ", remaining=" + remaining + + ", futPath=" + path + ']'); + } + + if (remaining == 0) + completeAndNotifyListener(); + } + } + } + + onProcessEnd(); + } + catch (Throwable e) { + onProcessError(e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7e8f85ff/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java index 06c5d9e..2a1d804 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java @@ -26,22 +26,22 @@ class ZkIgnitePaths { /** */ private static final int UUID_LEN = 36; - /** */ + /** Directory to store joined node data. */ private static final String JOIN_DATA_DIR = "jd"; - /** */ + /** Directory to store new custom events. */ private static final String CUSTOM_EVTS_DIR = "ce"; - /** */ + /** Directory to store parts of multi-parts custom events. */ private static final String CUSTOM_EVTS_PARTS_DIR = "cp"; - /** */ + /** Directory to store acknowledge messages for custom events. */ private static final String CUSTOM_EVTS_ACKS_DIR = "ca"; - /** */ + /** Directory to store EPHEMERAL znodes for alive cluster nodes. */ static final String ALIVE_NODES_DIR = "n"; - /** */ + /** Path to store discovery events {@link ZkDiscoveryEventsData}. */ private static final String DISCO_EVENTS_PATH = "e"; /** */ @@ -300,4 +300,12 @@ class ZkIgnitePaths { String distributedFutureBasePath(UUID id) { return evtsPath + "/f-" + id; } + + /** + * @param id Future ID. + * @return Future path. + */ + String distributedFutureResultPath(UUID id) { + return evtsPath + "/fr-" + id; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e8f85ff/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java index a83886a..5923b39 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java @@ -321,6 +321,11 @@ public class ZookeeperClient implements Watcher { return parts; } + /** + * TODO ZK: it seems not always precise, e.g. if ACL is used? + * @param path Request path. + * @return Marshalled request overhead. + */ private int requestOverhead(String path) { return path.length(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e8f85ff/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index a153d11..82d9c4b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -54,7 +54,6 @@ import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.MarshallerUtils; @@ -72,7 +71,6 @@ import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; import org.apache.zookeeper.data.Stat; import org.jboss.netty.util.internal.ConcurrentHashMap; import org.jetbrains.annotations.Nullable; @@ -151,7 +149,10 @@ public class ZookeeperDiscoveryImpl { public volatile IgniteDiscoverySpiInternalListener internalLsnr; /** */ - private final ConcurrentHashMap<UUID, PingFuture> pingFuts = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<Long, PingFuture> pingFuts = new ConcurrentHashMap<>(); + + /** */ + private final AtomicReference<ZkCommunicationErrorProcessFuture> commErrProcFut = new AtomicReference<>(); /** * @param spi Discovery SPI. @@ -230,9 +231,9 @@ public class ZookeeperDiscoveryImpl { return rtState.top.nodesByOrder.get(nodeOrder); } - /** */ - private final AtomicReference<ZkCommunicationErrorProcessFuture> commErrProcFut = new AtomicReference<>(); - + /** + * @param fut Future to remove. + */ void clearCommunicationErrorProcessFuture(ZkCommunicationErrorProcessFuture fut) { assert fut.isDone() : fut; @@ -262,6 +263,11 @@ public class ZookeeperDiscoveryImpl { if (commErrProcFut.compareAndSet(fut, newFut)) { fut = newFut; + if (log.isInfoEnabled()) { + log.info("Created new communication error process future [errNode=" + node0.id() + + ", err= " + err + ']'); + } + fut.scheduleCheckOnTimeout(); } else @@ -295,7 +301,9 @@ public class ZookeeperDiscoveryImpl { * @return Ping result. */ public boolean pingNode(UUID nodeId) { - ZookeeperClusterNode node = node(nodeId); + ZkRuntimeState rtState = this.rtState; + + ZookeeperClusterNode node = rtState.top.nodesById.get(nodeId); if (node == null) return false; @@ -303,12 +311,12 @@ public class ZookeeperDiscoveryImpl { if (node.isLocal()) return true; - PingFuture fut = pingFuts.get(nodeId); + PingFuture fut = pingFuts.get(node.order()); if (fut == null) { - fut = new PingFuture(node); + fut = new PingFuture(rtState, node); - PingFuture old = pingFuts.putIfAbsent(nodeId, fut); + PingFuture old = pingFuts.putIfAbsent(node.order(), fut); if (old == null) { if (fut.checkNodeAndState()) @@ -419,7 +427,7 @@ public class ZookeeperDiscoveryImpl { connState = ConnectionState.STOPPED; } - zkClient().zk().sync(zkPaths.clusterDir, new SegmentedWatcher(), null); + rtState.zkClient.zk().sync(zkPaths.clusterDir, new SegmentedWatcher(), null); } else joinFut.onDone(e); @@ -530,33 +538,7 @@ public class ZookeeperDiscoveryImpl { try { ZookeeperClient zkClient = rtState.zkClient; - String prefix = UUID.randomUUID().toString(); - - int partCnt = 1; - - int overhead = 10; - - UUID locId = locNode.id(); - - String path = zkPaths.createCustomEventPath(prefix, locId, partCnt); - - if (zkClient.needSplitNodeData(path, msgBytes, overhead)) { - List<byte[]> parts = zkClient.splitNodeData(path, msgBytes, overhead); - - String partsBasePath = zkPaths.customEventPartsBasePath(prefix, locId); - - saveMultipleParts(zkClient, partsBasePath, parts); - - msgBytes = null; - - partCnt = parts.size(); - } - - zkClient.createSequential(prefix, - zkPaths.customEvtsDir, - zkPaths.createCustomEventPath(prefix, locId, partCnt), - msgBytes, - CreateMode.PERSISTENT_SEQUENTIAL); + saveCustomMessage(zkClient, msgBytes); } catch (ZookeeperClientFailedException e) { if (clientReconnectEnabled) @@ -575,6 +557,44 @@ public class ZookeeperDiscoveryImpl { } /** + * @param zkClient Client. + * @param msgBytes Marshalled message. + * @throws ZookeeperClientFailedException If connection to zk was lost. + * @throws InterruptedException If interrupted. + */ + private void saveCustomMessage(ZookeeperClient zkClient, byte[] msgBytes) + throws ZookeeperClientFailedException, InterruptedException + { + String prefix = UUID.randomUUID().toString(); + + int partCnt = 1; + + int overhead = 10; + + UUID locId = locNode.id(); + + String path = zkPaths.createCustomEventPath(prefix, locId, partCnt); + + if (zkClient.needSplitNodeData(path, msgBytes, overhead)) { + List<byte[]> parts = zkClient.splitNodeData(path, msgBytes, overhead); + + String partsBasePath = zkPaths.customEventPartsBasePath(prefix, locId); + + saveMultipleParts(zkClient, partsBasePath, parts); + + msgBytes = null; + + partCnt = parts.size(); + } + + zkClient.createSequential(prefix, + zkPaths.customEvtsDir, + zkPaths.createCustomEventPath(prefix, locId, partCnt), + msgBytes, + CreateMode.PERSISTENT_SEQUENTIAL); + } + + /** * @return Cluster start time. */ public long gridStartTime() { @@ -859,7 +879,7 @@ public class ZookeeperDiscoveryImpl { /** * */ - private class CheckJoinStateTimeoutObject implements IgniteSpiTimeoutObject, Watcher, AsyncCallback.DataCallback { + private class CheckJoinStateTimeoutObject extends ZkAbstractWatcher implements IgniteSpiTimeoutObject, AsyncCallback.DataCallback { /** */ private final IgniteUuid id = IgniteUuid.randomUuid(); @@ -869,16 +889,14 @@ public class ZookeeperDiscoveryImpl { /** */ private final String joinDataPath; - /** */ - private final ZkRuntimeState rtState; - /** * @param joinDataPath Node joined data path. * @param rtState State. */ CheckJoinStateTimeoutObject(String joinDataPath, ZkRuntimeState rtState) { + super(rtState, ZookeeperDiscoveryImpl.this); + this.joinDataPath = joinDataPath; - this.rtState = rtState; } /** {@inheritDoc} */ @@ -909,7 +927,7 @@ public class ZookeeperDiscoveryImpl { if (rc != 0) return; - if (!busyLock.enterBusy()) + if (!onProcessStart()) return; try { @@ -921,27 +939,17 @@ public class ZookeeperDiscoveryImpl { onSegmented(new IgniteSpiException(joinErr.err)); } - busyLock.leaveBusy(); + onProcessEnd(); } catch (Throwable e) { - onFatalError(busyLock, e); + onProcessError(e); } } /** {@inheritDoc} */ - @Override public void process(WatchedEvent evt) { - if (!busyLock.enterBusy()) - return; - - try { - if (evt.getType() == Event.EventType.NodeDataChanged) - rtState.zkClient.getDataAsync(evt.getPath(), this, this); - - busyLock.leaveBusy(); - } - catch (Throwable e) { - onFatalError(busyLock, e); - } + @Override public void process0(WatchedEvent evt) { + if (evt.getType() == Event.EventType.NodeDataChanged) + rtState.zkClient.getDataAsync(evt.getPath(), this, this); } } @@ -1243,18 +1251,20 @@ public class ZookeeperDiscoveryImpl { return true; } else { + ZookeeperClient client = rtState.zkClient; + if (joinErr.notifyNode) { String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, prefixId); - zkClient().setData(joinDataPath, marshalZip(joinErr), -1); + client.setData(joinDataPath, marshalZip(joinErr), -1); - zkClient().deleteIfExists(zkPaths.aliveNodesDir + "/" + aliveNodePath, -1); + client.deleteIfExists(zkPaths.aliveNodesDir + "/" + aliveNodePath, -1); } else { if (log.isInfoEnabled()) log.info("Ignore join data, node was failed by previous coordinator: " + aliveNodePath); - zkClient().deleteIfExists(zkPaths.aliveNodesDir + "/" + aliveNodePath, -1); + client.deleteIfExists(zkPaths.aliveNodesDir + "/" + aliveNodePath, -1); } return false; @@ -1483,8 +1493,10 @@ public class ZookeeperDiscoveryImpl { private void cleanupPreviousClusterData() throws Exception { long start = System.currentTimeMillis(); - // TODO ZK: use multi, better batching. - rtState.zkClient.setData(zkPaths.evtsPath, null, -1); + ZookeeperClient client = rtState.zkClient; + + // TODO ZK: use multi, better batching + max-size safe + NoNodeException safe. + client.setData(zkPaths.evtsPath, null, -1); List<String> evtChildren = rtState.zkClient.getChildren(zkPaths.evtsPath); @@ -1494,10 +1506,10 @@ public class ZookeeperDiscoveryImpl { removeChildren(evtDir); } - rtState.zkClient.deleteAll(zkPaths.evtsPath, evtChildren, -1); + client.deleteAll(zkPaths.evtsPath, evtChildren, -1); - rtState.zkClient.deleteAll(zkPaths.customEvtsDir, - rtState.zkClient.getChildren(zkPaths.customEvtsDir), + client.deleteAll(zkPaths.customEvtsDir, + client.getChildren(zkPaths.customEvtsDir), -1); rtState.zkClient.deleteAll(zkPaths.customEvtsPartsDir, @@ -1525,20 +1537,6 @@ public class ZookeeperDiscoveryImpl { } /** - * @return Nodes. - */ - ZkClusterNodes nodes() { - return rtState.top; - } - - /** - * @return Client. - */ - ZookeeperClient zkClient() { - return rtState.zkClient; - } - - /** * @param zkClient Client. * @param evtPath Event path. * @param sndNodeId Sender node ID. @@ -1654,7 +1652,8 @@ public class ZookeeperDiscoveryImpl { } else { if (log.isInfoEnabled()) { - log.info("Start communication error resolve [sndNode=" + sndNode + + log.info("Start cluster-wide communication error resolve [sndNode=" + sndNode + + ", reqId=" + msg0.id + ", topVer=" + evtsData.topVer + ']'); } @@ -1672,10 +1671,11 @@ public class ZookeeperDiscoveryImpl { evtsData.evtIdGen, evtsData.topVer, sndNodeId, + null, evtPath, false); - evtData.msg = msg; + evtData.resolvedMsg = msg; evtsData.addEvent(rtState.top.nodesByOrder.values(), evtData); @@ -1746,7 +1746,7 @@ public class ZookeeperDiscoveryImpl { (ZkDiscoveryCustomEventData)newEvts.evts.get(evtData.eventId()); if (evtData0 != null) - evtData0.msg = ((ZkDiscoveryCustomEventData)evtData).msg; + evtData0.resolvedMsg = ((ZkDiscoveryCustomEventData)evtData).resolvedMsg; } } } @@ -1831,25 +1831,31 @@ public class ZookeeperDiscoveryImpl { DiscoverySpiCustomMessage msg; if (rtState.crd) { - assert evtData0.msg != null : evtData0; + assert evtData0.resolvedMsg != null : evtData0; - msg = evtData0.msg; + msg = evtData0.resolvedMsg; } else { - if (evtData0.ackEvent()) { - String path = zkPaths.ackEventDataPath(evtData0.eventId()); + if (evtData0.msg == null) { + if (evtData0.ackEvent()) { + String path = zkPaths.ackEventDataPath(evtData0.eventId()); - msg = unmarshalZip(zkClient.getData(path)); - } - else { - byte[] msgBytes = readCustomEventData(zkClient, - evtData0.evtPath, - evtData0.sndNodeId); + msg = unmarshalZip(zkClient.getData(path)); + } + else { + assert evtData0.evtPath != null : evtData0; - msg = unmarshalZip(msgBytes); + byte[] msgBytes = readCustomEventData(zkClient, + evtData0.evtPath, + evtData0.sndNodeId); + + msg = unmarshalZip(msgBytes); + } } + else + msg = evtData0.msg; - evtData0.msg = msg; + evtData0.resolvedMsg = msg; } if (msg instanceof ZkInternalMessage) @@ -1889,6 +1895,11 @@ public class ZookeeperDiscoveryImpl { zkClient.setData(rtState.locNodeZkPath, marshalZip(rtState.locNodeInfo), -1); } + + ZkCommunicationErrorProcessFuture commErrFut = commErrProcFut.get(); + + if (commErrFut != null) + commErrFut.onTopologyChange(rtState.top); // This can add new event, notify out of event process loop. } /** @@ -1973,12 +1984,13 @@ public class ZookeeperDiscoveryImpl { if (msg instanceof ZkForceNodeFailMessage) processForceNodeFailMessage((ZkForceNodeFailMessage)msg, evtData); else if (msg instanceof ZkCommunicationErrorResolveStartMessage) { - processStartResolveCommunicationErrorMessage( + processCommunicationErrorResolveStartMessage( (ZkCommunicationErrorResolveStartMessage)msg, evtData); } else if (msg instanceof ZkCommunicationErrorResolveFinishMessage) { - ZkCommunicationErrorResolveFinishMessage msg0 = (ZkCommunicationErrorResolveFinishMessage)msg; + processCommunicationErrorResolveFinishMessage( + (ZkCommunicationErrorResolveFinishMessage)msg); } } @@ -2012,9 +2024,48 @@ public class ZookeeperDiscoveryImpl { /** * @param msg Message. + * @throws Exception If failed. + */ + private void processCommunicationErrorResolveFinishMessage(ZkCommunicationErrorResolveFinishMessage msg) + throws Exception + { + UUID futId = msg.futId; + + assert futId != null; + assert futId.equals(rtState.evtsData.communicationErrorResolveFutureId()); + + if (log.isInfoEnabled()) + log.info("Received communication error resolve finish message [reqId=" + futId + ']'); + + rtState.evtsData.communicationErrorResolveFutureId(null); + + ZkCommunicationErrorResolveResult res = msg.res; + + if (res == null) + res = unmarshalZip(ZkDistributedCollectDataFuture.readResult(rtState.zkClient, zkPaths, futId)); + + ZkCommunicationErrorProcessFuture fut = commErrProcFut.get(); + + assert fut != null; + + Set<Long> failedNodes = null; + + if (res.failedNodes != null) { + failedNodes = U.newHashSet(res.failedNodes.size()); + + for (int i = 0; i < res.failedNodes.size(); i++) + failedNodes.add(res.failedNodes.get(i)); + } + + fut.onFinishResolve(failedNodes); + } + + /** + * @param msg Message. * @param evtData Event data. + * @throws Exception If failed. */ - private void processStartResolveCommunicationErrorMessage(ZkCommunicationErrorResolveStartMessage msg, + private void processCommunicationErrorResolveStartMessage(ZkCommunicationErrorResolveStartMessage msg, ZkDiscoveryCustomEventData evtData) throws Exception { ZkCommunicationErrorProcessFuture fut; @@ -2043,6 +2094,11 @@ public class ZookeeperDiscoveryImpl { } } + if (log.isInfoEnabled()) { + log.info("Received communication error resolve request [reqId=" + msg.id + + ", topVer=" + rtState.top.topologySnapshot() + ']'); + } + assert !fut.isDone() : fut; final String futPath = zkPaths.distributedFutureBasePath(msg.id); @@ -2050,7 +2106,7 @@ public class ZookeeperDiscoveryImpl { final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot(); if (rtState.crd) { - ZkCollectDistributedFuture nodeResFut = new ZkCollectDistributedFuture(this, rtState, futPath, + ZkDistributedCollectDataFuture nodeResFut = new ZkDistributedCollectDataFuture(this, rtState, futPath, new Callable<Void>() { @Override public Void call() throws Exception { // Future is completed from ZK event thread. @@ -2075,25 +2131,34 @@ public class ZookeeperDiscoveryImpl { * @param rtState Runtime state. * @throws Exception If failed. */ - void finishCommunicationResolveProcess(ZkRuntimeState rtState) throws Exception { + private void finishCommunicationResolveProcess(ZkRuntimeState rtState) throws Exception { ZkDiscoveryEventsData evtsData = rtState.evtsData; UUID futId = rtState.evtsData.communicationErrorResolveFutureId(); assert futId != null; - rtState.evtsData.communicationErrorResolveFutureId(null); - ZkCommunicationErrorResolveFinishMessage msg = new ZkCommunicationErrorResolveFinishMessage(futId); + ZkCommunicationErrorResolveResult res = new ZkCommunicationErrorResolveResult(null); + + msg.res = res; + + ZkDistributedCollectDataFuture.saveResult(zkPaths.distributedFutureResultPath(futId), + rtState.zkClient, + marshalZip(res)); + + evtsData.evtIdGen++; + ZkDiscoveryCustomEventData evtData = new ZkDiscoveryCustomEventData( evtsData.evtIdGen, evtsData.topVer, locNode.id(), + msg, null, false); - evtData.msg = msg; + evtData.resolvedMsg = msg; evtsData.addEvent(rtState.top.nodesByOrder.values(), evtData); @@ -2104,11 +2169,15 @@ public class ZookeeperDiscoveryImpl { * */ public void simulateNodeFailure() { - zkClient().deleteIfExistsAsync(zkPaths.aliveNodesDir); + ZkRuntimeState rtState = this.rtState; + + ZookeeperClient client = rtState.zkClient; + + client.deleteIfExistsAsync(zkPaths.aliveNodesDir); rtState.onCloseStart(); - zkClient().close(); + rtState.zkClient.close(); } /** @@ -2211,16 +2280,11 @@ public class ZookeeperDiscoveryImpl { throw new ZookeeperClientFailedException("Received node failed event for local node."); } else { - PingFuture pingFut = pingFuts.get(failedNode.id()); + PingFuture pingFut = pingFuts.get(failedNode.order()); if (pingFut != null) pingFut.onDone(false); - ZkCommunicationErrorProcessFuture commErrFut = commErrProcFut.get(); - - if (commErrFut != null) - commErrFut.onNodeFailed(failedNode); - final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot(); lsnr.onDiscovery(EVT_NODE_FAILED, @@ -2291,9 +2355,10 @@ public class ZookeeperDiscoveryImpl { evtData.topologyVersion(), // Use topology version from original event. locNode.id(), null, + null, true); - ackEvtData.msg = ack; + ackEvtData.resolvedMsg = ack; if (newEvts == null) newEvts = new ArrayList<>(); @@ -2404,18 +2469,28 @@ public class ZookeeperDiscoveryImpl { * @param ctx Context for log. * @param evtData Event data. * @return Ack message. + * @throws Exception If failed. */ - @Nullable private DiscoverySpiCustomMessage handleProcessedCustomEvent(String ctx, ZkDiscoveryCustomEventData evtData) { + @Nullable private DiscoverySpiCustomMessage handleProcessedCustomEvent(String ctx, ZkDiscoveryCustomEventData evtData) + throws Exception { if (log.isDebugEnabled()) log.debug("All nodes processed custom event [ctx=" + ctx + ", evtData=" + evtData + ']'); if (!evtData.ackEvent()) { - deleteCustomEventDataAsync(rtState.zkClient, evtData.evtPath); + if (evtData.evtPath != null) + deleteCustomEventDataAsync(rtState.zkClient, evtData.evtPath); + else { + if (evtData.resolvedMsg instanceof ZkCommunicationErrorResolveFinishMessage) { + UUID futId = ((ZkCommunicationErrorResolveFinishMessage)evtData.resolvedMsg).futId; - assert evtData.msg != null || locNode.order() > evtData.topologyVersion() : evtData; + ZkDistributedCollectDataFuture.deleteFutureData(rtState.zkClient, zkPaths, futId); + } + } - if (evtData.msg != null) - return evtData.msg.ackMessage(); + assert evtData.resolvedMsg != null || locNode.order() > evtData.topologyVersion() : evtData; + + if (evtData.resolvedMsg != null) + return evtData.resolvedMsg.ackMessage(); } else { String path = zkPaths.ackEventDataPath(evtData.eventId()); @@ -2829,7 +2904,7 @@ public class ZookeeperDiscoveryImpl { /** * */ - private class PingFuture extends GridFutureAdapter<Boolean> implements IgniteSpiTimeoutObject, Runnable { + private class PingFuture extends GridFutureAdapter<Boolean> implements IgniteSpiTimeoutObject { /** */ private final ZookeeperClusterNode node; @@ -2839,16 +2914,20 @@ public class ZookeeperDiscoveryImpl { /** */ private final IgniteUuid id; + /** */ + private final ZkRuntimeState rtState; + /** * @param node Node. */ - PingFuture(ZookeeperClusterNode node) { + PingFuture(ZkRuntimeState rtState, ZookeeperClusterNode node) { + this.rtState = rtState; this.node = node; id = IgniteUuid.fromUuid(node.id()); endTime = System.currentTimeMillis() + node.sessionTimeout() + 1000; - } + }; /** {@inheritDoc} */ @Override public IgniteUuid id() { @@ -2861,33 +2940,34 @@ public class ZookeeperDiscoveryImpl { } /** {@inheritDoc} */ - @Override public void run() { + @Override public void onTimeout() { if (checkNodeAndState()) { - try { - for (String path : zkClient().getChildren(zkPaths.aliveNodesDir)) { - if (node.internalId() == ZkIgnitePaths.aliveInternalId(path)) { - onDone(true); + runInWorkerThread(new ZkRunnable(rtState, ZookeeperDiscoveryImpl.this) { + @Override protected void run0() throws Exception { + if (checkNodeAndState()) { + try { + for (String path : rtState.zkClient.getChildren(zkPaths.aliveNodesDir)) { + if (node.internalId() == ZkIgnitePaths.aliveInternalId(path)) { + onDone(true); + + return; + } + } + + onDone(false); + } + catch (Exception e) { + onDone(e); - return; + throw e; + } } } - - onDone(false); - } - catch (Exception e) { - if (checkNodeAndState()) - onDone(e); - } + }); } } /** {@inheritDoc} */ - @Override public void onTimeout() { - if (checkNodeAndState()) - runInWorkerThread(this); - } - - /** {@inheritDoc} */ @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) { if (super.onDone(res, err)) { pingFuts.remove(node.id(), this); http://git-wip-us.apache.org/repos/asf/ignite/blob/7e8f85ff/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java index 3f6a8dc..ec70be6 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java @@ -34,6 +34,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.curator.test.InstanceSpec; import org.apache.curator.test.TestingCluster; import org.apache.curator.test.TestingZooKeeperServer; @@ -70,6 +71,8 @@ import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.logger.java.JavaLogger; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.CommunicationProblemContext; +import org.apache.ignite.spi.discovery.CommunicationProblemResolver; import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi; import org.apache.ignite.testframework.GridTestUtils; @@ -135,6 +138,9 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { /** */ private boolean persistence; + /** */ + private CommunicationProblemResolver communicationProblemResolver; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { if (testSockNio) @@ -170,8 +176,6 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { cfg.setCacheConfiguration(ccfg); - // cfg.setMarshaller(new JdkMarshaller()); - cfg.setClientMode(client); if (userAttrs != null) @@ -248,6 +252,10 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } } + /** + * @param instances Number of instances. + * @return Cluster. + */ private static TestingCluster createTestingCluster(int instances) { String tmpDir = System.getProperty("java.io.tmpdir"); @@ -270,6 +278,9 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { return new TestingCluster(specs); } + /** + * @param file Directory to delete. + */ private static void deleteRecursively0(File file) { File[] files = file.listFiles(); @@ -279,8 +290,10 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { for (File f : files) { if (f.isDirectory()) deleteRecursively0(f); - else - f.delete(); + else { + if (!f.delete()) + throw new IgniteException("Failed to delete file: " + f.getAbsolutePath()); + } } } @@ -331,6 +344,8 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { assertFalse("Unexpected error, see log for details", err); checkEventsConsistency(); + + checkInternalStructuresCleanup(); } finally { reset(); @@ -342,6 +357,23 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + private void checkInternalStructuresCleanup() throws Exception { + for (Ignite node : G.allGrids()) { + final AtomicReference<?> res = GridTestUtils.getFieldValue(spi(node), "impl", "commErrProcFut"); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return res.get() == null; + } + }, 5000); + + assertNull(res.get()); + } + } + + /** + * @throws Exception If failed. + */ public void testMetadataUpdate() throws Exception { startGrid(0); @@ -879,16 +911,31 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { waitForTopology(initNodes + startNodes - failCnt); } + /** + * @param node + * @return + */ private static String aliveZkNodePath(Ignite node) { return aliveZkNodePath(node.configuration().getDiscoverySpi()); } + /** + * @param spi + * @return + */ private static String aliveZkNodePath(DiscoverySpi spi) { String path = GridTestUtils.getFieldValue(spi, "impl", "rtState", "locNodeZkPath"); return path.substring(path.lastIndexOf('/') + 1); } + /** + * @param log + * @param connectString + * @param failedZkNodes + * @param timeout + * @throws Exception + */ private static void waitNoAliveZkNodes(final IgniteLogger log, String connectString, final List<String> failedZkNodes, long timeout) @@ -1724,6 +1771,99 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testNoOpCommunicationErrorResolve_1() throws Exception { + communicationErrorResolve_Simple(2); + } + + /** + * @throws Exception If failed. + */ + public void testNoOpCommunicationErrorResolve_2() throws Exception { + communicationErrorResolve_Simple(10); + } + + /** + * @param nodes Nodes number. + * @throws Exception If failed. + */ + private void communicationErrorResolve_Simple(int nodes) throws Exception { + assert nodes > 1; + + sesTimeout = 2000; + communicationProblemResolver = new NoOpCommunicationProblemResolver(); + + startGridsMultiThreaded(nodes); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < 3; i++) { + info("Iteration: " + i); + + int idx1 = rnd.nextInt(nodes); + + int idx2; + + do { + idx2 = rnd.nextInt(nodes); + } + while (idx1 == idx2); + + ZookeeperDiscoverySpi spi = spi(ignite(idx1)); + + spi.onCommunicationConnectionError(ignite(idx2).cluster().localNode(), new Exception("test")); + + checkInternalStructuresCleanup(); + } + } + + /** + * @throws Exception If failed. + */ + public void testNoOpCommunicationErrorResolve_3() throws Exception { + // One node fails before sending communication status. + sesTimeout = 2000; + communicationProblemResolver = new NoOpCommunicationProblemResolver(); + + startGridsMultiThreaded(3); + + sesTimeout = 10_000; + + testSockNio = true; + sesTimeout = 5000; + + startGrid(3); + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() { + ZookeeperDiscoverySpi spi = spi(ignite(0)); + + spi.onCommunicationConnectionError(ignite(1).cluster().localNode(), new Exception("test")); + + return null; + } + }); + + U.sleep(1000); + + ZkTestClientCnxnSocketNIO nio = ZkTestClientCnxnSocketNIO.forNode(ignite(3)); + + nio.closeSocket(true); + + try { + stopGrid(3); + + fut.get(); + } + finally { + nio.allowConnect(); + } + + waitForTopology(3); + } + + /** * @param dfltConsistenId Default consistent ID flag. * @throws Exception If failed. */ @@ -2062,6 +2202,14 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } /** + * @param node Node. + * @return Node's discovery SPI. + */ + private static ZookeeperDiscoverySpi spi(Ignite node) { + return (ZookeeperDiscoverySpi)node.configuration().getDiscoverySpi(); + } + + /** * @param nodeName Node name. * @return Node's discovery SPI. * @throws Exception If failed. @@ -2069,7 +2217,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { private ZookeeperDiscoverySpi waitSpi(final String nodeName) throws Exception { GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { - return spis.contains(nodeName); + return spis.containsKey(nodeName); } }, 5000); @@ -2080,6 +2228,10 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { return spi; } + /** + * @param topVer Topology version. + * @return Expected event instance. + */ private static DiscoveryEvent joinEvent(long topVer) { DiscoveryEvent expEvt = new DiscoveryEvent(null, null, EventType.EVT_NODE_JOINED, null); @@ -2088,6 +2240,10 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { return expEvt; } + /** + * @param topVer Topology version. + * @return Expected event instance. + */ private static DiscoveryEvent failEvent(long topVer) { DiscoveryEvent expEvt = new DiscoveryEvent(null, null, EventType.EVT_NODE_FAILED, null); @@ -2370,4 +2526,14 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { private static class C2 implements Serializable { // No-op. } + + /** + * + */ + static class NoOpCommunicationProblemResolver implements CommunicationProblemResolver { + /** {@inheritDoc} */ + @Override public void resolve(CommunicationProblemContext ctx) { + // No-op. + } + } }
