Repository: ignite
Updated Branches:
  refs/heads/ignite-zk 62dd9df03 -> dc8903caf


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dc8903ca
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dc8903ca
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dc8903ca

Branch: refs/heads/ignite-zk
Commit: dc8903caf11c2de4403ab4fb34e6c1482a89be16
Parents: 62dd9df
Author: sboikov <[email protected]>
Authored: Tue Jan 9 16:27:41 2018 +0300
Committer: sboikov <[email protected]>
Committed: Tue Jan 9 17:50:46 2018 +0300

----------------------------------------------------------------------
 .../DiscoveryMessageResultsCollector.java       |   2 +-
 .../continuous/GridContinuousProcessor.java     |   9 +-
 .../discovery/DiscoverySpiCustomMessage.java    |   4 +-
 .../spi/discovery/zk/ZookeeperDiscoverySpi.java |  18 +-
 .../internal/ZkCommunicationProblemContext.java |  10 +-
 .../zk/internal/ZkForceNodeFailMessage.java     |   8 +-
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 252 +++++++++++--------
 .../ignite/internal/ClusterGroupSelfTest.java   |   2 +
 .../apache/ignite/internal/GridSelfTest.java    |   2 +
 .../GridAffinityProcessorAbstractSelfTest.java  |   4 +-
 .../service/ClosureServiceClientsNodesTest.java |  19 +-
 .../zk/internal/ZookeeperDiscoverySpiTest.java  |   7 +
 .../zk/ZookeeperDiscoveryTestSuite.java         |  40 +++
 13 files changed, 235 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8903ca/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageResultsCollector.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageResultsCollector.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageResultsCollector.java
