zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4525b921 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4525b921 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4525b921 Branch: refs/heads/ignite-zk-ce Commit: 4525b9218b17786b9ddf6a3932fc3422b8cfc735 Parents: 246b765 Author: sboikov <[email protected]> Authored: Tue Dec 19 13:34:22 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue Dec 19 14:34:25 2017 +0300 ---------------------------------------------------------------------- .../DefaultCommunicationProblemResolver.java | 199 +++++++++++++++++++ .../configuration/IgniteConfiguration.java | 12 +- .../apache/ignite/internal/IgniteKernal.java | 3 + .../org/apache/ignite/internal/IgnitionEx.java | 3 + .../internal/managers/GridManagerAdapter.java | 8 + .../discovery/GridDiscoveryManager.java | 68 +++++++ .../managers/discovery/IgniteDiscoverySpi.java | 4 +- .../org/apache/ignite/spi/IgniteSpiAdapter.java | 10 + .../org/apache/ignite/spi/IgniteSpiContext.java | 4 + .../communication/tcp/TcpCommunicationSpi.java | 18 +- .../DefaultCommunicationProblemResolver.java | 174 ---------------- .../spi/discovery/tcp/TcpDiscoverySpi.java | 2 +- .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 4 +- .../ZkCommunicationErrorProcessFuture.java | 2 +- .../zk/internal/ZookeeperDiscoveryImpl.java | 33 +-- .../ZookeeperDiscoverySpiBasicTest.java | 102 ++++++++-- .../testframework/GridSpiTestContext.java | 10 + 17 files changed, 433 insertions(+), 223 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java b/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java new file mode 100644 index 0000000..1e973d3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.configuration; + +import java.util.BitSet; +import java.util.List; +import org.apache.ignite.cluster.ClusterNode; + +/** + * + */ +public class DefaultCommunicationProblemResolver implements CommunicationProblemResolver { + /** {@inheritDoc} */ + @Override public void resolve(CommunicationProblemContext ctx) { + ClusterGraph graph = new ClusterGraph(ctx); + + BitSet cluster = graph.findLargestIndependentCluster(); + + List<ClusterNode> nodes = ctx.topologySnapshot(); + + if (graph.checkFullyConnected(cluster) && cluster.cardinality() < nodes.size()) { + for (int i = 0; i < nodes.size(); i++) { + if (!cluster.get(i)) + ctx.killNode(nodes.get(i)); + } + } + } + + /** + * + */ + private static class ClusterGraph { + /** */ + private final static int WORD_IDX_SHIFT = 6; + + /** + * @param bitIndex Bit index. + * @return Word index containing bit with given index. + */ + private static int wordIndex(int bitIndex) { + return bitIndex >> WORD_IDX_SHIFT; + } + + /** */ + private final int nodeCnt; + + /** */ + private final long[] visitBitSet; + + /** */ + private final CommunicationProblemContext ctx; + + /** */ + private final List<ClusterNode> nodes; + + /** + * @param ctx Context. + */ + ClusterGraph(CommunicationProblemContext ctx) { + this.ctx = ctx; + + nodes = ctx.topologySnapshot(); + + nodeCnt = nodes.size(); + + assert nodeCnt > 0; + + visitBitSet = initBitSet(nodeCnt); + } + + /** + * @param bitCnt Number of bits. + * @return Bit set words. + */ + static long[] initBitSet(int bitCnt) { + return new long[wordIndex(bitCnt - 1) + 1]; + } + + /** + * @return Cluster nodes bit set. + */ + BitSet findLargestIndependentCluster() { + BitSet maxCluster = null; + int maxClusterSize = 0; + + for (int i = 0; i < nodeCnt; i++) { + if (getBit(visitBitSet, i)) + continue; + + BitSet cluster = new BitSet(nodeCnt); + + search(cluster, i); + + int size = cluster.cardinality(); + + if (maxCluster == null || size > maxClusterSize) { + maxCluster = cluster; + maxClusterSize = size; + } + } + + return maxCluster; + } + + /** + * @param cluster Cluster nodes bit set. + * @return {@code True} if all cluster nodes are able to connect to each other. + */ + boolean checkFullyConnected(BitSet cluster) { + int startIdx = 0; + + int clusterNodes = cluster.cardinality(); + + for (;;) { + int idx = cluster.nextSetBit(startIdx); + + if (idx == -1) + break; + + ClusterNode node1 = nodes.get(idx); + + for (int i = 0; i < clusterNodes; i++) { + if (!cluster.get(i) || i == idx) + continue; + + ClusterNode node2 = nodes.get(i); + + if (cluster.get(i) && !ctx.connectionAvailable(node1, node2)) + return false; + } + + startIdx = idx + 1; + } + + return true; + } + + /** + * @param cluster Current cluster bit set. + * @param idx Node index. + */ + void search(BitSet cluster, int idx) { + setBit(visitBitSet, idx); + + cluster.set(idx); + + ClusterNode node1 = nodes.get(idx); + + for (int i = 0; i < nodeCnt; i++) { + if (i == idx || getBit(visitBitSet, i)) + continue; + + ClusterNode node2 = nodes.get(i); + + boolean connected = ctx.connectionAvailable(node1, node2) || + ctx.connectionAvailable(node2, node1); + + if (connected) + search(cluster, i); + } + } + + /** + * @param words Bit set words. + * @param bitIndex Bit index. + */ + static void setBit(long words[], int bitIndex) { + int wordIndex = wordIndex(bitIndex); + + words[wordIndex] |= (1L << bitIndex); + } + + /** + * @param words Bit set words. + * @param bitIndex Bit index. + * @return Bit value. + */ + static boolean getBit(long[] words, int bitIndex) { + int wordIndex = wordIndex(bitIndex); + + return (words[wordIndex] & (1L << bitIndex)) != 0; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index 8c3c818..dbf5fbb 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -595,12 +595,22 @@ public class IgniteConfiguration { warmupClos = cfg.getWarmupClosure(); } + /** + * TODO ZK + * @return + */ public CommunicationProblemResolver getCommunicationProblemResolver() { return commProblemRslvr; } - public void setCommunicationProblemResolver(CommunicationProblemResolver commProblemRslvr) { + /** + * TODO ZK + * @param commProblemRslvr + */ + public IgniteConfiguration setCommunicationProblemResolver(CommunicationProblemResolver commProblemRslvr) { this.commProblemRslvr = commProblemRslvr; + + return this; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index a641e6e..85bb7c2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -2696,6 +2696,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { objs.add(cfg.getGridLogger()); objs.add(cfg.getMBeanServer()); + if (cfg.getCommunicationProblemResolver() != null) + objs.add(cfg.getCommunicationProblemResolver()); + return objs; } http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index 62ebcd8..8ae6313 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -71,6 +71,7 @@ import org.apache.ignite.configuration.PersistentStoreConfiguration; import org.apache.ignite.configuration.TransactionConfiguration; import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.managers.communication.GridIoPolicy; +import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor; import org.apache.ignite.internal.processors.igfs.IgfsThreadFactory; import org.apache.ignite.internal.processors.igfs.IgfsUtils; @@ -2270,6 +2271,8 @@ public class IgnitionEx { initializeDefaultSpi(myCfg); + GridDiscoveryManager.initCommunicationErrorResolveConfiguration(myCfg); + initializeDefaultCacheConfiguration(myCfg); ExecutorConfiguration[] execCfgs = myCfg.getExecutorConfiguration(); http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java index a151eb5..df84212 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java @@ -604,6 +604,14 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan return ctx.nodeAttributes(); } + @Override public boolean communicationErrorResolveSupported() { + return ctx.discovery().communicationErrorResolveSupported(); + } + + @Override public void resolveCommunicationError(ClusterNode node, Exception err) { + ctx.discovery().resolveCommunicationError(node, err); + } + /** * @param e Exception to handle. * @return GridSpiException Converted exception. http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 97441d7..172615c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -53,8 +53,11 @@ import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cluster.ClusterMetrics; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.CommunicationProblemResolver; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.DefaultCommunicationProblemResolver; +import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.ClusterMetricsSnapshot; @@ -108,6 +111,8 @@ import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.security.SecurityCredentials; import org.apache.ignite.plugin.segmentation.SegmentationPolicy; import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.communication.CommunicationSpi; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData; import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider; @@ -545,6 +550,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { }); } + if (ctx.config().getCommunicationProblemResolver() != null) + ctx.resource().injectGeneric(ctx.config().getCommunicationProblemResolver()); + spi.setListener(new DiscoverySpiListener() { private long gridStartTime; @@ -2336,6 +2344,66 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { ).start(); } + /** + * @param cfg Configuration. + * @throws IgniteCheckedException If configuration is not valid. + */ + public static void initCommunicationErrorResolveConfiguration(IgniteConfiguration cfg) throws IgniteCheckedException { + CommunicationProblemResolver rslvr = cfg.getCommunicationProblemResolver(); + CommunicationSpi commSpi = cfg.getCommunicationSpi(); + DiscoverySpi discoverySpi = cfg.getDiscoverySpi(); + + if (rslvr != null) { + if (!supportsCommunicationErrorResolve(commSpi)) + throw new IgniteCheckedException("CommunicationProblemResolver is configured, but communication SPI does not support communication" + + "problem resolve: " + commSpi.getClass().getName()); + + if (!supportsCommunicationErrorResolve(discoverySpi)) + throw new IgniteCheckedException("CommunicationProblemResolver is configured, but discovery SPI does not support communication" + + "problem resolve: " + discoverySpi.getClass().getName()); + } + else { + if (supportsCommunicationErrorResolve(commSpi) && supportsCommunicationErrorResolve(discoverySpi)) + cfg.setCommunicationProblemResolver(new DefaultCommunicationProblemResolver()); + } + } + + /** + * @param spi Discovery SPI. + * @return {@code True} if SPI supports communication error resolve. + */ + private static boolean supportsCommunicationErrorResolve(DiscoverySpi spi) { + return spi instanceof IgniteDiscoverySpi && ((IgniteDiscoverySpi)spi).supportsCommunicationErrorResolve(); + } + + /** + * @param spi Discovery SPI. + * @return {@code True} if SPI supports communication error resolve. + */ + private static boolean supportsCommunicationErrorResolve(CommunicationSpi spi) { + return spi instanceof TcpCommunicationSpi; + } + + /** + * @return {@code True} if communication error resolve is supported. + */ + public boolean communicationErrorResolveSupported() { + return ctx.config().getCommunicationProblemResolver() != null; + } + + /** + * @param node Problem node. + * @param err Error. + */ + public void resolveCommunicationError(ClusterNode node, Exception err) { + DiscoverySpi spi = getSpi(); + + if (!supportsCommunicationErrorResolve(spi) || !supportsCommunicationErrorResolve(ctx.config().getCommunicationSpi())) + throw new UnsupportedOperationException(); + + ((IgniteDiscoverySpi)spi).resolveCommunicationError(node, err); + } + /** Worker for network segment checks. */ private class SegmentCheckWorker extends GridWorker { /** */ http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java index 9a1faa2..bf117f1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java @@ -22,7 +22,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.spi.discovery.DiscoverySpi; /** - * TODO ZK + * */ public interface IgniteDiscoverySpi extends DiscoverySpi { /** @@ -63,5 +63,5 @@ public interface IgniteDiscoverySpi extends DiscoverySpi { * @param node Problem node. * @param err Connection error. */ - public void onCommunicationConnectionError(ClusterNode node, Exception err); + public void resolveCommunicationError(ClusterNode node, Exception err); } http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java index 07ba214..50cf9fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java @@ -957,5 +957,15 @@ public abstract class IgniteSpiAdapter implements IgniteSpi { @Override public Map<String, Object> nodeAttributes() { return Collections.emptyMap(); } + + /** {@inheritDoc} */ + @Override public boolean communicationErrorResolveSupported() { + return false; + } + + /** {@inheritDoc} */ + @Override public void resolveCommunicationError(ClusterNode node, Exception err) { + throw new UnsupportedOperationException(); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java index 96b3e61..f9c6ffd 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java @@ -358,4 +358,8 @@ public interface IgniteSpiContext { * @return Current node attributes. */ public Map<String, Object> nodeAttributes(); + + public boolean communicationErrorResolveSupported(); + + public void resolveCommunicationError(ClusterNode node, Exception err); } http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 957a0e1..37be29f 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -3409,22 +3409,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati boolean commErrResolve = false; - if (connectionError(errs)) { - DiscoverySpi discoverySpi = ignite.configuration().getDiscoverySpi(); + IgniteSpiContext ctx = getSpiContext(); - if (discoverySpi instanceof IgniteDiscoverySpi) { - IgniteDiscoverySpi discoverySpi0 = (IgniteDiscoverySpi)discoverySpi; + if (connectionError(errs) && ctx.communicationErrorResolveSupported()) { + commErrResolve = true; - if (discoverySpi0.supportsCommunicationErrorResolve()) { - commErrResolve = true; - - discoverySpi0.onCommunicationConnectionError(node, errs); - } - } + ctx.resolveCommunicationError(node, errs); } if (!commErrResolve && enableForcibleNodeKill) { - if (getSpiContext().node(node.id()) != null + if (ctx.node(node.id()) != null && (CU.clientNode(node) || !CU.clientNode(getLocalNode())) && connectionError(errs)) { String msg = "TcpCommunicationSpi failed to establish connection to node, node will be dropped from " + @@ -3435,7 +3429,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati else U.warn(log, msg); - getSpiContext().failNode(node.id(), "TcpCommunicationSpi failed to establish connection to node [" + + ctx.failNode(node.id(), "TcpCommunicationSpi failed to establish connection to node [" + "rmtNode=" + node + ", errs=" + errs + ", connectErrs=" + Arrays.toString(errs.getSuppressed()) + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/spi/discovery/DefaultCommunicationProblemResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DefaultCommunicationProblemResolver.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DefaultCommunicationProblemResolver.java deleted file mode 100644 index b2d4bf0..0000000 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DefaultCommunicationProblemResolver.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spi.discovery; - -import java.util.BitSet; -import java.util.List; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.configuration.CommunicationProblemContext; -import org.apache.ignite.configuration.CommunicationProblemResolver; - -/** - * - */ -public class DefaultCommunicationProblemResolver implements CommunicationProblemResolver { - /** {@inheritDoc} */ - @Override public void resolve(CommunicationProblemContext ctx) { - ClusterGraph graph = new ClusterGraph(ctx); - - BitSet cluster = graph.findLargestIndependentCluster(); - - List<ClusterNode> nodes = ctx.topologySnapshot(); - - if (graph.checkFullyConnected(cluster) && cluster.cardinality() < nodes.size()) { - for (int i = 0; i < nodes.size(); i++) { - if (!cluster.get(i)) - ctx.killNode(nodes.get(i)); - } - } - } - - /** - * - */ - private static class ClusterGraph { - /** */ - private final static int WORD_IDX_SHIFT = 6; - - /** - * @param bitIndex Bit index. - * @return Word index containing bit with given index. - */ - private static int wordIndex(int bitIndex) { - return bitIndex >> WORD_IDX_SHIFT; - } - - /** */ - private final int nodeCnt; - - /** */ - private final long[] visitBitSet; - - /** */ - private final CommunicationProblemContext ctx; - - /** */ - private final List<ClusterNode> nodes; - - ClusterGraph(CommunicationProblemContext ctx) { - this.ctx = ctx; - - nodes = ctx.topologySnapshot(); - - nodeCnt = nodes.size(); - - assert nodeCnt > 0; - - visitBitSet = initBitSet(nodeCnt); - } - - static long[] initBitSet(int bitCnt) { - return new long[wordIndex(bitCnt - 1) + 1]; - } - - BitSet findLargestIndependentCluster() { - BitSet maxCluster = null; - int maxClusterSize = 0; - - for (int i = 0; i < nodeCnt; i++) { - if (getBit(visitBitSet, i)) - continue; - - BitSet cluster = new BitSet(nodeCnt); - - search(cluster, i); - - int size = cluster.cardinality(); - - if (maxCluster == null || size > maxClusterSize) { - maxCluster = cluster; - maxClusterSize = size; - } - } - - return maxCluster; - } - - boolean checkFullyConnected(BitSet cluster) { - int startIdx = 0; - - int clusterNodes = cluster.cardinality(); - - for (;;) { - int idx = cluster.nextSetBit(startIdx); - - if (idx == -1) - break; - - ClusterNode node1 = nodes.get(idx); - - for (int i = 0; i < clusterNodes; i++) { - if (!cluster.get(i) || i == idx) - continue; - - ClusterNode node2 = nodes.get(i); - - if (cluster.get(i) && ctx.connectionAvailable(node1, node2)) - return false; - } - - startIdx = idx + 1; - } - - return true; - } - - void search(BitSet cluster, int idx) { - setBit(visitBitSet, idx); - - cluster.set(idx); - - ClusterNode node1 = nodes.get(idx); - - for (int i = 0; i < nodeCnt; i++) { - if (i == idx || getBit(visitBitSet, i)) - continue; - - ClusterNode node2 = nodes.get(i); - - boolean connected = ctx.connectionAvailable(node1, node2) || - ctx.connectionAvailable(node2, node1); - - if (connected) - search(cluster, i); - } - } - - static void setBit(long words[], int bitIndex) { - int wordIndex = wordIndex(bitIndex); - - words[wordIndex] |= (1L << bitIndex); - } - - static boolean getBit(long[] words, int bitIndex) { - int wordIndex = wordIndex(bitIndex); - - return (words[wordIndex] & (1L << bitIndex)) != 0; - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 0e2f851..ad8eca0 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -2111,7 +2111,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery } /** {@inheritDoc} */ - @Override public void onCommunicationConnectionError(ClusterNode node, Exception err) { + @Override public void resolveCommunicationError(ClusterNode node, Exception err) { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java index 98a22d7..14bb107 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java @@ -195,8 +195,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery } /** {@inheritDoc} */ - @Override public void onCommunicationConnectionError(ClusterNode node, Exception err) { - impl.onCommunicationConnectionError(node, err); + @Override public void resolveCommunicationError(ClusterNode node, Exception err) { + impl.resolveCommunicationError(node, err); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java index 0074817..e64c801 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java @@ -148,7 +148,7 @@ class ZkCommunicationErrorProcessFuture extends GridFutureAdapter<Void> implemen * @param futPath Future path. * @param nodes Nodes to ping. */ - void pingNodesAndNotifyFuture(final ZkRuntimeState rtState, final String futPath, List<ClusterNode> nodes) throws Exception { + void checkConnection(final ZkRuntimeState rtState, final String futPath, List<ClusterNode> nodes) { final TcpCommunicationSpi spi = (TcpCommunicationSpi)impl.spi.ignite().configuration().getCommunicationSpi(); IgniteFuture<BitSet> fut = spi.checkConnection(nodes); http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index ef3504f..f1ad869 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -49,6 +49,7 @@ import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.IgnitionEx; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener; import org.apache.ignite.internal.util.GridIntList; @@ -247,17 +248,17 @@ public class ZookeeperDiscoveryImpl { * @param node0 Problem node ID * @param err Connect error. */ - public void onCommunicationConnectionError(ClusterNode node0, Exception err) { - checkState(); - + public void resolveCommunicationError(ClusterNode node0, Exception err) { ZookeeperClusterNode node = node(node0.id()); if (node == null) - return; + throw new IgniteSpiException(new ClusterTopologyCheckedException("Node failed: " + node0.id())); IgniteInternalFuture<Boolean> nodeStatusFut; for (;;) { + checkState(); + ZkCommunicationErrorProcessFuture fut = commErrProcFut.get(); if (fut == null || fut.isDone()) { @@ -273,16 +274,16 @@ public class ZookeeperDiscoveryImpl { ", err= " + err + ']'); } - ConnectionState connState; + try { + checkState(); + } + catch (Exception e) { + fut.onError(e); - synchronized (this) { - connState = this.connState; + throw e; } - if (connState != ConnectionState.STARTED) - fut.onError(new IgniteCheckedException("Node stopped.")); - else - fut.scheduleCheckOnTimeout(); + fut.scheduleCheckOnTimeout(); } else fut = commErrProcFut.get(); @@ -303,7 +304,8 @@ public class ZookeeperDiscoveryImpl { } try { - nodeStatusFut.get(); + if (!nodeStatusFut.get()) + throw new IgniteSpiException(new ClusterTopologyCheckedException("Node failed: " + node0.id())); } catch (IgniteCheckedException e) { throw new IgniteSpiException(e); @@ -2240,7 +2242,7 @@ public class ZookeeperDiscoveryImpl { runInWorkerThread(new ZkRunnable(rtState, this) { @Override protected void run0() throws Exception { - fut0.pingNodesAndNotifyFuture(rtState, futPath, rtState.commErrProcNodes); + fut0.checkConnection(rtState, futPath, rtState.commErrProcNodes); } }); } @@ -2341,7 +2343,8 @@ public class ZookeeperDiscoveryImpl { ", rslvr=" + rslvr.getClass().getSimpleName() + ']'); } - ZkCommunicationProblemContext ctx = new ZkCommunicationProblemContext(topSnapshot, + ZkCommunicationProblemContext ctx = new ZkCommunicationProblemContext( + topSnapshot, initialNodes, nodesRes); @@ -2352,7 +2355,7 @@ public class ZookeeperDiscoveryImpl { if (killedNodes != null) { if (log.isInfoEnabled()) { - log.info("Communication error resolver forces nodes stop [reqId=" + futId + + log.info("Communication error resolver forced nodes stop [reqId=" + futId + ", killNodeCnt=" + killedNodes.size() + ", nodeIds=" + U.nodeIds(killedNodes) + ']'); } http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java index 58098b9..44e48f9 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java @@ -61,6 +61,7 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.IgnitionEx; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData; import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; import org.apache.ignite.internal.processors.cache.GridCacheAbstractFullApiSelfTest; @@ -72,9 +73,11 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.logger.java.JavaLogger; import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.configuration.CommunicationProblemContext; import org.apache.ignite.configuration.CommunicationProblemResolver; @@ -147,7 +150,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { private boolean persistence; /** */ - private CommunicationProblemResolver commProblemRslvr; + private IgniteOutClosure<CommunicationProblemResolver> commProblemRslvr; /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { @@ -248,7 +251,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { cfg.setCommunicationSpi(new ZkTestCommunicationSpi()); if (commProblemRslvr != null) - cfg.setCommunicationProblemResolver(commProblemRslvr); + cfg.setCommunicationProblemResolver(commProblemRslvr.apply()); return cfg; } @@ -1808,7 +1811,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { assert nodes > 1; sesTimeout = 2000; - commProblemRslvr = new NoOpCommunicationProblemResolver(); + commProblemRslvr = NoOpCommunicationProblemResolver.FACTORY; startGridsMultiThreaded(nodes); @@ -1828,7 +1831,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { ZookeeperDiscoverySpi spi = spi(ignite(idx1)); - spi.onCommunicationConnectionError(ignite(idx2).cluster().localNode(), new Exception("test")); + spi.resolveCommunicationError(ignite(idx2).cluster().localNode(), new Exception("test")); checkInternalStructuresCleanup(); } @@ -1840,7 +1843,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { public void testNoOpCommunicationErrorResolve_3() throws Exception { // One node fails before sending communication status. sesTimeout = 2000; - commProblemRslvr = new NoOpCommunicationProblemResolver(); + commProblemRslvr = NoOpCommunicationProblemResolver.FACTORY; startGridsMultiThreaded(3); @@ -1855,7 +1858,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { @Override public Object call() { ZookeeperDiscoverySpi spi = spi(ignite(0)); - spi.onCommunicationConnectionError(ignite(1).cluster().localNode(), new Exception("test")); + spi.resolveCommunicationError(ignite(1).cluster().localNode(), new Exception("test")); return null; } @@ -1883,11 +1886,11 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testNoOpCommunicationErrorResolve_4() throws Exception { - // Coordinator changes while resolve process is in progress. + // Coordinator fails while resolve process is in progress. testCommSpi = true; sesTimeout = 2000; - commProblemRslvr = new NoOpCommunicationProblemResolver(); + commProblemRslvr = NoOpCommunicationProblemResolver.FACTORY; startGrid(0); @@ -1901,7 +1904,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { @Override public Object call() { ZookeeperDiscoverySpi spi = spi(ignite(1)); - spi.onCommunicationConnectionError(ignite(2).cluster().localNode(), new Exception("test")); + spi.resolveCommunicationError(ignite(2).cluster().localNode(), new Exception("test")); return null; } @@ -1970,6 +1973,8 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } /** + * TODO ZK: kill random, kill coordinator multiple times. + * * @param startNodes Number of nodes to start. * @param killNodes Nodes to kill by resolve process. * @throws Exception If failed. @@ -1977,7 +1982,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { private void communicationErrorResolve_KillNodes(int startNodes, Collection<Long> killNodes) throws Exception { testCommSpi = true; - commProblemRslvr = new TestNodeKillCommunicationProblemResolver(killNodes); + commProblemRslvr = TestNodeKillCommunicationProblemResolver.factory(killNodes); startGrids(startNodes); @@ -1986,20 +1991,28 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { commSpi.checkRes = new BitSet(startNodes); ZookeeperDiscoverySpi spi = null; + UUID killNodeId = null; for (Ignite node : G.allGrids()) { ZookeeperDiscoverySpi spi0 = spi(node); - if (!killNodes.contains(node.cluster().localNode().order())) { + if (!killNodes.contains(node.cluster().localNode().order())) spi = spi0; - - break; - } + else + killNodeId = node.cluster().localNode().id(); } assertNotNull(spi); + assertNotNull(killNodeId); - spi.onCommunicationConnectionError(ignite(1).cluster().localNode(), new Exception("test")); + try { + spi.resolveCommunicationError(spi.getNode(killNodeId), new Exception("test")); + + fail("Exception is not thrown"); + } + catch (IgniteSpiException e) { + assertTrue("Unexpected exception: " + e, e.getCause() instanceof ClusterTopologyCheckedException); + } int expNodes = startNodes - killNodes.size(); @@ -2016,6 +2029,46 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testDefaultCommunicationErrorResolver1() throws Exception { + testCommSpi = true; + sesTimeout = 5000; + + startGrids(3); + + { + ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.forNode(ignite(0)); + commSpi.checkRes = new BitSet(3); + commSpi.checkRes.set(0); + commSpi.checkRes.set(1); + } + { + ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.forNode(ignite(1)); + commSpi.checkRes = new BitSet(3); + commSpi.checkRes.set(0); + commSpi.checkRes.set(1); + } + { + ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.forNode(ignite(2)); + commSpi.checkRes = new BitSet(3); + commSpi.checkRes.set(2); + } + + UUID killedId = nodeId(2); + + assertNotNull(ignite(0).cluster().node(killedId)); + + ZookeeperDiscoverySpi spi = spi(ignite(0)); + + spi.resolveCommunicationError(spi.getNode(ignite(1).cluster().localNode().id()), new Exception("test")); + + waitForTopology(2); + + assertNull(ignite(0).cluster().node(killedId)); + } + + /** + * @throws Exception If failed. + */ public void testConnectionCheck() throws Exception { final int NODES = 5; @@ -2709,6 +2762,13 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { * */ static class NoOpCommunicationProblemResolver implements CommunicationProblemResolver { + /** */ + static final IgniteOutClosure<CommunicationProblemResolver> FACTORY = new IgniteOutClosure<CommunicationProblemResolver>() { + @Override public CommunicationProblemResolver apply() { + return new NoOpCommunicationProblemResolver(); + } + }; + /** {@inheritDoc} */ @Override public void resolve(CommunicationProblemContext ctx) { // No-op. @@ -2719,6 +2779,18 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { * */ static class TestNodeKillCommunicationProblemResolver implements CommunicationProblemResolver { + /** + * @param killOrders Killed nodes order. + * @return Factory. + */ + static IgniteOutClosure<CommunicationProblemResolver> factory(final Collection<Long> killOrders) { + return new IgniteOutClosure<CommunicationProblemResolver>() { + @Override public CommunicationProblemResolver apply() { + return new TestNodeKillCommunicationProblemResolver(killOrders); + } + }; + } + /** */ final Collection<Long> killNodeOrders; http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java index 93cd911..36403fa 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java @@ -607,6 +607,16 @@ public class GridSpiTestContext implements IgniteSpiContext { return Collections.emptyMap(); } + /** {@inheritDoc} */ + @Override public boolean communicationErrorResolveSupported() { + return false; + } + + /** {@inheritDoc} */ + @Override public void resolveCommunicationError(ClusterNode node, Exception err) { + throw new UnsupportedOperationException(); + } + /** * @param cacheName Cache name. * @return Map representing cache.
