Repository: ignite
Updated Branches:
  refs/heads/ignite-zk ab47f191b -> 942c70f16


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/942c70f1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/942c70f1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/942c70f1

Branch: refs/heads/ignite-zk
Commit: 942c70f162e9a4e5010932ccb454f35186f9b4b9
Parents: ab47f19
Author: sboikov <[email protected]>
Authored: Fri Dec 1 13:04:13 2017 +0300
Committer: sboikov <[email protected]>
Committed: Fri Dec 1 15:58:35 2017 +0300

----------------------------------------------------------------------
 .../spi/discovery/zk/ZookeeperDiscoverySpi.java |   3 +-
 .../zk/internal/ZkDiscoveryEventsData.java      |   3 +-
 .../discovery/zk/internal/ZkIgnitePaths.java    |  14 --
 .../zk/internal/ZkInternalFailNodeMessage.java  |  52 +++++
 .../zk/internal/ZkInternalMessage.java          |  27 +++
 .../zk/internal/ZkJoiningNodeData.java          |   9 +
 .../discovery/zk/internal/ZookeeperClient.java  |   2 +-
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 174 +++++++++++++--
 .../internal/IgniteClientReconnectStopTest.java |   3 +
 .../internal/util/GridTestClockTimer.java       |   3 +-
 .../ZookeeperDiscoverySpiBasicTest.java         | 214 ++++++++++++++++++-
 .../testframework/junits/GridAbstractTest.java  |   4 +-
 12 files changed, 458 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/942c70f1/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index 6029e83..08f0b26 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -257,8 +257,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter 