index 72a4636..43be952 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageResultsCollector.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageResultsCollector.java
@@ -196,7 +196,7 @@ public abstract class DiscoveryMessageResultsCollector<M, 
R>  {
 
             nodeFailed = true;
 
-            return false;
+            return true;
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8903ca/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 950ac7e..a62dcd1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -166,12 +166,12 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
     @Override public void start() throws IgniteCheckedException {
         discoProtoVer = ctx.discovery().mutableCustomMessages() ? 1 : 2;
 
-        if (ctx.config().isDaemon())
-            return;
-
         if (discoProtoVer == 2)
             routinesInfo = new ContinuousRoutinesInfo();
 
+        if (ctx.config().isDaemon())
+            return;
+
         retryDelay = ctx.config().getNetworkSendRetryDelay();
         retryCnt = ctx.config().getNetworkSendRetryCount();
 
@@ -509,6 +509,9 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
     /** {@inheritDoc} */
     @Override public void onGridDataReceived(GridDiscoveryData data) {
         if (discoProtoVer == 2) {
+            if (ctx.isDaemon())
+                return;
+
             if (data.commonData() != null) {
                 ContinuousRoutinesCommonDiscoveryData commonData =
                     (ContinuousRoutinesCommonDiscoveryData)data.commonData();

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8903ca/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
index 10e1101..dbaea63 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
@@ -43,8 +43,8 @@ public interface DiscoverySpiCustomMessage extends 
Serializable {
 
     /**
      * Called on discovery coordinator node after after listener is notified. 
If returns {@code true}
-     * then message is not passed to others nodes, if after this {@link 
#ackMessage()} returns ack, it is sent to
-     * all nodes.
+     * then message is not passed to other nodes. If after this method {@link 
#ackMessage()} returns a non-null ack
+     * message, it is sent to all nodes.
      *
      * @return {@code True} if message should not be sent to all nodes.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8903ca/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index b695e9a..a89d46a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -432,9 +432,11 @@ public class ZookeeperDiscoverySpi extends 
IgniteSpiAdapter implements Discovery
 
         ZookeeperClusterNode locNode = initLocalNode();
 
-        log.info("Start Zookeeper discovery [zkConnectionString=" + 
zkConnectionString +
-            ", sessionTimeout=" + sesTimeout +
-            ", zkRootPath=" + zkRootPath + ']');
+        if (log.isInfoEnabled()) {
+            log.info("Start Zookeeper discovery [zkConnectionString=" + 
zkConnectionString +
+                ", sessionTimeout=" + sesTimeout +
+                ", zkRootPath=" + zkRootPath + ']');
+        }
 
         impl = new ZookeeperDiscoveryImpl(
             this,
@@ -470,12 +472,6 @@ public class ZookeeperDiscoverySpi extends 
IgniteSpiAdapter implements Discovery
     }
 
     /** {@inheritDoc} */
-    @Override protected void onContextDestroyed0() {
-        if (impl != null)
-            impl.onStop();
-    }
-
-    /** {@inheritDoc} */
     @Override public void spiStop() throws IgniteSpiException {
         if (impl != null)
             impl.stop();
@@ -489,15 +485,13 @@ public class ZookeeperDiscoverySpi extends 
IgniteSpiAdapter implements Discovery
 
         initAddresses();
 
-        consistentId = consistentId();
-
         ZookeeperClusterNode locNode = new ZookeeperClusterNode(
             ignite.configuration().getNodeId(),
             addrs.get1(),
             addrs.get2(),
             locNodeVer,
             locNodeAttrs,
-            consistentId,
+            consistentId(),
             sesTimeout,
             ignite.configuration().isClientMode(),
             metricsProvider);

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8903ca/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationProblemContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationProblemContext.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationProblemContext.java
index 9cb48cc..6159e5e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationProblemContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationProblemContext.java
@@ -27,6 +27,7 @@ import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CommunicationProblemContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 
 /**
  *
@@ -51,15 +52,22 @@ class ZkCommunicationProblemContext implements 
CommunicationProblemContext {
     /** */
     private final List<ClusterNode> curNodes;
 
+    /** */
+    private final GridCacheSharedContext ctx;
+
     /**
+     * @param ctx Context.
      * @param curNodes Current topology snapshot.
      * @param initialNodes Topology snapshot when communication error resolve 
started.
      * @param nodesState Nodes communication state.
      */
-    ZkCommunicationProblemContext(List<ClusterNode> curNodes,
+    ZkCommunicationProblemContext(
+        GridCacheSharedContext ctx,
+        List<ClusterNode> curNodes,
         List<ClusterNode> initialNodes,
         Map<UUID, BitSet> nodesState)
     {
+        this.ctx = ctx;
         this.curNodes = Collections.unmodifiableList(curNodes);
         this.initialNodes = initialNodes;
         this.nodesState = nodesState;

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8903ca/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
index c76bcba..9d8980e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
@@ -30,17 +30,17 @@ public class ZkForceNodeFailMessage implements 
DiscoverySpiCustomMessage, ZkInte
     private static final long serialVersionUID = 0L;
 
     /** */
-    final UUID nodeId;
+    final long nodeInternalId;
 
     /** */
     final String warning;
 
     /**
-     * @param nodeId Node ID.
+     * @param nodeInternalId Node internal ID.
      * @param warning Warning to be displayed on all nodes.
      */
-    ZkForceNodeFailMessage(UUID nodeId, String warning) {
-        this.nodeId = nodeId;
+    ZkForceNodeFailMessage(long nodeInternalId, String warning) {
+        this.nodeInternalId = nodeInternalId;
         this.warning = warning;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8903ca/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index c4756fe..030e43e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -53,6 +53,7 @@ import org.apache.ignite.internal.ClusterMetricsSnapshot;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.IgnitionEx;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
@@ -415,7 +416,7 @@ public class ZookeeperDiscoveryImpl {
             return;
         }
 
-        sendCustomMessage(new ZkForceNodeFailMessage(nodeId, warning));
+        sendCustomMessage(new ZkForceNodeFailMessage(node.internalId(), 
warning));
     }
 
     /**
@@ -1437,6 +1438,8 @@ public class ZookeeperDiscoveryImpl {
                         // No-op.
                     }
                 }
+                else if (evtData0.resolvedMsg instanceof 
ZkForceNodeFailMessage)
+                    
deleteAliveNode(((ZkForceNodeFailMessage)evtData0.resolvedMsg).nodeInternalId);
             }
         }
     }
@@ -2281,14 +2284,18 @@ public class ZookeeperDiscoveryImpl {
         ZookeeperClient zkClient = rtState.zkClient;
         ZkDiscoveryEventsData evtsData = rtState.evtsData;
 
+        ZookeeperClusterNode failedNode = null;
+
         if (msg instanceof ZkForceNodeFailMessage) {
             ZkForceNodeFailMessage msg0 = (ZkForceNodeFailMessage)msg;
 
-            if (rtState.top.nodesById.containsKey(msg0.nodeId))
+            failedNode = 
rtState.top.nodesByInternalId.get(msg0.nodeInternalId);
+
+            if (failedNode != null)
                 evtsData.topVer++;
             else {
                 if (log.isDebugEnabled())
-                    log.debug("Ignore forcible node fail request for unknown 
node: " + msg0.nodeId);
+                    log.debug("Ignore forcible node fail request for unknown 
node: " + msg0.nodeInternalId);
 
                 deleteCustomEventDataAsync(zkClient, evtPath);
 
@@ -2352,7 +2359,7 @@ public class ZookeeperDiscoveryImpl {
 
                 fastStopProcess = true;
 
-                // No need process this event on others nodes, skip this event.
+                // No need to process this event on other nodes, skip this 
event.
                 evtsData.evts.remove(evtData.eventId());
 
                 evtsData.evtIdGen--;
@@ -2381,6 +2388,26 @@ public class ZookeeperDiscoveryImpl {
 
             if (fastStopProcess)
                 deleteCustomEventDataAsync(zkClient, evtPath);
+
+            if (failedNode != null) {
+                deleteAliveNode(failedNode.internalId());
+
+                
handleProcessedEventsOnNodesFail(Collections.singletonList(failedNode));
+            }
+        }
+    }
+
+    /**
+     * @param internalId Node internal ID.
+     * @throws Exception If failed.
+     */
+    private void deleteAliveNode(long internalId) throws Exception {
+        for (String child : 
rtState.zkClient.getChildren(zkPaths.aliveNodesDir)) {
+            if (ZkIgnitePaths.aliveInternalId(child) == internalId) {
+                rtState.zkClient.deleteIfExistsAsync(zkPaths.aliveNodesDir + 
"/" + child);
+
+                break;
+            }
         }
     }
 
@@ -2465,127 +2492,152 @@ public class ZookeeperDiscoveryImpl {
         boolean evtProcessed = false;
         boolean updateNodeInfo = false;
 
-        for (ZkDiscoveryEventData evtData : 
evts.tailMap(rtState.locNodeInfo.lastProcEvt, false).values()) {
-            if (!rtState.joined) {
-                if (evtData.eventType() != 
ZkDiscoveryEventData.ZK_EVT_NODE_JOIN)
-                    continue;
+        try {
+            for (ZkDiscoveryEventData evtData : 
evts.tailMap(rtState.locNodeInfo.lastProcEvt, false).values()) {
+                if (!rtState.joined) {
+                    if (evtData.eventType() != 
ZkDiscoveryEventData.ZK_EVT_NODE_JOIN)
+                        continue;
 
-                ZkDiscoveryNodeJoinEventData evtData0 = 
(ZkDiscoveryNodeJoinEventData)evtData;
+                    ZkDiscoveryNodeJoinEventData evtData0 = 
(ZkDiscoveryNodeJoinEventData)evtData;
 
-                UUID joinedId = evtData0.nodeId;
+                    UUID joinedId = evtData0.nodeId;
 
-                boolean locJoin = evtData0.joinedInternalId == 
rtState.internalOrder;
+                    boolean locJoin = evtData0.joinedInternalId == 
rtState.internalOrder;
 
-                if (locJoin) {
-                    assert locNode.id().equals(joinedId);
+                    if (locJoin) {
+                        assert locNode.id().equals(joinedId);
 
-                    processLocalJoin(evtsData, evtData0);
+                        processLocalJoin(evtsData, evtData0);
 
-                    evtProcessed = true;
+                        evtProcessed = true;
+                    }
                 }
-            }
-            else {
-                if (log.isDebugEnabled())
-                    log.debug("New discovery event data [evt=" + evtData + ", 
evtsHist=" + evts.size() + ']');
+                else {
+                    if (log.isDebugEnabled())
+                        log.debug("New discovery event data [evt=" + evtData + 
", evtsHist=" + evts.size() + ']');
 
-                switch (evtData.eventType()) {
-                    case ZkDiscoveryEventData.ZK_EVT_NODE_JOIN: {
-                        ZkDiscoveryNodeJoinEventData evtData0 = 
(ZkDiscoveryNodeJoinEventData)evtData;
+                    switch (evtData.eventType()) {
+                        case ZkDiscoveryEventData.ZK_EVT_NODE_JOIN: {
+                            ZkDiscoveryNodeJoinEventData evtData0 = 
(ZkDiscoveryNodeJoinEventData)evtData;
 
-                        ZkJoiningNodeData joiningData;
+                            ZkJoiningNodeData joiningData;
 
-                        if (rtState.crd) {
-                            assert evtData0.joiningNodeData != null;
+                            if (rtState.crd) {
+                                assert evtData0.joiningNodeData != null;
 
-                            joiningData = evtData0.joiningNodeData;
-                        }
-                        else {
-                            joiningData = unmarshalJoinData(evtData0.nodeId, 
evtData0.joinDataPrefixId);
+                                joiningData = evtData0.joiningNodeData;
+                            }
+                            else {
+                                joiningData = 
unmarshalJoinData(evtData0.nodeId, evtData0.joinDataPrefixId);
 
-                            DiscoveryDataBag dataBag = new 
DiscoveryDataBag(evtData0.nodeId);
+                                DiscoveryDataBag dataBag = new 
DiscoveryDataBag(evtData0.nodeId);
 
-                            
dataBag.joiningNodeData(joiningData.discoveryData());
+                                
dataBag.joiningNodeData(joiningData.discoveryData());
 
-                            exchange.onExchange(dataBag);
-                        }
+                                exchange.onExchange(dataBag);
+                            }
 
-                        if (evtData0.secSubjPartCnt > 0 && 
joiningData.node().attribute(ATTR_SECURITY_SUBJECT_V2) == null)
-                            readAndInitSecuritySubject(joiningData.node(), 
evtData0);
+                            if (evtData0.secSubjPartCnt > 0 && 
joiningData.node().attribute(ATTR_SECURITY_SUBJECT_V2) == null)
+                                readAndInitSecuritySubject(joiningData.node(), 
evtData0);
 
-                        notifyNodeJoin(evtData0, joiningData);
+                            notifyNodeJoin(evtData0, joiningData);
 
-                        break;
-                    }
+                            break;
+                        }
 
-                    case ZkDiscoveryEventData.ZK_EVT_NODE_FAILED: {
-                        notifyNodeFail((ZkDiscoveryNodeFailEventData)evtData);
+                        case ZkDiscoveryEventData.ZK_EVT_NODE_FAILED: {
+                            
notifyNodeFail((ZkDiscoveryNodeFailEventData)evtData);
 
-                        break;
-                    }
+                            break;
+                        }
 
-                    case ZkDiscoveryEventData.ZK_EVT_CUSTOM_EVT: {
-                        ZkDiscoveryCustomEventData evtData0 = 
(ZkDiscoveryCustomEventData)evtData;
+                        case ZkDiscoveryEventData.ZK_EVT_CUSTOM_EVT: {
+                            ZkDiscoveryCustomEventData evtData0 = 
(ZkDiscoveryCustomEventData)evtData;
 
-                        if (evtData0.ackEvent() && evtData0.topologyVersion() 
< locNode.order())
-                            break;
+                            if (evtData0.ackEvent() && 
evtData0.topologyVersion() < locNode.order())
+                                break;
 
-                        DiscoverySpiCustomMessage msg;
+                            DiscoverySpiCustomMessage msg;
 
-                        if (rtState.crd) {
-                            assert evtData0.resolvedMsg != null : evtData0;
+                            if (rtState.crd) {
+                                assert evtData0.resolvedMsg != null : evtData0;
 
-                            msg = evtData0.resolvedMsg;
-                        }
-                        else {
-                            if (evtData0.msg == null) {
-                                if (evtData0.ackEvent()) {
-                                    String path = 
zkPaths.ackEventDataPath(evtData0.origEvtId);
+                                msg = evtData0.resolvedMsg;
+                            }
+                            else {
+                                if (evtData0.msg == null) {
+                                    if (evtData0.ackEvent()) {
+                                        String path = 
zkPaths.ackEventDataPath(evtData0.origEvtId);
 
-                                    msg = unmarshalZip(zkClient.getData(path));
-                                }
-                                else {
-                                    assert evtData0.evtPath != null : evtData0;
+                                        msg = 
unmarshalZip(zkClient.getData(path));
+                                    }
+                                    else {
+                                        assert evtData0.evtPath != null : 
evtData0;
 
-                                    byte[] msgBytes = 
readCustomEventData(zkClient,
-                                        evtData0.evtPath,
-                                        evtData0.sndNodeId);
+                                        byte[] msgBytes = 
readCustomEventData(zkClient,
+                                            evtData0.evtPath,
+                                            evtData0.sndNodeId);
 
-                                    msg = unmarshalZip(msgBytes);
+                                        msg = unmarshalZip(msgBytes);
+                                    }
                                 }
+                                else
+                                    msg = evtData0.msg;
+
+                                evtData0.resolvedMsg = msg;
                             }
-                            else
-                                msg = evtData0.msg;
 
-                            evtData0.resolvedMsg = msg;
-                        }
+                            if (msg instanceof ZkInternalMessage)
+                                processInternalMessage(evtData0, 
(ZkInternalMessage)msg);
+                            else {
+                                notifyCustomEvent(evtData0, msg);
 
-                        if (msg instanceof ZkInternalMessage)
-                            processInternalMessage(evtData0, 
(ZkInternalMessage)msg);
-                        else {
-                            notifyCustomEvent(evtData0, msg);
+                                if (!evtData0.ackEvent())
+                                    updateNodeInfo = true;
+                            }
 
-                            if (!evtData0.ackEvent())
-                                updateNodeInfo = true;
+                            break;
                         }
 
-                        break;
+                        default:
+                            assert false : "Invalid event: " + evtData;
                     }
 
-                    default:
-                        assert false : "Invalid event: " + evtData;
+                    evtProcessed = true;
+                }
+
+                if (rtState.joined) {
+                    rtState.locNodeInfo.lastProcEvt = evtData.eventId();
+
+                    rtState.procEvtCnt++;
+
+                    if (rtState.procEvtCnt % evtsAckThreshold == 0)
+                        updateNodeInfo = true;
                 }
+            }
+        }
+        catch (KeeperException.NoNodeException e) {
+            // Can get NoNodeException if local node was forcibly failed,
+            // in this case coordinator does not wait for this node to process 
all events.
+            boolean exists;
 
-                evtProcessed = true;
+            try {
+                exists = rtState.zkClient.exists(rtState.locNodeZkPath);
             }
+            catch (Exception e0) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to check is local node is alive:" + e0);
 
-            if (rtState.joined) {
-                rtState.locNodeInfo.lastProcEvt = evtData.eventId();
+                exists = true;
+            }
 
-                rtState.procEvtCnt++;
+            if (!exists) {
+                U.warn(log, "Failed to process discovery event, local node was 
forced to stop.", e);
 
-                if (rtState.procEvtCnt % evtsAckThreshold == 0)
-                    updateNodeInfo = true;
+                throw localNodeFail("Local node was forced to stop.", true);
             }
+
+            throw e;
         }
 
         if (rtState.crd)
@@ -2819,38 +2871,26 @@ public class ZookeeperDiscoveryImpl {
         throws Exception {
         ClusterNode creatorNode = rtState.top.nodesById.get(evtData.sndNodeId);
 
+        ZookeeperClusterNode node = 
rtState.top.nodesByInternalId.get(msg.nodeInternalId);
+
+        assert node != null : msg.nodeInternalId;
+
         if (msg.warning != null) {
             U.warn(log, "Received force EVT_NODE_FAILED event with warning [" +
-                "nodeId=" + msg.nodeId +
+                "nodeId=" + node.id() +
                 ", msg=" + msg.warning +
                 ", nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : 
evtData.sndNodeId) + ']');
         }
         else {
             U.warn(log, "Received force EVT_NODE_FAILED event [" +
-                "nodeId=" + msg.nodeId +
+                "nodeId=" + node.id() +
                 ", nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : 
evtData.sndNodeId) + ']');
         }
 
-        ZookeeperClusterNode node = rtState.top.nodesById.get(msg.nodeId);
-
-        assert node != null : msg.nodeId;
-
         if (node.isLocal())
             throw localNodeFail("Received force EVT_NODE_FAILED event for 
local node.", true);
         else
             notifyNodeFail(node.internalId(), evtData.topologyVersion());
-
-        if (rtState.crd) {
-            ZookeeperClient zkClient = rtState.zkClient;
-
-            for (String child : zkClient.getChildren(zkPaths.aliveNodesDir)) {
-                if (ZkIgnitePaths.aliveInternalId(child) == node.internalId()) 
{
-                    zkClient.deleteIfExistsAsync(zkPaths.aliveNodesDir + "/" + 
child);
-
-                    break;
-                }
-            }
-        }
     }
 
     /**
@@ -3104,6 +3144,7 @@ public class ZookeeperDiscoveryImpl {
                     }
 
                     ZkCommunicationProblemContext ctx = new 
ZkCommunicationProblemContext(
+                        
((IgniteKernal)spi.ignite()).context().cache().context(),
                         topSnapshot,
                         initialNodes,
                         nodesRes);
@@ -3296,7 +3337,7 @@ public class ZookeeperDiscoveryImpl {
      * @param clientReconnect {@code True} if allow client reconnect.
      * @return Exception to be thrown.
      */
-    private Exception localNodeFail(String msg, boolean clientReconnect) {
+    private ZookeeperClientFailedException localNodeFail(String msg, boolean 
clientReconnect) {
         U.warn(log, msg);
 
 //        if (locNode.isClient() && rtState.zkClient.connected()) {
@@ -3611,13 +3652,6 @@ public class ZookeeperDiscoveryImpl {
     /**
      *
      */
-    public void onStop() {
-        // No-op.
-    }
-
-    /**
-     *
-     */
     public void stop() {
         stop0(new IgniteSpiException("Node stopped"));
     }
@@ -3696,8 +3730,6 @@ public class ZookeeperDiscoveryImpl {
         U.error(log, "Fatal error in ZookeeperDiscovery. " +
             "Stopping the node in order to prevent cluster wide instability.", 
err);
 
-        onStop();
-
         stop0(err);
 
         new Thread(new Runnable() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8903ca/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java
index 9df561a..99006d1 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java
@@ -68,6 +68,8 @@ public class ClusterGroupSelfTest extends 
ClusterGroupAbstractTest {
                 if (i == 0)
                     ignite = g;
             }
+
+            waitForTopology(NODES_CNT);
         }
         finally {
             Ignition.setClientMode(false);

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8903ca/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java 
b/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java
index 7e368cb..f71ffb0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java
@@ -45,6 +45,8 @@ public class GridSelfTest extends ClusterGroupAbstractTest {
 
         for (int i = 0; i < NODES_CNT; i++)
             startGrid(i);
+
+        waitForTopology(NODES_CNT);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8903ca/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorAbstractSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorAbstractSelfTest.java
index 1d70246..aa2abae 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorAbstractSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorAbstractSelfTest.java
@@ -91,12 +91,12 @@ public abstract class GridAffinityProcessorAbstractSelfTest 
extends GridCommonAb
     @Override protected void beforeTestsStarted() throws Exception {
         assert NODES_CNT >= 1;
 
-        withCache = false;
+        withCache = true;
 
         for (int i = 0; i < NODES_CNT; i++)
             startGrid(i);
 
-        withCache = true;
+        withCache = false;
 
         for (int i = NODES_CNT; i < 2 * NODES_CNT; i++)
             startGrid(i);

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8903ca/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java
index c1af323..8f03c4c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java
@@ -49,6 +49,9 @@ public class ClosureServiceClientsNodesTest extends GridCommonAbstractTest {
     /** Number of grids started for tests. */
     private static final int NODES_CNT = 4;
 
+    /** */
+    private static final int CLIENT_IDX = 1;
+
     /** Test singleton service name. */
     private static final String SINGLETON_NAME = "testSingleton";
 
@@ -61,11 +64,11 @@ public class ClosureServiceClientsNodesTest extends GridCommonAbstractTest {
 
         cfg.setMarshaller(new BinaryMarshaller());
 
-        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder).setForceServerMode(true));
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
 
         cfg.setCacheConfiguration();
 
-        if (igniteInstanceName.equals(getTestIgniteInstanceName(0)))
+        if (igniteInstanceName.equals(getTestIgniteInstanceName(CLIENT_IDX)))
             cfg.setClientMode(true);
 
         return cfg;
@@ -88,8 +91,10 @@ public class ClosureServiceClientsNodesTest extends GridCommonAbstractTest {
     public void testDefaultClosure() throws Exception {
         Set<String> srvNames = new HashSet<>(NODES_CNT - 1);
 
-        for (int i = 1; i < NODES_CNT; ++i)
-            srvNames.add(getTestIgniteInstanceName(i));
+        for (int i = 0; i < NODES_CNT; ++i) {
+            if (i != CLIENT_IDX)
+                srvNames.add(getTestIgniteInstanceName(i));
+        }
 
         for (int i = 0 ; i < NODES_CNT; i++) {
             log.info("Iteration: " + i);
@@ -137,7 +142,7 @@ public class ClosureServiceClientsNodesTest extends GridCommonAbstractTest {
 
             assertEquals(1, res.size());
 
-            assertEquals(getTestIgniteInstanceName(0), F.first(res));
+            assertEquals(getTestIgniteInstanceName(CLIENT_IDX), F.first(res));
         }
     }
 
@@ -168,7 +173,7 @@ public class ClosureServiceClientsNodesTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testDefaultService() throws Exception {
-        UUID clientNodeId = grid(0).cluster().localNode().id();
+        UUID clientNodeId = grid(CLIENT_IDX).cluster().localNode().id();
 
         for (int i = 0 ; i < NODES_CNT; i++) {
             log.info("Iteration: " + i);
@@ -209,7 +214,7 @@ public class ClosureServiceClientsNodesTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testClientService() throws Exception {
-        UUID clientNodeId = grid(0).cluster().localNode().id();
+        UUID clientNodeId = grid(CLIENT_IDX).cluster().localNode().id();
 
         for (int i = 0 ; i < NODES_CNT; i++) {
             log.info("Iteration: " + i);

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8903ca/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
index af73535..e020bcc 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
@@ -2859,6 +2859,13 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testCommunicationErrorResolve_CachesInfo1() throws Exception {
+
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testConnectionCheck() throws Exception {
        final int NODES = 5;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8903ca/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoveryTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoveryTestSuite.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoveryTestSuite.java
new file mode 100644
index 0000000..4ea9b37
--- /dev/null
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoveryTestSuite.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk;
+
+import junit.framework.TestSuite;
+import org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest;
+
+/**
+ *
+ */
+public class ZookeeperDiscoveryTestSuite extends TestSuite {
+    /**
+     * @return Test suite.
+     * @throws Exception Thrown in case of the failure.
+     */
+    public static TestSuite suite() throws Exception {
+        // TODO ZK
+        TestSuite suite = new TestSuite("ZookeeperDiscoverySpi Test Suite");
+
+        // Regular tests executed with ZookeeperDiscoverySpi.
+        suite.addTestSuite(GridEventConsumeSelfTest.class);
+
+        return suite;
+    }
+}

Reply via email to