Repository: ignite Updated Branches: refs/heads/ignite-zk 62dd9df03 -> dc8903caf
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dc8903ca Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dc8903ca Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dc8903ca Branch: refs/heads/ignite-zk Commit: dc8903caf11c2de4403ab4fb34e6c1482a89be16 Parents: 62dd9df Author: sboikov <[email protected]> Authored: Tue Jan 9 16:27:41 2018 +0300 Committer: sboikov <[email protected]> Committed: Tue Jan 9 17:50:46 2018 +0300 ---------------------------------------------------------------------- .../DiscoveryMessageResultsCollector.java | 2 +- .../continuous/GridContinuousProcessor.java | 9 +- .../discovery/DiscoverySpiCustomMessage.java | 4 +- .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 18 +- .../internal/ZkCommunicationProblemContext.java | 10 +- .../zk/internal/ZkForceNodeFailMessage.java | 8 +- .../zk/internal/ZookeeperDiscoveryImpl.java | 252 +++++++++++-------- .../ignite/internal/ClusterGroupSelfTest.java | 2 + .../apache/ignite/internal/GridSelfTest.java | 2 + .../GridAffinityProcessorAbstractSelfTest.java | 4 +- .../service/ClosureServiceClientsNodesTest.java | 19 +- .../zk/internal/ZookeeperDiscoverySpiTest.java | 7 + .../zk/ZookeeperDiscoveryTestSuite.java | 40 +++ 13 files changed, 235 insertions(+), 142 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8903ca/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageResultsCollector.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageResultsCollector.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageResultsCollector.java index 72a4636..43be952 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageResultsCollector.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageResultsCollector.java @@ -196,7 +196,7 @@ public abstract class DiscoveryMessageResultsCollector<M, R> { nodeFailed = true; - return false; + return true; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8903ca/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 950ac7e..a62dcd1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -166,12 +166,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter { @Override public void start() throws IgniteCheckedException { discoProtoVer = ctx.discovery().mutableCustomMessages() ? 1 : 2; - if (ctx.config().isDaemon()) - return; - if (discoProtoVer == 2) routinesInfo = new ContinuousRoutinesInfo(); + if (ctx.config().isDaemon()) + return; + retryDelay = ctx.config().getNetworkSendRetryDelay(); retryCnt = ctx.config().getNetworkSendRetryCount(); @@ -509,6 +509,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public void onGridDataReceived(GridDiscoveryData data) { if (discoProtoVer == 2) { + if (ctx.isDaemon()) + return; + if (data.commonData() != null) { ContinuousRoutinesCommonDiscoveryData commonData = (ContinuousRoutinesCommonDiscoveryData)data.commonData(); http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8903ca/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java index 10e1101..dbaea63 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java @@ -43,8 +43,8 @@ public interface DiscoverySpiCustomMessage extends Serializable { /** * Called on discovery coordinator node after after listener is notified. If returns {@code true} - * then message is not passed to others nodes, if after this {@link #ackMessage()} returns ack, it is sent to - * all nodes. + * then message is not passed to others nodes, if after this method {@link #ackMessage()} returns non-null ack + * message, it is sent to all nodes. * * @return {@code True} if message should not be sent to all nodes. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8903ca/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java index b695e9a..a89d46a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java @@ -432,9 +432,11 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery ZookeeperClusterNode locNode = initLocalNode(); - log.info("Start Zookeeper discovery [zkConnectionString=" + zkConnectionString + - ", sessionTimeout=" + sesTimeout + - ", zkRootPath=" + zkRootPath + ']'); + if (log.isInfoEnabled()) { + log.info("Start Zookeeper discovery [zkConnectionString=" + zkConnectionString + + ", sessionTimeout=" + sesTimeout + + ", zkRootPath=" + zkRootPath + ']'); + } impl = new ZookeeperDiscoveryImpl( this, @@ -470,12 +472,6 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery } /** {@inheritDoc} */ - @Override protected void onContextDestroyed0() { - if (impl != null) - impl.onStop(); - } - - /** {@inheritDoc} */ @Override public void spiStop() throws IgniteSpiException { if (impl != null) impl.stop(); @@ -489,15 +485,13 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery initAddresses(); - consistentId = consistentId(); - ZookeeperClusterNode locNode = new ZookeeperClusterNode( ignite.configuration().getNodeId(), addrs.get1(), addrs.get2(), locNodeVer, locNodeAttrs, - consistentId, + consistentId(), sesTimeout, ignite.configuration().isClientMode(), metricsProvider); http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8903ca/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationProblemContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationProblemContext.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationProblemContext.java index 9cb48cc..6159e5e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationProblemContext.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationProblemContext.java @@ -27,6 +27,7 @@ import java.util.Set; import java.util.UUID; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CommunicationProblemContext; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; /** * @@ -51,15 +52,22 @@ class ZkCommunicationProblemContext implements CommunicationProblemContext { /** */ private final List<ClusterNode> curNodes; + /** */ + private final GridCacheSharedContext ctx; + /** + * @param ctx Context. * @param curNodes Current topology snapshot. * @param initialNodes Topology snapshot when communication error resolve started. * @param nodesState Nodes communication state. */ - ZkCommunicationProblemContext(List<ClusterNode> curNodes, + ZkCommunicationProblemContext( + GridCacheSharedContext ctx, + List<ClusterNode> curNodes, List<ClusterNode> initialNodes, Map<UUID, BitSet> nodesState) { + this.ctx = ctx; this.curNodes = Collections.unmodifiableList(curNodes); this.initialNodes = initialNodes; this.nodesState = nodesState; http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8903ca/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java index c76bcba..9d8980e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java @@ -30,17 +30,17 @@ public class ZkForceNodeFailMessage implements DiscoverySpiCustomMessage, ZkInte private static final long serialVersionUID = 0L; /** */ - final UUID nodeId; + final long nodeInternalId; /** */ final String warning; /** - * @param nodeId Node ID. + * @param nodeInternalId Node ID. * @param warning Warning to be displayed on all nodes. */ - ZkForceNodeFailMessage(UUID nodeId, String warning) { - this.nodeId = nodeId; + ZkForceNodeFailMessage(long nodeInternalId, String warning) { + this.nodeInternalId = nodeInternalId; this.warning = warning; } http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8903ca/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index c4756fe..030e43e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -53,6 +53,7 @@ import org.apache.ignite.internal.ClusterMetricsSnapshot; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.IgnitionEx; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; @@ -415,7 +416,7 @@ public class ZookeeperDiscoveryImpl { return; } - sendCustomMessage(new ZkForceNodeFailMessage(nodeId, warning)); + sendCustomMessage(new ZkForceNodeFailMessage(node.internalId(), warning)); } /** @@ -1437,6 +1438,8 @@ public class ZookeeperDiscoveryImpl { // No-op. } } + else if (evtData0.resolvedMsg instanceof ZkForceNodeFailMessage) + deleteAliveNode(((ZkForceNodeFailMessage)evtData0.resolvedMsg).nodeInternalId); } } } @@ -2281,14 +2284,18 @@ public class ZookeeperDiscoveryImpl { ZookeeperClient zkClient = rtState.zkClient; ZkDiscoveryEventsData evtsData = rtState.evtsData; + ZookeeperClusterNode failedNode = null; + if (msg instanceof ZkForceNodeFailMessage) { ZkForceNodeFailMessage msg0 = (ZkForceNodeFailMessage)msg; - if (rtState.top.nodesById.containsKey(msg0.nodeId)) + failedNode = rtState.top.nodesByInternalId.get(msg0.nodeInternalId); + + if (failedNode != null) evtsData.topVer++; else { if (log.isDebugEnabled()) - log.debug("Ignore forcible node fail request for unknown node: " + msg0.nodeId); + log.debug("Ignore forcible node fail request for unknown node: " + msg0.nodeInternalId); deleteCustomEventDataAsync(zkClient, evtPath); @@ -2352,7 +2359,7 @@ public class ZookeeperDiscoveryImpl { fastStopProcess = true; - // No need process this event on others nodes, skip this event. + // No need to process this event on others nodes, skip this event. evtsData.evts.remove(evtData.eventId()); evtsData.evtIdGen--; @@ -2381,6 +2388,26 @@ public class ZookeeperDiscoveryImpl { if (fastStopProcess) deleteCustomEventDataAsync(zkClient, evtPath); + + if (failedNode != null) { + deleteAliveNode(failedNode.internalId()); + + handleProcessedEventsOnNodesFail(Collections.singletonList(failedNode)); + } + } + } + + /** + * @param internalId Node internal ID. + * @throws Exception If failed. + */ + private void deleteAliveNode(long internalId) throws Exception { + for (String child : rtState.zkClient.getChildren(zkPaths.aliveNodesDir)) { + if (ZkIgnitePaths.aliveInternalId(child) == internalId) { + rtState.zkClient.deleteIfExistsAsync(zkPaths.aliveNodesDir + "/" + child); + + break; + } } } @@ -2465,127 +2492,152 @@ public class ZookeeperDiscoveryImpl { boolean evtProcessed = false; boolean updateNodeInfo = false; - for (ZkDiscoveryEventData evtData : evts.tailMap(rtState.locNodeInfo.lastProcEvt, false).values()) { - if (!rtState.joined) { - if (evtData.eventType() != ZkDiscoveryEventData.ZK_EVT_NODE_JOIN) - continue; + try { + for (ZkDiscoveryEventData evtData : evts.tailMap(rtState.locNodeInfo.lastProcEvt, false).values()) { + if (!rtState.joined) { + if (evtData.eventType() != ZkDiscoveryEventData.ZK_EVT_NODE_JOIN) + continue; - ZkDiscoveryNodeJoinEventData evtData0 = (ZkDiscoveryNodeJoinEventData)evtData; + ZkDiscoveryNodeJoinEventData evtData0 = (ZkDiscoveryNodeJoinEventData)evtData; - UUID joinedId = evtData0.nodeId; + UUID joinedId = evtData0.nodeId; - boolean locJoin = evtData0.joinedInternalId == rtState.internalOrder; + boolean locJoin = evtData0.joinedInternalId == rtState.internalOrder; - if (locJoin) { - assert locNode.id().equals(joinedId); + if (locJoin) { + assert locNode.id().equals(joinedId); - processLocalJoin(evtsData, evtData0); + processLocalJoin(evtsData, evtData0); - evtProcessed = true; + evtProcessed = true; + } } - } - else { - if (log.isDebugEnabled()) - log.debug("New discovery event data [evt=" + evtData + ", evtsHist=" + evts.size() + ']'); + else { + if (log.isDebugEnabled()) + log.debug("New discovery event data [evt=" + evtData + ", evtsHist=" + evts.size() + ']'); - switch (evtData.eventType()) { - case ZkDiscoveryEventData.ZK_EVT_NODE_JOIN: { - ZkDiscoveryNodeJoinEventData evtData0 = (ZkDiscoveryNodeJoinEventData)evtData; + switch (evtData.eventType()) { + case ZkDiscoveryEventData.ZK_EVT_NODE_JOIN: { + ZkDiscoveryNodeJoinEventData evtData0 = (ZkDiscoveryNodeJoinEventData)evtData; - ZkJoiningNodeData joiningData; + ZkJoiningNodeData joiningData; - if (rtState.crd) { - assert evtData0.joiningNodeData != null; + if (rtState.crd) { + assert evtData0.joiningNodeData != null; - joiningData = evtData0.joiningNodeData; - } - else { - joiningData = unmarshalJoinData(evtData0.nodeId, evtData0.joinDataPrefixId); + joiningData = evtData0.joiningNodeData; + } + else { + joiningData = unmarshalJoinData(evtData0.nodeId, evtData0.joinDataPrefixId); - DiscoveryDataBag dataBag = new DiscoveryDataBag(evtData0.nodeId); + DiscoveryDataBag dataBag = new DiscoveryDataBag(evtData0.nodeId); - dataBag.joiningNodeData(joiningData.discoveryData()); + dataBag.joiningNodeData(joiningData.discoveryData()); - exchange.onExchange(dataBag); - } + exchange.onExchange(dataBag); + } - if (evtData0.secSubjPartCnt > 0 && joiningData.node().attribute(ATTR_SECURITY_SUBJECT_V2) == null) - readAndInitSecuritySubject(joiningData.node(), evtData0); + if (evtData0.secSubjPartCnt > 0 && joiningData.node().attribute(ATTR_SECURITY_SUBJECT_V2) == null) + readAndInitSecuritySubject(joiningData.node(), evtData0); - notifyNodeJoin(evtData0, joiningData); + notifyNodeJoin(evtData0, joiningData); - break; - } + break; + } - case ZkDiscoveryEventData.ZK_EVT_NODE_FAILED: { - notifyNodeFail((ZkDiscoveryNodeFailEventData)evtData); + case ZkDiscoveryEventData.ZK_EVT_NODE_FAILED: { + notifyNodeFail((ZkDiscoveryNodeFailEventData)evtData); - break; - } + break; + } - case ZkDiscoveryEventData.ZK_EVT_CUSTOM_EVT: { - ZkDiscoveryCustomEventData evtData0 = (ZkDiscoveryCustomEventData)evtData; + case ZkDiscoveryEventData.ZK_EVT_CUSTOM_EVT: { + ZkDiscoveryCustomEventData evtData0 = (ZkDiscoveryCustomEventData)evtData; - if (evtData0.ackEvent() && evtData0.topologyVersion() < locNode.order()) - break; + if (evtData0.ackEvent() && evtData0.topologyVersion() < locNode.order()) + break; - DiscoverySpiCustomMessage msg; + DiscoverySpiCustomMessage msg; - if (rtState.crd) { - assert evtData0.resolvedMsg != null : evtData0; + if (rtState.crd) { + assert evtData0.resolvedMsg != null : evtData0; - msg = evtData0.resolvedMsg; - } - else { - if (evtData0.msg == null) { - if (evtData0.ackEvent()) { - String path = zkPaths.ackEventDataPath(evtData0.origEvtId); + msg = evtData0.resolvedMsg; + } + else { + if (evtData0.msg == null) { + if (evtData0.ackEvent()) { + String path = zkPaths.ackEventDataPath(evtData0.origEvtId); - msg = unmarshalZip(zkClient.getData(path)); - } - else { - assert evtData0.evtPath != null : evtData0; + msg = unmarshalZip(zkClient.getData(path)); + } + else { + assert evtData0.evtPath != null : evtData0; - byte[] msgBytes = readCustomEventData(zkClient, - evtData0.evtPath, - evtData0.sndNodeId); + byte[] msgBytes = readCustomEventData(zkClient, + evtData0.evtPath, + evtData0.sndNodeId); - msg = unmarshalZip(msgBytes); + msg = unmarshalZip(msgBytes); + } } + else + msg = evtData0.msg; + + evtData0.resolvedMsg = msg; } - else - msg = evtData0.msg; - evtData0.resolvedMsg = msg; - } + if (msg instanceof ZkInternalMessage) + processInternalMessage(evtData0, (ZkInternalMessage)msg); + else { + notifyCustomEvent(evtData0, msg); - if (msg instanceof ZkInternalMessage) - processInternalMessage(evtData0, (ZkInternalMessage)msg); - else { - notifyCustomEvent(evtData0, msg); + if (!evtData0.ackEvent()) + updateNodeInfo = true; + } - if (!evtData0.ackEvent()) - updateNodeInfo = true; + break; } - break; + default: + assert false : "Invalid event: " + evtData; } - default: - assert false : "Invalid event: " + evtData; + evtProcessed = true; + } + + if (rtState.joined) { + rtState.locNodeInfo.lastProcEvt = evtData.eventId(); + + rtState.procEvtCnt++; + + if (rtState.procEvtCnt % evtsAckThreshold == 0) + updateNodeInfo = true; } + } + } + catch (KeeperException.NoNodeException e) { + // Can get NoNodeException if local node was forcible failed, + // in this case coordinator does not wait when this node process all events. + boolean exists; - evtProcessed = true; + try { + exists = rtState.zkClient.exists(rtState.locNodeZkPath); } + catch (Exception e0) { + if (log.isDebugEnabled()) + log.debug("Failed to check is local node is alive:" + e0); - if (rtState.joined) { - rtState.locNodeInfo.lastProcEvt = evtData.eventId(); + exists = true; + } - rtState.procEvtCnt++; + if (!exists) { + U.warn(log, "Failed to process discovery event, local node was forced to stop.", e); - if (rtState.procEvtCnt % evtsAckThreshold == 0) - updateNodeInfo = true; + throw localNodeFail("Local node was forced to stop.", true); } + + throw e; } if (rtState.crd) @@ -2819,38 +2871,26 @@ public class ZookeeperDiscoveryImpl { throws Exception { ClusterNode creatorNode = rtState.top.nodesById.get(evtData.sndNodeId); + ZookeeperClusterNode node = rtState.top.nodesByInternalId.get(msg.nodeInternalId); + + assert node != null : msg.nodeInternalId; + if (msg.warning != null) { U.warn(log, "Received force EVT_NODE_FAILED event with warning [" + - "nodeId=" + msg.nodeId + + "nodeId=" + node.id() + ", msg=" + msg.warning + ", nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : evtData.sndNodeId) + ']'); } else { U.warn(log, "Received force EVT_NODE_FAILED event [" + - "nodeId=" + msg.nodeId + + "nodeId=" + node.id() + ", nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : evtData.sndNodeId) + ']'); } - ZookeeperClusterNode node = rtState.top.nodesById.get(msg.nodeId); - - assert node != null : msg.nodeId; - if (node.isLocal()) throw localNodeFail("Received force EVT_NODE_FAILED event for local node.", true); else notifyNodeFail(node.internalId(), evtData.topologyVersion()); - - if (rtState.crd) { - ZookeeperClient zkClient = rtState.zkClient; - - for (String child : zkClient.getChildren(zkPaths.aliveNodesDir)) { - if (ZkIgnitePaths.aliveInternalId(child) == node.internalId()) { - zkClient.deleteIfExistsAsync(zkPaths.aliveNodesDir + "/" + child); - - break; - } - } - } } /** @@ -3104,6 +3144,7 @@ public class ZookeeperDiscoveryImpl { } ZkCommunicationProblemContext ctx = new ZkCommunicationProblemContext( + ((IgniteKernal)spi.ignite()).context().cache().context(), topSnapshot, initialNodes, nodesRes); @@ -3296,7 +3337,7 @@ public class ZookeeperDiscoveryImpl { * @param clientReconnect {@code True} if allow client reconnect. * @return Exception to be thrown. */ - private Exception localNodeFail(String msg, boolean clientReconnect) { + private ZookeeperClientFailedException localNodeFail(String msg, boolean clientReconnect) { U.warn(log, msg); // if (locNode.isClient() && rtState.zkClient.connected()) { @@ -3611,13 +3652,6 @@ public class ZookeeperDiscoveryImpl { /** * */ - public void onStop() { - // No-op. - } - - /** - * - */ public void stop() { stop0(new IgniteSpiException("Node stopped")); } @@ -3696,8 +3730,6 @@ public class ZookeeperDiscoveryImpl { U.error(log, "Fatal error in ZookeeperDiscovery. " + "Stopping the node in order to prevent cluster wide instability.", err); - onStop(); - stop0(err); new Thread(new Runnable() { http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8903ca/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java index 9df561a..99006d1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java @@ -68,6 +68,8 @@ public class ClusterGroupSelfTest extends ClusterGroupAbstractTest { if (i == 0) ignite = g; } + + waitForTopology(NODES_CNT); } finally { Ignition.setClientMode(false); http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8903ca/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java index 7e368cb..f71ffb0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java @@ -45,6 +45,8 @@ public class GridSelfTest extends ClusterGroupAbstractTest { for (int i = 0; i < NODES_CNT; i++) startGrid(i); + + waitForTopology(NODES_CNT); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8903ca/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorAbstractSelfTest.java index 1d70246..aa2abae 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorAbstractSelfTest.java @@ -91,12 +91,12 @@ public abstract class GridAffinityProcessorAbstractSelfTest extends GridCommonAb @Override protected void beforeTestsStarted() throws Exception { assert NODES_CNT >= 1; - withCache = false; + withCache = true; for (int i = 0; i < NODES_CNT; i++) startGrid(i); - withCache = true; + withCache = false; for (int i = NODES_CNT; i < 2 * NODES_CNT; i++) startGrid(i); http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8903ca/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java index c1af323..8f03c4c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java @@ -49,6 +49,9 @@ public class ClosureServiceClientsNodesTest extends GridCommonAbstractTest { /** Number of grids started for tests. */ private static final int NODES_CNT = 4; + /** */ + private static final int CLIENT_IDX = 1; + /** Test singleton service name. */ private static final String SINGLETON_NAME = "testSingleton"; @@ -61,11 +64,11 @@ public class ClosureServiceClientsNodesTest extends GridCommonAbstractTest { cfg.setMarshaller(new BinaryMarshaller()); - cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder).setForceServerMode(true)); + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); cfg.setCacheConfiguration(); - if (igniteInstanceName.equals(getTestIgniteInstanceName(0))) + if (igniteInstanceName.equals(getTestIgniteInstanceName(CLIENT_IDX))) cfg.setClientMode(true); return cfg; @@ -88,8 +91,10 @@ public class ClosureServiceClientsNodesTest extends GridCommonAbstractTest { public void testDefaultClosure() throws Exception { Set<String> srvNames = new HashSet<>(NODES_CNT - 1); - for (int i = 1; i < NODES_CNT; ++i) - srvNames.add(getTestIgniteInstanceName(i)); + for (int i = 0; i < NODES_CNT; ++i) { + if (i != CLIENT_IDX) + srvNames.add(getTestIgniteInstanceName(i)); + } for (int i = 0 ; i < NODES_CNT; i++) { log.info("Iteration: " + i); @@ -137,7 +142,7 @@ public class ClosureServiceClientsNodesTest extends GridCommonAbstractTest { assertEquals(1, res.size()); - assertEquals(getTestIgniteInstanceName(0), F.first(res)); + assertEquals(getTestIgniteInstanceName(CLIENT_IDX), F.first(res)); } } @@ -168,7 +173,7 @@ public class ClosureServiceClientsNodesTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testDefaultService() throws Exception { - UUID clientNodeId = grid(0).cluster().localNode().id(); + UUID clientNodeId = grid(CLIENT_IDX).cluster().localNode().id(); for (int i = 0 ; i < NODES_CNT; i++) { log.info("Iteration: " + i); @@ -209,7 +214,7 @@ public class ClosureServiceClientsNodesTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testClientService() throws Exception { - UUID clientNodeId = grid(0).cluster().localNode().id(); + UUID clientNodeId = grid(CLIENT_IDX).cluster().localNode().id(); for (int i = 0 ; i < NODES_CNT; i++) { log.info("Iteration: " + i); http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8903ca/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java index af73535..e020bcc 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java @@ -2859,6 +2859,13 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testCommunicationErrorResolve_CachesInfo1() throws Exception { + + } + + /** + * @throws Exception If failed. + */ public void testConnectionCheck() throws Exception { final int NODES = 5; http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8903ca/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoveryTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoveryTestSuite.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoveryTestSuite.java new file mode 100644 index 0000000..4ea9b37 --- /dev/null +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoveryTestSuite.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk; + +import junit.framework.TestSuite; +import org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest; + +/** + * + */ +public class ZookeeperDiscoveryTestSuite extends TestSuite { + /** + * @return Test suite. + * @throws Exception Thrown in case of the failure. + */ + public static TestSuite suite() throws Exception { + // TODO ZK + TestSuite suite = new TestSuite("ZookeeperDiscoverySpi Test Suite"); + + // Regular tests executed with ZookeeperDiscoverySpi. + suite.addTestSuite(GridEventConsumeSelfTest.class); + + return suite; + } +}