implements Discovery
 
     /** {@inheritDoc} */
     @Override public void failNode(UUID nodeId, @Nullable String warning) {
-        // TODO ZK
-        throw new UnsupportedOperationException();
+        impl.failNode(nodeId, warning);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/942c70f1/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
index b29d85e..37dc7df 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
@@ -59,8 +59,7 @@ class ZkDiscoveryEventsData implements Serializable {
      * @param nodes Current nodes in topology (these nodes should ack that 
event processed).
      * @param evt Event.
      */
-    void addEvent(Collection<ZookeeperClusterNode> nodes, ZkDiscoveryEventData 
evt)
-    {
+    void addEvent(Collection<ZookeeperClusterNode> nodes, ZkDiscoveryEventData 
evt) {
         Object old = evts.put(evt.eventId(), evt);
 
         assert old == null : old;

http://git-wip-us.apache.org/repos/asf/ignite/blob/942c70f1/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
index 0e427b9..2478979 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
@@ -75,20 +75,6 @@ class ZkIgnitePaths {
     /**
      * TODO ZK: copied from curator.
      *
-     * validate the provided znode path string
-     * @param path znode path string
-     * @param isSequential if the path is being created
-     * with a sequential flag
-     * @throws IllegalArgumentException if the path is invalid
-     */
-    public static void validatePath(String path, boolean isSequential)
-        throws IllegalArgumentException {
-        validatePath(isSequential? path + "1": path);
-    }
-
-    /**
-     * TODO ZK: copied from curator.
-     *
      * Validate the provided znode path string
      * @param path znode path string
      * @return The given path if it was valid, for fluent chaining

http://git-wip-us.apache.org/repos/asf/ignite/blob/942c70f1/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalFailNodeMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalFailNodeMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalFailNodeMessage.java
new file mode 100644
index 0000000..b289af1
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalFailNodeMessage.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.UUID;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class ZkInternalFailNodeMessage implements ZkInternalMessage {
+    /** */
+    final UUID nodeId;
+
+    /** */
+    final String warning;
+
+    /**
+     * @param nodeId Node ID.
+     * @param warning Warning to be displayed on all nodes.
+     */
+    ZkInternalFailNodeMessage(UUID nodeId, String warning) {
+        this.nodeId = nodeId;
+        this.warning = warning;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoverySpiCustomMessage ackMessage() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/942c70f1/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java
new file mode 100644
index 0000000..e56bab0
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+
+/**
+ *
+ */
+interface ZkInternalMessage extends DiscoverySpiCustomMessage {
+    // No-op.
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/942c70f1/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
index 1947b6b..6733ab6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
@@ -19,6 +19,8 @@ package org.apache.ignite.spi.discovery.zk.internal;
 
 import java.io.Serializable;
 import java.util.Map;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
  *
@@ -28,9 +30,11 @@ class ZkJoiningNodeData implements Serializable {
     private static final long serialVersionUID = 0L;
 
     /** */
+    @GridToStringInclude
     private final ZookeeperClusterNode node;
 
     /** */
+    @GridToStringInclude
     private final Map<Integer, Serializable> discoData;
 
     /**
@@ -58,4 +62,9 @@ class ZkJoiningNodeData implements Serializable {
     Map<Integer, Serializable> discoveryData() {
         return discoData;
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ZkJoiningNodeData.class, this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/942c70f1/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
index 2ccc7ea..fa5b807 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -218,7 +218,7 @@ public class ZookeeperClient implements Watcher {
      *
      */
     private void notifyConnectionLost() {
-        if (state == ConnectionState.Lost && connLostC != null)
+        if (!closing && state == ConnectionState.Lost && connLostC != null)
             connLostC.run();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/942c70f1/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 88905b8..1d398ed 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
@@ -63,6 +64,7 @@ import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
 import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL;
 import static org.apache.zookeeper.CreateMode.PERSISTENT;
 
@@ -218,6 +220,29 @@ public class ZookeeperDiscoveryImpl {
     }
 
     /**
+     * @param nodeId Node ID.
+     * @param warning Warning.
+     */
+    public void failNode(UUID nodeId, @Nullable String warning) {
+        ZookeeperClusterNode node = state.top.nodesById.get(nodeId);
+
+        if (node == null) {
+            if (log.isDebugEnabled())
+                log.debug("Ignore forcible node fail request, node does not 
exist: " + nodeId);
+
+            return;
+        }
+
+        if (!node.isClient()) {
+            U.warn(log, "Ignore forcible node fail request for non-client 
node: " + node);
+
+            return;
+        }
+
+        sendCustomMessage(new ZkInternalFailNodeMessage(nodeId, warning));
+    }
+
+    /**
      *
      */
     public void reconnect() {
@@ -296,17 +321,21 @@ public class ZookeeperDiscoveryImpl {
      */
     class SegmentedWatcher implements AsyncCallback.VoidCallback {
         @Override public void processResult(int rc, String path, Object ctx) {
-            assert state.evtsData != null;
-
-            lsnr.onDiscovery(EventType.EVT_NODE_SEGMENTED,
-                state.evtsData.topVer,
-                locNode,
-                state.top.topologySnapshot(),
-                Collections.<Long, Collection<ClusterNode>>emptyMap(),
-                null);
+            notifySegmented();
         }
     }
 
+    private void notifySegmented() {
+        assert state.evtsData != null;
+
+        lsnr.onDiscovery(EventType.EVT_NODE_SEGMENTED,
+            state.evtsData.topVer,
+            locNode,
+            state.top.topologySnapshot(),
+            Collections.<Long, Collection<ClusterNode>>emptyMap(),
+            null);
+    }
+
     /**
      * @return Remote nodes.
      */
@@ -995,11 +1024,16 @@ public class ZookeeperDiscoveryImpl {
         }
 
         if (newEvts != null) {
+            Set<UUID> alives = null;
+
             for (Map.Entry<Integer, String> evtE : newEvts.entrySet()) {
                 UUID sndNodeId = 
ZkIgnitePaths.customEventSendNodeId(evtE.getValue());
 
                 ZookeeperClusterNode sndNode = 
state.top.nodesById.get(sndNodeId);
 
+                if (alives != null && !alives.contains(sndNode.id()))
+                    sndNode = null;
+
                 String evtDataPath = zkPaths.customEvtsDir + "/" + 
evtE.getValue();
 
                 if (sndNode != null) {
@@ -1012,6 +1046,35 @@ public class ZookeeperDiscoveryImpl {
 
                         state.evtsData.evtIdGen++;
 
+                        if (msg instanceof ZkInternalFailNodeMessage) {
+                            ZkInternalFailNodeMessage msg0 = 
(ZkInternalFailNodeMessage)msg;
+
+                            if (alives == null)
+                                alives = new 
HashSet<>(state.top.nodesById.keySet());
+
+                            if (alives.contains(msg0.nodeId)) {
+                                state.evtsData.topVer++;
+
+                                alives.remove(msg0.nodeId);
+
+                                ZookeeperClusterNode node = 
state.top.nodesById.get(msg0.nodeId);
+
+                                assert node != null :  msg0.nodeId;
+
+                                for (String child : 
zkClient().getChildren(zkPaths.aliveNodesDir)) {
+                                    if (ZkIgnitePaths.aliveInternalId(child) 
== node.internalId()) {
+                                        
zkClient().deleteIfExistsAsync(zkPaths.aliveNodesDir + "/" + child);
+
+                                        break;
+                                    }
+                                }
+                            }
+                            else {
+                                if (log.isDebugEnabled())
+                                    log.debug("Ignore forcible node fail 
request for unknown node: " + msg0.nodeId);
+                            }
+                        }
+
                         ZkDiscoveryCustomEventData evtData = new 
ZkDiscoveryCustomEventData(
                             state.evtsData.evtIdGen,
                             state.evtsData.topVer,
@@ -1164,10 +1227,14 @@ public class ZookeeperDiscoveryImpl {
                             evtData0.msg = msg;
                         }
 
-                        notifyCustomEvent(evtData0, msg);
+                        if (msg instanceof ZkInternalMessage)
+                            processInternalMessage(evtData0, 
(ZkInternalMessage)msg);
+                        else {
+                            notifyCustomEvent(evtData0, msg);
 
-                        if (!evtData0.ackEvent())
-                            updateNodeInfo = true;
+                            if (!evtData0.ackEvent())
+                                updateNodeInfo = true;
+                        }
 
                         break;
                     }
@@ -1272,6 +1339,36 @@ public class ZookeeperDiscoveryImpl {
     }
 
     /**
+     * @param evtData
+     * @param msg
+     */
+    private void processInternalMessage(ZkDiscoveryCustomEventData evtData, 
ZkInternalMessage msg) throws Exception {
+        if (msg instanceof ZkInternalFailNodeMessage) {
+            ZkInternalFailNodeMessage msg0 = (ZkInternalFailNodeMessage)msg;
+
+            ClusterNode creatorNode = 
state.top.nodesById.get(evtData.sndNodeId);
+
+            if (msg0.warning != null) {
+                U.warn(log, "Received EVT_NODE_FAILED event with warning [" +
+                    "nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : 
evtData.sndNodeId) +
+                    ", nodeId=" + msg0.nodeId +
+                    ", msg=" + msg0.warning + ']');
+            }
+            else {
+                U.warn(log, "Received force EVT_NODE_FAILED event [" +
+                    "nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : 
evtData.sndNodeId) +
+                    ", nodeId=" + msg0.nodeId + ']');
+            }
+
+            ZookeeperClusterNode node = state.top.nodesById.get(msg0.nodeId);
+
+            assert node != null : msg0.nodeId;
+
+            processNodeFail(node.internalId(), evtData.topologyVersion());
+        }
+    }
+
+    /**
      * @param evtData Event data.
      * @param msg Custom message.
      */
@@ -1322,20 +1419,55 @@ public class ZookeeperDiscoveryImpl {
     /**
      * @param evtData Event data.
      */
-    @SuppressWarnings("unchecked")
-    private void notifyNodeFail(final ZkDiscoveryNodeFailEventData evtData) {
-        final ZookeeperClusterNode failedNode = 
state.top.removeNode(evtData.failedNodeInternalId());
+    private void notifyNodeFail(final ZkDiscoveryNodeFailEventData evtData) 
throws Exception {
+        processNodeFail(evtData.failedNodeInternalId(), 
evtData.topologyVersion());
+    }
+
+    /**
+     * @param nodeInternalId
+     * @param topVer
+     * @throws Exception
+     */
+    private void processNodeFail(int nodeInternalId, long topVer) throws 
Exception {
+        final ZookeeperClusterNode failedNode = 
state.top.removeNode(nodeInternalId);
 
         assert failedNode != null;
 
-        final List<ClusterNode> topSnapshot = state.top.topologySnapshot();
+        if (failedNode.isLocal()) {
+            U.warn(log, "Received EVT_NODE_FAILED for local node.");
 
-        lsnr.onDiscovery(evtData.eventType(),
-            evtData.topologyVersion(),
-            failedNode,
-            topSnapshot,
-            Collections.<Long, Collection<ClusterNode>>emptyMap(),
-            null);
+            zkClient().onCloseStart();
+
+            if (locNode.isClient() && clientReconnectEnabled) {
+                boolean reconnect = false;
+
+                synchronized (stateMux) {
+                    if (connState == ConnectionState.STARTED) {
+                        reconnect = true;
+
+                        connState = ConnectionState.DISCONNECTED;
+                    }
+                }
+
+                if (reconnect)
+                    new ReconnectorThread().start();
+            }
+            else
+                notifySegmented();
+
+            // Stop any further processing.
+            throw new ZookeeperClientFailedException("Received node failed 
event for local node.");
+        }
+        else {
+            final List<ClusterNode> topSnapshot = state.top.topologySnapshot();
+
+            lsnr.onDiscovery(EVT_NODE_FAILED,
+                topVer,
+                failedNode,
+                topSnapshot,
+                Collections.<Long, Collection<ClusterNode>>emptyMap(),
+                null);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/942c70f1/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStopTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStopTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStopTest.java
index e863cdf..98588b6 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStopTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStopTest.java
@@ -42,6 +42,9 @@ public class IgniteClientReconnectStopTest extends 
IgniteClientReconnectAbstract
      * @throws Exception If failed.
      */
     public void testStopWhenDisconnected() throws Exception {
+        if (!tcpDiscovery())
+            return;
+
         clientMode = true;
 
         Ignite client = startGrid(serverCount());

http://git-wip-us.apache.org/repos/asf/ignite/blob/942c70f1/modules/core/src/test/java/org/apache/ignite/internal/util/GridTestClockTimer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/GridTestClockTimer.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/GridTestClockTimer.java
index 5da9042..7a28ad1 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/util/GridTestClockTimer.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/GridTestClockTimer.java
@@ -26,7 +26,8 @@ public class GridTestClockTimer implements Runnable {
      */
     public GridTestClockTimer() {
         synchronized (IgniteUtils.mux) {
-            assert IgniteUtils.gridCnt == 0 : IgniteUtils.gridCnt;
+            // TODO ZK
+            // assert IgniteUtils.gridCnt == 0 : IgniteUtils.gridCnt;
 
             IgniteUtils.gridCnt++; // To prevent one more timer thread start 
from IgniteUtils.onGridStart.
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/942c70f1/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index d50e9b9..8eaff07 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -29,6 +29,7 @@ import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -719,6 +720,69 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testConcurrentStartStop1() throws Exception {
+       concurrentStartStop(1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentStartStop2() throws Exception {
+        concurrentStartStop(5);
+    }
+
+    /**
+     * @param initNodes Number of initially started nnodes.
+     * @throws Exception If failed.
+     */
+    private void concurrentStartStop(final int initNodes) throws Exception {
+        startGrids(initNodes);
+
+        final int NODES = 5;
+
+        long topVer = initNodes;
+
+        for (int i = 0; i < 10; i++) {
+            info("Iteration: " + i);
+
+            DiscoveryEvent[] expEvts = new DiscoveryEvent[NODES];
+
+            startGridsMultiThreaded(initNodes, NODES);
+
+            for (int j = 0; j < NODES; j++)
+                expEvts[j] = joinEvent(++topVer);
+
+            checkEvents(ignite(0), expEvts);
+
+            checkEventsConsistency();
+
+            final CyclicBarrier b = new CyclicBarrier(NODES);
+
+            GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
+                @Override public void apply(Integer idx) {
+                    try {
+                        b.await();
+
+                        stopGrid(initNodes + idx);
+                    }
+                    catch (Exception e) {
+                        e.printStackTrace();
+
+                        fail();
+                    }
+                }
+            }, NODES, "stop-node");
+
+            for (int j = 0; j < NODES; j++)
+                expEvts[j] = failEvent(++topVer);
+
+            checkEventsConsistency();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testClusterRestart() throws Exception {
         startGridsMultiThreaded(3, false);
 
@@ -1148,7 +1212,22 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testClientReconnectSessionExpire1() throws Exception {
+    public void testClientReconnectSessionExpire1_1() throws Exception {
+       clientReconnectSessionExpire(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientReconnectSessionExpire1_2() throws Exception {
+        clientReconnectSessionExpire(true);
+    }
+
+    /**
+     * @param closeSock Test mode flag.
+     * @throws Exception If failed.
+     */
+    private void clientReconnectSessionExpire(boolean closeSock) throws 
Exception {
         startGrid(0);
 
         sesTimeout = 2000;
@@ -1159,7 +1238,7 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
 
         client.cache(DEFAULT_CACHE_NAME).put(1, 1);
 
-        reconnectClientNodes(log, Collections.singletonList(client), null, 
true);
+        reconnectClientNodes(log, Collections.singletonList(client), null, 
closeSock);
 
         assertEquals(1, client.cache(DEFAULT_CACHE_NAME).get(1));
 
@@ -1188,6 +1267,96 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testForceClientReconnect() throws Exception {
+        final int SRVS = 3;
+
+        startGrids(SRVS);
+
+        client = true;
+
+        startGrid(SRVS);
+
+        reconnectClientNodes(Collections.singletonList(ignite(SRVS)), new 
Callable<Void>() {
+            @Override public Void call() throws Exception {
+                ZookeeperDiscoverySpi spi = 
waitSpi(getTestIgniteInstanceName(SRVS));
+
+                spi.reconnect();
+
+                return null;
+            }
+        });
+
+        waitForTopology(SRVS + 1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testForcibleClientFail() throws Exception {
+        final int SRVS = 3;
+
+        startGrids(SRVS);
+
+        client = true;
+
+        startGrid(SRVS);
+
+        reconnectClientNodes(Collections.singletonList(ignite(SRVS)), new 
Callable<Void>() {
+            @Override public Void call() throws Exception {
+                ZookeeperDiscoverySpi spi = 
waitSpi(getTestIgniteInstanceName(0));
+
+                spi.failNode(ignite(SRVS).cluster().localNode().id(), "Test 
forcible node fail");
+
+                return null;
+            }
+        });
+
+        waitForTopology(SRVS + 1);
+    }
+
+    /**
+     * @param clients Clients.
+     * @param c Closure to run.
+     * @throws Exception If failed.
+     */
+    private void reconnectClientNodes(List<Ignite> clients, Callable<Void> c)
+        throws Exception {
+        final CountDownLatch disconnectLatch = new 
CountDownLatch(clients.size());
+        final CountDownLatch reconnectLatch = new 
CountDownLatch(clients.size());
+
+        IgnitePredicate<Event> p = new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    log.info("Disconnected: " + evt);
+
+                    disconnectLatch.countDown();
+                }
+                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    log.info("Reconnected: " + evt);
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        };
+
+        for (Ignite client : clients)
+            client.events().localListen(p, EVT_CLIENT_NODE_DISCONNECTED, 
EVT_CLIENT_NODE_RECONNECTED);
+
+        c.call();
+
+        waitReconnectEvent(log, disconnectLatch);
+
+        waitReconnectEvent(log, reconnectLatch);
+
+        for (Ignite client : clients)
+            client.events().stopLocalListen(p);
+    }
+
+    /**
      * @param restartZk If {@code true} in background restarts on of ZK 
servers.
      * @param closeClientSock If {@code true} in background closes zk clients' 
sockets.
      * @throws Exception If failed.
@@ -1506,7 +1675,9 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
                             return false;
                         }
 
-                        assertEquals(expEvt.type(), evt0.type());
+                        assertEquals("Unexpected event [topVer=" + 
expEvt.topologyVersion() +
+                            ", exp=" + U.gridEventName(expEvt.type()) +
+                            ", evt=" + evt0 + ']', expEvt.type(), evt0.type());
                     }
                 }
 
@@ -1527,7 +1698,7 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
      * @param spi Spi instance.
      */
     private static void closeZkClient(ZookeeperDiscoverySpi spi) {
-        ZooKeeper zk = GridTestUtils.getFieldValue(spi, "impl", "state", 
"zkClient", "zk");
+        ZooKeeper zk = zkClient(spi);
 
         try {
             zk.close();
@@ -1538,6 +1709,13 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * @param spi Spi instance.
+     */
+    private static ZooKeeper zkClient(ZookeeperDiscoverySpi spi) {
+        return GridTestUtils.getFieldValue(spi, "impl", "state", "zkClient", 
"zk");
+    }
+
+    /**
      * @param expSize Expected nodes number.
      * @throws Exception If failed.
      */
@@ -1566,6 +1744,7 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
             }
         }, 15_000));
     }
+
     /**
      * Reconnect client node.
      *
@@ -1619,8 +1798,31 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
             }
         }
         else {
-            for (Ignite client : clients)
-                closeZkClient(client);
+            /*
+             * Use hack to simulate session expire without waiting session 
timeout:
+             * create and close ZooKeeper with the same session ID as ignite 
node's ZooKeeper.
+             */
+            List<ZooKeeper> dummyClients = new ArrayList<>();
+
+            for (Ignite client : clients) {
+                ZookeeperDiscoverySpi spi = 
(ZookeeperDiscoverySpi)client.configuration().getDiscoverySpi();
+
+                ZooKeeper zk = zkClient(spi);
+
+                ZooKeeper dummyZk = new ZooKeeper(
+                    spi.getZkConnectionString(),
+                    10_000,
+                    null,
+                    zk.getSessionId(),
+                    zk.getSessionPasswd());
+
+                dummyZk.exists("/a", false);
+
+                dummyClients.add(dummyZk);
+            }
+
+            for (ZooKeeper zk : dummyClients)
+                zk.close();
         }
 
         waitNoAliveZkNodes(log,

http://git-wip-us.apache.org/repos/asf/ignite/blob/942c70f1/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 149ed54..bff099d 100755
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -112,7 +112,6 @@ import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
 import org.apache.log4j.Priority;
 import org.apache.log4j.RollingFileAppender;
-import org.apache.zookeeper.ZkTestClientCnxnSocketNIO;
 import org.jetbrains.annotations.Nullable;
 import org.springframework.beans.BeansException;
 import org.springframework.context.ApplicationContext;
@@ -123,7 +122,6 @@ import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_DISCO_FAILED_CLIEN
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static 
org.apache.ignite.testframework.config.GridTestProperties.BINARY_MARSHALLER_USE_SIMPLE_NAME_MAPPER;
-import static org.apache.zookeeper.ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET;
 
 /**
  * Common abstract test for Ignite tests.
@@ -2224,7 +2222,7 @@ public abstract class GridAbstractTest extends TestCase {
         assertFalse("There are no nodes", nodes.isEmpty());
 
         if 
(nodes.get(0).configuration().getDiscoverySpi().getClass().getName().equals(ZK_DISCOVERY))
-            ZookeeperDiscoverySpiBasicTest.reconnectClientNodes(log, clients, 
null, true);
+            ZookeeperDiscoverySpiBasicTest.reconnectClientNodes(log, clients, 
null, false);
         else
             fail("Reconnect is not supported");
     }

Reply via email to