Repository: ignite Updated Branches: refs/heads/ignite-zk 0cf778363 -> 953f07929
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/953f0792 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/953f0792 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/953f0792 Branch: refs/heads/ignite-zk Commit: 953f079299ed4b662285174058a4c3b9d8320c86 Parents: 0cf7783 Author: sboikov <[email protected]> Authored: Tue Dec 12 16:26:40 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue Dec 12 16:37:38 2017 +0300 ---------------------------------------------------------------------- .../discovery/CommunicationProblemContext.java | 62 ++++ .../discovery/CommunicationProblemResolver.java | 28 ++ .../DefaultCommunicationProblemResolver.java | 172 +++++++++++ .../discovery/zk/internal/ZkRuntimeState.java | 46 +++ .../discovery/zk/internal/ZookeeperClient.java | 9 +- .../zk/internal/ZookeeperDiscoveryImpl.java | 288 +++++++++++-------- .../distributed/IgniteCacheManyClientsTest.java | 44 +-- 7 files changed, 516 insertions(+), 133 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/953f0792/modules/core/src/main/java/org/apache/ignite/spi/discovery/CommunicationProblemContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/CommunicationProblemContext.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/CommunicationProblemContext.java new file mode 100644 index 0000000..71673f1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/CommunicationProblemContext.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery; + +import java.util.List; +import java.util.UUID; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.spi.communication.CommunicationSpi; + +/** + * + */ +public interface CommunicationProblemContext { + /** + * @return Current topology snapshot. + */ + public List<ClusterNode> topologySnapshot(); + + /** + * @param node1 First node. + * @param node2 Second node. + * @return {@code True} if {@link CommunicationSpi} is able to establish connection from first node to second node. + */ + public boolean connectionAvailable(ClusterNode node1, ClusterNode node2); + + /** + * @return List of currently started cache. + */ + public List<String> startedCaches(); + + /** + * @param cacheName Cache name. + * @return Cache partitions affinity assignment. + */ + public List<List<ClusterNode>> cacheAffinity(String cacheName); + + /** + * @param cacheName Cache name. + * @return Cache partitions owners. + */ + public List<List<ClusterNode>> cachePartitionOwners(String cacheName); + + /** + * @param node Node to kill. + */ + public void killNode(ClusterNode node); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/953f0792/modules/core/src/main/java/org/apache/ignite/spi/discovery/CommunicationProblemResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/CommunicationProblemResolver.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/CommunicationProblemResolver.java new file mode 100644 index 0000000..a9b620b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/CommunicationProblemResolver.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery; + +/** + * + */ +public interface CommunicationProblemResolver { + /** + * @param ctx Context. + */ + public void resolve(CommunicationProblemContext ctx); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/953f0792/modules/core/src/main/java/org/apache/ignite/spi/discovery/DefaultCommunicationProblemResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DefaultCommunicationProblemResolver.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DefaultCommunicationProblemResolver.java new file mode 100644 index 0000000..4d0262d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DefaultCommunicationProblemResolver.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery; + +import java.util.BitSet; +import java.util.List; +import org.apache.ignite.cluster.ClusterNode; + +/** + * + */ +public class DefaultCommunicationProblemResolver implements CommunicationProblemResolver { + /** {@inheritDoc} */ + @Override public void resolve(CommunicationProblemContext ctx) { + ClusterGraph graph = new ClusterGraph(ctx); + + BitSet cluster = graph.findLargestIndependentCluster(); + + List<ClusterNode> nodes = ctx.topologySnapshot(); + + if (graph.checkFullyConnected(cluster) && cluster.cardinality() < nodes.size()) { + for (int i = 0; i < nodes.size(); i++) { + if (!cluster.get(i)) + ctx.killNode(nodes.get(i)); + } + } + } + + /** + * + */ + private static class ClusterGraph { + /** */ + private final static int WORD_IDX_SHIFT = 6; + + /** + * @param bitIndex Bit index. + * @return Word index containing bit with given index. + */ + private static int wordIndex(int bitIndex) { + return bitIndex >> WORD_IDX_SHIFT; + } + + /** */ + private final int nodeCnt; + + /** */ + private final long[] visitBitSet; + + /** */ + private final CommunicationProblemContext ctx; + + /** */ + private final List<ClusterNode> nodes; + + ClusterGraph(CommunicationProblemContext ctx) { + this.ctx = ctx; + + nodes = ctx.topologySnapshot(); + + nodeCnt = nodes.size(); + + assert nodeCnt > 0; + + visitBitSet = initBitSet(nodeCnt); + } + + static long[] initBitSet(int bitCnt) { + return new long[wordIndex(bitCnt - 1) + 1]; + } + + BitSet findLargestIndependentCluster() { + BitSet maxCluster = null; + int maxClusterSize = 0; + + for (int i = 0; i < nodeCnt; i++) { + if (getBit(visitBitSet, i)) + continue; + + BitSet cluster = new BitSet(nodeCnt); + + search(cluster, i); + + int size = cluster.cardinality(); + + if (maxCluster == null || size > maxClusterSize) { + maxCluster = cluster; + maxClusterSize = size; + } + } + + return maxCluster; + } + + boolean checkFullyConnected(BitSet cluster) { + int startIdx = 0; + + int clusterNodes = cluster.cardinality(); + + for (;;) { + int idx = cluster.nextSetBit(startIdx); + + if (idx == -1) + break; + + ClusterNode node1 = nodes.get(idx); + + for (int i = 0; i < clusterNodes; i++) { + if (!cluster.get(i) || i == idx) + continue; + + ClusterNode node2 = nodes.get(i); + + if (cluster.get(i) && ctx.connectionAvailable(node1, node2)) + return false; + } + + startIdx = idx + 1; + } + + return true; + } + + void search(BitSet cluster, int idx) { + setBit(visitBitSet, idx); + + cluster.set(idx); + + ClusterNode node1 = nodes.get(idx); + + for (int i = 0; i < nodeCnt; i++) { + if (i == idx || getBit(visitBitSet, i)) + continue; + + ClusterNode node2 = nodes.get(i); + + boolean connected = ctx.connectionAvailable(node1, node2) || + ctx.connectionAvailable(node2, node1); + + if (connected) + search(cluster, i); + } + } + + static void setBit(long words[], int bitIndex) { + int wordIndex = wordIndex(bitIndex); + + words[wordIndex] |= (1L << bitIndex); + } + + static boolean getBit(long[] words, int bitIndex) { + int wordIndex = wordIndex(bitIndex); + + return (words[wordIndex] & (1L << bitIndex)) != 0; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/953f0792/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java index 4653109..fc03f8d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java @@ -18,12 +18,23 @@ package org.apache.ignite.spi.discovery.zk.internal; import org.apache.ignite.spi.IgniteSpiTimeoutObject; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.Watcher; /** * */ class ZkRuntimeState { /** */ + ZkWatcher watcher; + + /** */ + ZkAliveNodeDataWatcher aliveNodeDataWatcher; + + /** */ + volatile boolean closing; + + /** */ final boolean prevJoined; /** */ @@ -68,4 +79,39 @@ class ZkRuntimeState { ZkRuntimeState(boolean prevJoined) { this.prevJoined = prevJoined; } + + /** + * @param watcher Watcher. + * @param aliveNodeDataWatcher Alive nodes data watcher. + */ + void init(ZkWatcher watcher, ZkAliveNodeDataWatcher aliveNodeDataWatcher) { + this.watcher = watcher; + this.aliveNodeDataWatcher = aliveNodeDataWatcher; + } + + /** + * + */ + void onCloseStart() { + closing = true; + + ZookeeperClient zkClient = this.zkClient; + + if (zkClient != null) + zkClient.onCloseStart(); + } + + /** + * + */ + interface ZkWatcher extends Watcher, AsyncCallback.Children2Callback, AsyncCallback.DataCallback { + // No-op. + } + + /** + * + */ + interface ZkAliveNodeDataWatcher extends Watcher, AsyncCallback.DataCallback { + // No-op. + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/953f0792/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java index a0bc2f0..a83886a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java @@ -348,7 +348,8 @@ public class ZookeeperClient implements Watcher { return zk.create(path, data, ZK_ACL, createMode); } catch (KeeperException.NodeExistsException e) { - log.info("Node already exists: " + path); + if (log.isDebugEnabled()) + log.debug("Node already exists: " + path); return path; } @@ -391,7 +392,8 @@ public class ZookeeperClient implements Watcher { if (children.get(i).startsWith(checkPrefix)) { String resPath = parentPath + "/" + child; - log.info("Check before retry, node already created: " + resPath); + if (log.isDebugEnabled()) + log.debug("Check before retry, node already created: " + resPath); return resPath; } @@ -403,7 +405,8 @@ public class ZookeeperClient implements Watcher { catch (KeeperException.NodeExistsException e) { assert !createMode.isSequential() : createMode; - log.info("Node already exists: " + path); + if (log.isDebugEnabled()) + log.debug("Node already exists: " + path); return path; } http://git-wip-us.apache.org/repos/asf/ignite/blob/953f0792/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 9c1e398..effecbb 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -127,18 +127,6 @@ public class ZookeeperDiscoveryImpl { private final GridFutureAdapter<Void> joinFut = new GridFutureAdapter<>(); /** */ - private final AliveNodeDataWatcher aliveNodeDataWatcher = new AliveNodeDataWatcher(); - - /** */ - private final ZkWatcher watcher; - - /** */ - private final ZKChildrenCallback childrenCallback; - - /** */ - private final ZkDataCallback dataCallback; - - /** */ private final int evtsAckThreshold; /** */ @@ -197,10 +185,6 @@ public class ZookeeperDiscoveryImpl { this.exchange = exchange; this.clientReconnectEnabled = locNode.isClient() && !spi.isClientReconnectDisabled(); - watcher = new ZkWatcher(); - childrenCallback = new ZKChildrenCallback(); - dataCallback = new ZkDataCallback(); - int evtsAckThreshold = IgniteSystemProperties.getInteger(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD, 5); if (evtsAckThreshold <= 0) @@ -342,7 +326,7 @@ public class ZookeeperDiscoveryImpl { return; } - rtState.zkClient.onCloseStart(); + rtState.onCloseStart(); busyLock.block(); @@ -764,6 +748,8 @@ public class ZookeeperDiscoveryImpl { final ZkRuntimeState rtState = this.rtState; + rtState.init(new ZkWatcher(rtState), new AliveNodeDataWatcher(rtState)); + ZookeeperClient zkClient = rtState.zkClient; final int OVERHEAD = 5; @@ -818,9 +804,9 @@ public class ZookeeperDiscoveryImpl { spi.getSpiContext().addTimeoutObject(rtState.joinTimeoutObj); - zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new CheckCoordinatorCallback()); + zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new CheckCoordinatorCallback(rtState)); - zkClient.getDataAsync(zkPaths.evtsPath, watcher, dataCallback); + zkClient.getDataAsync(zkPaths.evtsPath, rtState.watcher, rtState.watcher); } catch (IgniteCheckedException | ZookeeperClientFailedException e) { throw new IgniteSpiException("Failed to initialize Zookeeper nodes", e); @@ -968,7 +954,7 @@ public class ZookeeperDiscoveryImpl { "locId=" + locNode.id() + ", prevPath=" + prevE.getValue() + ']'); - PreviousNodeWatcher watcher = new PreviousNodeWatcher(); + PreviousNodeWatcher watcher = new PreviousNodeWatcher(rtState); rtState.zkClient.existsAsync(zkPaths.aliveNodesDir + "/" + prevE.getValue(), watcher, watcher); } @@ -988,7 +974,7 @@ public class ZookeeperDiscoveryImpl { if (log.isInfoEnabled()) log.info("Previous node failed, check is node new coordinator [locId=" + locNode.id() + ']'); - rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new CheckCoordinatorCallback()); + rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new CheckCoordinatorCallback(rtState)); } /** @@ -1023,8 +1009,8 @@ public class ZookeeperDiscoveryImpl { newClusterStarted(locInternalId); } - rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, watcher, childrenCallback); - rtState.zkClient.getChildrenAsync(zkPaths.customEvtsDir, watcher, childrenCallback); + rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, rtState.watcher, rtState.watcher); + rtState.zkClient.getChildrenAsync(zkPaths.customEvtsDir, rtState.watcher, rtState.watcher); for (String alivePath : aliveNodes) watchAliveNodeData(alivePath); @@ -1039,7 +1025,7 @@ public class ZookeeperDiscoveryImpl { String path = zkPaths.aliveNodesDir + "/" + alivePath; if (!path.equals(rtState.locNodeZkPath)) - rtState.zkClient.getDataAsync(path, aliveNodeDataWatcher, aliveNodeDataWatcher); + rtState.zkClient.getDataAsync(path, rtState.aliveNodeDataWatcher, rtState.aliveNodeDataWatcher); } /** @@ -1940,7 +1926,7 @@ public class ZookeeperDiscoveryImpl { public void simulateNodeFailure() { zkClient().deleteIfExistsAsync(zkPaths.aliveNodesDir); - zkClient().onCloseStart(); + rtState.onCloseStart(); zkClient().close(); } @@ -2014,7 +2000,7 @@ public class ZookeeperDiscoveryImpl { if (failedNode.isLocal()) { U.warn(log, "Received EVT_NODE_FAILED for local node."); - zkClient().onCloseStart(); + rtState.onCloseStart(); if (locNode.isClient() && clientReconnectEnabled) { boolean reconnect = false; @@ -2315,8 +2301,7 @@ public class ZookeeperDiscoveryImpl { ZookeeperClient zkClient = rtState.zkClient; - if (zkClient != null) - zkClient.onCloseStart(); + rtState.onCloseStart(); busyLock.block(); @@ -2410,6 +2395,8 @@ public class ZookeeperDiscoveryImpl { /** {@inheritDoc} */ @Override public void run() { + rtState.closing = true; + busyLock.block(); busyLock.unblock(); @@ -2455,45 +2442,136 @@ public class ZookeeperDiscoveryImpl { /** * */ - private class ZkWatcher implements Watcher { + abstract class ZkCallabck { + /** */ + final ZkRuntimeState rtState; + + /** + * @param rtState Runtime state. + */ + ZkCallabck(ZkRuntimeState rtState) { + this.rtState = rtState; + } + + /** + * @return {@code True} if is able to start processing. + */ + final boolean onProcessStart() { + return !rtState.closing && busyLock.enterBusy(); + } + + /** + * + */ + final void onProcessEnd() { + busyLock.leaveBusy(); + } + + /** + * @param e Error. + */ + final void onProcessError(Throwable e) { + onFatalError(busyLock, e); + } + } + + /** + * + */ + abstract class AbstractWatcher extends ZkCallabck implements Watcher { + /** + * @param rtState Runtime state. + */ + AbstractWatcher(ZkRuntimeState rtState) { + super(rtState); + } + /** {@inheritDoc} */ - @Override public void process(WatchedEvent evt) { - if (!busyLock.enterBusy()) + @Override public final void process(WatchedEvent evt) { + if (!onProcessStart()) return; try { - if (evt.getType() == Event.EventType.NodeDataChanged) { - if (evt.getPath().equals(zkPaths.evtsPath)) { - if (!rtState.crd) - rtState.zkClient.getDataAsync(evt.getPath(), this, dataCallback); - } - else - U.warn(log, "Received NodeDataChanged for unexpected path: " + evt.getPath()); - } - else if (evt.getType() == Event.EventType.NodeChildrenChanged) { - if (evt.getPath().equals(zkPaths.aliveNodesDir)) - rtState.zkClient.getChildrenAsync(evt.getPath(), this, childrenCallback); - else if (evt.getPath().equals(zkPaths.customEvtsDir)) - rtState.zkClient.getChildrenAsync(evt.getPath(), this, childrenCallback); - else - U.warn(log, "Received NodeChildrenChanged for unexpected path: " + evt.getPath()); - } + process0(evt); - busyLock.leaveBusy(); + onProcessEnd(); } catch (Throwable e) { - onFatalError(busyLock, e); + onProcessError(e); } } + + /** + * @param evt Event. + * @throws Exception If failed. + */ + protected abstract void process0(WatchedEvent evt) throws Exception; } /** * */ - private class ZKChildrenCallback implements AsyncCallback.Children2Callback { + abstract class AbstractChildrenCallback extends ZkCallabck implements AsyncCallback.Children2Callback { + /** + * @param rtState Runtime state. + */ + AbstractChildrenCallback(ZkRuntimeState rtState) { + super(rtState); + } + /** {@inheritDoc} */ @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { - if (!busyLock.enterBusy()) + if (!onProcessStart()) + return; + + try { + processResult0(rc, path, ctx, children, stat); + + onProcessEnd(); + } + catch (Throwable e) { + onProcessError(e); + } + } + + abstract void processResult0(int rc, String path, Object ctx, List<String> children, Stat stat) + throws Exception; + } + + /** + * + */ + private class ZkWatcher extends AbstractWatcher implements ZkRuntimeState.ZkWatcher { + /** + * @param rtState Runtime state. + */ + ZkWatcher(ZkRuntimeState rtState) { + super(rtState); + } + + /** {@inheritDoc} */ + @Override public void process0(WatchedEvent evt) { + if (evt.getType() == Event.EventType.NodeDataChanged) { + if (evt.getPath().equals(zkPaths.evtsPath)) { + if (!rtState.crd) + rtState.zkClient.getDataAsync(evt.getPath(), this, this); + } + else + U.warn(log, "Received NodeDataChanged for unexpected path: " + evt.getPath()); + } + else if (evt.getType() == Event.EventType.NodeChildrenChanged) { + if (evt.getPath().equals(zkPaths.aliveNodesDir)) + rtState.zkClient.getChildrenAsync(evt.getPath(), this, this); + else if (evt.getPath().equals(zkPaths.customEvtsDir)) + rtState.zkClient.getChildrenAsync(evt.getPath(), this, this); + else + U.warn(log, "Received NodeChildrenChanged for unexpected path: " + evt.getPath()); + } + } + + /** {@inheritDoc} */ + @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { + if (!onProcessStart()) return; try { @@ -2506,21 +2584,16 @@ public class ZookeeperDiscoveryImpl { else U.warn(log, "Children callback for unexpected path: " + path); - busyLock.leaveBusy(); + onProcessEnd(); } catch (Throwable e) { - onFatalError(busyLock, e); + onProcessError(e); } } - } - /** - * - */ - private class ZkDataCallback implements AsyncCallback.DataCallback { /** {@inheritDoc} */ @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { - if (!busyLock.enterBusy()) + if (!onProcessStart()) return; try { @@ -2533,10 +2606,10 @@ public class ZookeeperDiscoveryImpl { else U.warn(log, "Data callback for unknown path: " + path); - busyLock.leaveBusy(); + onProcessEnd(); } catch (Throwable e) { - onFatalError(busyLock, e); + onProcessError(e); } } } @@ -2544,26 +2617,23 @@ public class ZookeeperDiscoveryImpl { /** * */ - private class AliveNodeDataWatcher implements Watcher, AsyncCallback.DataCallback { - /** {@inheritDoc} */ - @Override public void process(WatchedEvent evt) { - if (!busyLock.enterBusy()) - return; - - try { - if (evt.getType() == Event.EventType.NodeDataChanged) - rtState.zkClient.getDataAsync(evt.getPath(), this, this); + private class AliveNodeDataWatcher extends AbstractWatcher implements ZkRuntimeState.ZkAliveNodeDataWatcher { + /** + * @param rtState Runtime state. + */ + AliveNodeDataWatcher(ZkRuntimeState rtState) { + super(rtState); + } - busyLock.leaveBusy(); - } - catch (Throwable e) { - onFatalError(busyLock, e); - } + /** {@inheritDoc} */ + @Override public void process0(WatchedEvent evt) { + if (evt.getType() == Event.EventType.NodeDataChanged) + rtState.zkClient.getDataAsync(evt.getPath(), this, this); } /** {@inheritDoc} */ @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { - if (!busyLock.enterBusy()) + if (!onProcessStart()) return; try { @@ -2571,10 +2641,10 @@ public class ZookeeperDiscoveryImpl { processResult0(rc, path, data); - busyLock.leaveBusy(); + onProcessEnd(); } catch (Throwable e) { - onFatalError(busyLock, e); + onProcessError(e); } } @@ -2619,30 +2689,27 @@ public class ZookeeperDiscoveryImpl { /** * */ - private class PreviousNodeWatcher implements Watcher, AsyncCallback.StatCallback { - /** {@inheritDoc} */ - @Override public void process(WatchedEvent evt) { - if (!busyLock.enterBusy()) - return; - - try { - if (evt.getType() == Event.EventType.NodeDeleted) - onPreviousNodeFail(); - else { - if (evt.getType() != Event.EventType.None) - rtState.zkClient.existsAsync(evt.getPath(), this, this); - } + private class PreviousNodeWatcher extends AbstractWatcher implements AsyncCallback.StatCallback { + /** + * @param rtState Runtime state. + */ + PreviousNodeWatcher(ZkRuntimeState rtState) { + super(rtState); + } - busyLock.leaveBusy(); - } - catch (Throwable e) { - onFatalError(busyLock, e); + /** {@inheritDoc} */ + @Override public void process0(WatchedEvent evt) { + if (evt.getType() == Event.EventType.NodeDeleted) + onPreviousNodeFail(); + else { + if (evt.getType() != Event.EventType.None) + rtState.zkClient.existsAsync(evt.getPath(), this, this); } } /** {@inheritDoc} */ @Override public void processResult(int rc, String path, Object ctx, Stat stat) { - if (!busyLock.enterBusy()) + if (!onProcessStart()) return; try { @@ -2651,10 +2718,10 @@ public class ZookeeperDiscoveryImpl { if (rc == KeeperException.Code.NONODE.intValue() || stat == null) onPreviousNodeFail(); - busyLock.leaveBusy(); + onProcessEnd(); } catch (Throwable e) { - onFatalError(busyLock, e); + onProcessError(e); } } } @@ -2662,22 +2729,19 @@ public class ZookeeperDiscoveryImpl { /** * */ - class CheckCoordinatorCallback implements AsyncCallback.Children2Callback { - /** {@inheritDoc} */ - @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { - if (!busyLock.enterBusy()) - return; - - try { - assert rc == 0 : KeeperException.Code.get(rc); + class CheckCoordinatorCallback extends AbstractChildrenCallback { + /** + * @param rtState Runtime state. + */ + CheckCoordinatorCallback(ZkRuntimeState rtState) { + super(rtState); + } - checkIsCoordinator(rc, children); + /** {@inheritDoc} */ + @Override public void processResult0(int rc, String path, Object ctx, List<String> children, Stat stat) throws Exception { + assert rc == 0 : KeeperException.Code.get(rc); - busyLock.leaveBusy(); - } - catch (Throwable e) { - onFatalError(busyLock, e); - } + checkIsCoordinator(rc, children); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/953f0792/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java index a0be40e..7785a3c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java @@ -178,7 +178,7 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest { log.info("All clients started."); try { - checkNodes(SRVS + CLIENTS); + checkNodes0(SRVS + CLIENTS); } finally { for (Ignite client : clients) @@ -188,6 +188,30 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest { /** * @param expCnt Expected number of nodes. + * @throws Exception If failed. + */ + private void checkNodes0(final int expCnt) throws Exception { + boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + try { + checkNodes(expCnt); + + return true; + } + catch (AssertionFailedError e) { + log.info("Check failed, will retry: " + e); + } + + return false; + } + }, 10_000); + + if (!wait) + checkNodes(expCnt); + } + + /** + * @param expCnt Expected number of nodes. */ private void checkNodes(int expCnt) { assertEquals(expCnt, G.allGrids().size()); @@ -297,23 +321,7 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest { if (err0 != null) throw err0; - boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - try { - checkNodes(SRVS + THREADS); - - return true; - } - catch (AssertionFailedError e) { - log.info("Check failed, will retry: " + e); - } - - return false; - } - }, 10_000); - - if (!wait) - checkNodes(SRVS + THREADS); + checkNodes0(SRVS + THREADS); log.info("Stop clients.");
