Repository: ignite
Updated Branches:
  refs/heads/ignite-zk af75c4273 -> 21a72646d


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/21a72646
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/21a72646
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/21a72646

Branch: refs/heads/ignite-zk
Commit: 21a72646de4c39bce1b0a459dc091490aec117b9
Parents: af75c42
Author: sboikov <[email protected]>
Authored: Mon Dec 11 11:42:39 2017 +0300
Committer: sboikov <[email protected]>
Committed: Mon Dec 11 12:52:54 2017 +0300

----------------------------------------------------------------------
 .../zk/internal/ZookeeperClusterNode.java       |   7 +
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 250 +++++++++++++++++--
 .../ZookeeperDiscoverySpiBasicTest.java         |  41 +++
 3 files changed, 271 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/21a72646/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java
index 2bb244f..859c105 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java
@@ -266,6 +266,13 @@ public class ZookeeperClusterNode implements 
IgniteClusterNode, Serializable, Co
         id = newId;
     }
 
+    /**
+     * @return Session timeout of this node (value of the {@code sesTimeout} field).
+     */
+    long sessionTimeout() {
+        return sesTimeout;
+    }
+
     /** {@inheritDoc} */
     @Override public IgniteProductVersion version() {
         return ver;

http://git-wip-us.apache.org/repos/asf/ignite/blob/21a72646/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index c41a14d8..5808c7c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -68,6 +68,7 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
+import org.jboss.netty.util.internal.ConcurrentHashMap;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
@@ -155,7 +156,12 @@ public class ZookeeperDiscoveryImpl {
     /** */
     public volatile IgniteDiscoverySpiInternalListener internalLsnr;
 
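+    /** In-flight ping futures keyed by node ID. NOTE(review): ConcurrentHashMap here is imported from org.jboss.netty.util.internal; java.util.concurrent.ConcurrentHashMap is presumably intended. */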
+    private final ConcurrentHashMap<UUID, PingFuture> pingFuts = new 
ConcurrentHashMap<>();
+
     /**
+     * @param spi Discovery SPI.
+     * @param igniteInstanceName Instance name.
      * @param log Logger.
      * @param zkRootPath Zookeeper base path node all nodes.
      * @param locNode Local node instance.
@@ -218,7 +224,7 @@ public class ZookeeperDiscoveryImpl {
      * @param nodeId Node ID.
      * @return Node instance.
      */
-    @Nullable public ClusterNode node(UUID nodeId) {
+    @Nullable public ZookeeperClusterNode node(UUID nodeId) {
         assert nodeId != null;
 
         return rtState.top.nodesById.get(nodeId);
@@ -229,11 +235,37 @@ public class ZookeeperDiscoveryImpl {
      * @return Ping result.
      */
     public boolean pingNode(UUID nodeId) {
-        // TODO ZK
-        if (connState == ConnectionState.DISCONNECTED)
-            throw new IgniteClientDisconnectedException(null, "Client is 
disconnected.");
+        ZookeeperClusterNode node = node(nodeId);
+
+        if (node == null)
+            return false; // Node is not in the current topology.
+
+        if (node.isLocal())
+            return true;
+
+        PingFuture fut = pingFuts.get(nodeId); // Reuse an in-flight ping of this node, if any.
+
+        if (fut == null) {
+            fut = new PingFuture(node);
+
+            PingFuture old = pingFuts.putIfAbsent(nodeId, fut);
+
+            if (old == null) {
+                if (fut.checkNodeAndState())
+                    spi.getSpiContext().addTimeoutObject(fut); // Alive check is started from PingFuture.onTimeout().
+                else
+                    assert fut.isDone();
+            }
+            else
+                fut = old; // Another thread registered a ping first; wait for its result.
+        }
 
-        return node(nodeId) != null;
+        try {
+            return fut.get(); // Waits for the ping future result.
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteSpiException(e);
+        }
     }
 
     /**
@@ -588,6 +620,11 @@ public class ZookeeperDiscoveryImpl {
         }
     }
 
+    /**
+     * @param zkClient Client.
+     * @param basePath Base path.
+     * @param partCnt Parts count.
+     */
     private void deleteMultiplePartsAsync(ZookeeperClient zkClient, String 
basePath, int partCnt) {
         for (int i = 0; i < partCnt; i++) {
             String path = multipartPathName(basePath, i);
@@ -597,6 +634,13 @@ public class ZookeeperDiscoveryImpl {
 
     }
 
+    /**
+     * @param zkClient Client.
+     * @param basePath Base path.
+     * @param partCnt Parts count.
+     * @return Read parts.
+     * @throws Exception If failed.
+     */
     private byte[] readMultipleParts(ZookeeperClient zkClient, String 
basePath, int partCnt)
         throws Exception {
         assert partCnt >= 1;
@@ -632,6 +676,14 @@ public class ZookeeperDiscoveryImpl {
             return zkClient.getData(multipartPathName(basePath, 0));
     }
 
+    /**
+     * @param zkClient Client.
+     * @param basePath Base path.
+     * @param parts Data parts.
+     * @return Number of parts.
+     * @throws ZookeeperClientFailedException If client failed.
+     * @throws InterruptedException If interrupted.
+     */
     private int saveMultipleParts(ZookeeperClient zkClient, String basePath, 
List<byte[]> parts)
         throws ZookeeperClientFailedException, InterruptedException
     {
@@ -648,6 +700,11 @@ public class ZookeeperDiscoveryImpl {
         return parts.size();
     }
 
+    /**
+     * @param basePath Base path.
+     * @param part Part number.
+     * @return Path.
+     */
     private static String multipartPathName(String basePath, int part) {
         return basePath + String.format("%04d", part);
     }
@@ -879,7 +936,10 @@ public class ZookeeperDiscoveryImpl {
         }
     }
 
-    private void onPreviousNodeFail() throws Exception {
+    /**
+     *
+     */
+    private void onPreviousNodeFail() {
         // TODO ZK:
 //        if (locInternalId == crdInternalId + 1) {
 //            if (log.isInfoEnabled())
@@ -933,7 +993,7 @@ public class ZookeeperDiscoveryImpl {
     }
 
     /**
-     * @param alivePath
+     * @param alivePath Node path.
      */
     private void watchAliveNodeData(String alivePath) {
         assert rtState.locNodeZkPath != null;
@@ -997,6 +1057,12 @@ public class ZookeeperDiscoveryImpl {
             handleProcessedEventsOnNodesFail(failedNodes);
     }
 
+    /**
+     * @param nodeId Node ID.
+     * @param prefixId Path prefix.
+     * @return Join data.
+     * @throws Exception If failed.
+     */
     private ZkJoiningNodeData unmarshalJoinData(UUID nodeId, UUID prefixId) 
throws Exception {
         String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, prefixId);
 
@@ -1019,9 +1085,10 @@ public class ZookeeperDiscoveryImpl {
     }
 
     /**
-     * @param nodeId
-     * @param aliveNodePath
-     * @return
+     * @param nodeId Node ID.
+     * @param prefixId Path prefix.
+     * @param aliveNodePath Node path.
+     * @return Join data.
      * @throws Exception If failed.
      */
     private Object unmarshalJoinDataOnCoordinator(UUID nodeId, UUID prefixId, 
String aliveNodePath) throws Exception {
@@ -1072,6 +1139,7 @@ public class ZookeeperDiscoveryImpl {
      * @param internalId Joined node internal ID.
      * @param aliveNodePath Joined node path.
      * @throws Exception If failed.
+     * @return {@code True} if new join event was added.
      */
     private boolean processJoinOnCoordinator(
         TreeMap<Long, ZookeeperClusterNode> curTop,
@@ -1204,7 +1272,9 @@ public class ZookeeperDiscoveryImpl {
 
     /**
      * @param curTop Current nodes.
+     * @param joiningNodeData Join data.
      * @param internalId Joined node internal ID.
+     * @param prefixId Unique path prefix.
      * @throws Exception If failed.
      */
     private void generateNodeJoin(
@@ -1392,14 +1462,26 @@ public class ZookeeperDiscoveryImpl {
         rtState.zkClient.deleteAll(path, rtState.zkClient.getChildren(path), 
-1);
     }
 
+    /**
+     * @return Current topology nodes ({@code rtState.top}).
+     */
     ZkClusterNodes nodes() {
         return rtState.top;
     }
 
+    /**
+     * @return ZooKeeper client of the current runtime state ({@code rtState.zkClient}).
+     */
     ZookeeperClient zkClient() {
         return rtState.zkClient;
     }
 
+    /**
+     * @param evtPath Event path.
+     * @param sndNodeId Sender node ID.
+     * @return Event data.
+     * @throws Exception If failed.
+     */
     private byte[] readCustomEventData(String evtPath, UUID sndNodeId) throws 
Exception {
         int partCnt = ZkIgnitePaths.customEventPartsCount(evtPath);
 
@@ -1504,13 +1586,13 @@ public class ZookeeperDiscoveryImpl {
                     catch (IgniteCheckedException e) {
                         U.error(log, "Failed to unmarshal custom discovery 
message: " + e, e);
 
-                        deleteCustomEventData(rtState.zkClient, evtPath);
+                        deleteCustomEventDataAsync(rtState.zkClient, evtPath);
                     }
                 }
                 else {
                     U.warn(log, "Ignore custom event from unknown node: " + 
sndNodeId);
 
-                    deleteCustomEventData(rtState.zkClient, evtPath);
+                    deleteCustomEventDataAsync(rtState.zkClient, evtPath);
                 }
 
                 rtState.evtsData.procCustEvt = evtE.getKey();
@@ -1520,7 +1602,11 @@ public class ZookeeperDiscoveryImpl {
         }
     }
 
-    private void deleteCustomEventData(ZookeeperClient zkClient, String 
evtPath) {
+    /**
+     * @param zkClient Client.
+     * @param evtPath Event path.
+     */
+    private void deleteCustomEventDataAsync(ZookeeperClient zkClient, String 
evtPath) {
         if (log.isDebugEnabled())
             log.debug("Delete custom event data: " + evtPath);
 
@@ -1774,12 +1860,13 @@ public class ZookeeperDiscoveryImpl {
 
         rtState.joined = true;
 
-        deleteDataForJoined(evtData);
+        deleteDataForJoinedAsync(evtData);
     }
 
     /**
-     * @param evtData
-     * @param msg
+     * @param evtData Event data.
+     * @param msg Message.
+     * @throws Exception If failed.
      */
     private void processInternalMessage(ZkDiscoveryCustomEventData evtData, 
ZkInternalMessage msg) throws Exception {
         if (msg instanceof ZkInternalForceNodeFailMessage) {
@@ -1918,6 +2005,11 @@ public class ZookeeperDiscoveryImpl {
             throw new ZookeeperClientFailedException("Received node failed 
event for local node.");
         }
         else {
+            PingFuture pingFut = pingFuts.get(failedNode.id());
+
+            if (pingFut != null)
+                pingFut.onDone(false);
+
             final List<ClusterNode> topSnapshot = 
rtState.top.topologySnapshot();
 
             lsnr.onDiscovery(EVT_NODE_FAILED,
@@ -1957,7 +2049,7 @@ public class ZookeeperDiscoveryImpl {
 
                 switch (evtData.eventType()) {
                     case EventType.EVT_NODE_JOINED: {
-                        
handleProcessedJoinEvent((ZkDiscoveryNodeJoinEventData)evtData);
+                        
handleProcessedJoinEventAsync((ZkDiscoveryNodeJoinEventData)evtData);
 
                         break;
                     }
@@ -2058,18 +2150,22 @@ public class ZookeeperDiscoveryImpl {
 
     /**
      * @param evtData Event data.
-     * @throws Exception If failed.
      */
-    private void handleProcessedJoinEvent(ZkDiscoveryNodeJoinEventData 
evtData) throws Exception {
+    private void handleProcessedJoinEventAsync(ZkDiscoveryNodeJoinEventData 
evtData) {
         if (log.isDebugEnabled())
             log.debug("All nodes processed node join [evtData=" + evtData + 
']');
 
         deleteJoiningNodeData(evtData.nodeId, evtData.joinDataPrefixId, 
evtData.joinDataPartCnt);
 
-        deleteDataForJoined(evtData);
+        deleteDataForJoinedAsync(evtData);
     }
 
-    private void deleteJoiningNodeData(UUID nodeId, UUID joinDataPrefixId, int 
partCnt) throws Exception {
+    /**
+     * @param nodeId Node ID.
+     * @param joinDataPrefixId Path prefix.
+     * @param partCnt Parts count.
+     */
+    private void deleteJoiningNodeData(UUID nodeId, UUID joinDataPrefixId, int 
partCnt) {
         String evtDataPath = zkPaths.joiningNodeDataPath(nodeId, 
joinDataPrefixId);
 
         if (log.isDebugEnabled())
@@ -2081,7 +2177,10 @@ public class ZookeeperDiscoveryImpl {
             deleteMultiplePartsAsync(rtState.zkClient, evtDataPath + ":", 
partCnt);
     }
 
-    private void deleteDataForJoined(ZkDiscoveryNodeJoinEventData evtData) {
+    /**
+     * @param evtData Event data.
+     */
+    private void deleteDataForJoinedAsync(ZkDiscoveryNodeJoinEventData 
evtData) {
         String dataForJoinedPath = 
zkPaths.joinEventDataPathForJoined(evtData.eventId());
 
         if (log.isDebugEnabled())
@@ -2091,18 +2190,16 @@ public class ZookeeperDiscoveryImpl {
     }
 
     /**
+     * @param ctx Context for log.
      * @param evtData Event data.
-     * @throws Exception If failed.
      * @return Ack message.
      */
-    @Nullable private DiscoverySpiCustomMessage 
handleProcessedCustomEvent(String ctx, ZkDiscoveryCustomEventData evtData)
-        throws Exception
-    {
+    @Nullable private DiscoverySpiCustomMessage 
handleProcessedCustomEvent(String ctx, ZkDiscoveryCustomEventData evtData) {
         if (log.isDebugEnabled())
             log.debug("All nodes processed custom event [ctx=" + ctx + ", 
evtData=" + evtData + ']');
 
         if (!evtData.ackEvent()) {
-            deleteCustomEventData(rtState.zkClient, evtData.evtPath);
+            deleteCustomEventDataAsync(rtState.zkClient, evtData.evtPath);
 
             assert evtData.msg != null || locNode.order() > 
evtData.topologyVersion() : evtData;
 
@@ -2526,6 +2623,105 @@ public class ZookeeperDiscoveryImpl {
     /**
      *
      */
+    private class PingFuture extends GridFutureAdapter<Boolean> implements 
IgniteSpiTimeoutObject, Runnable {
+        /** Node whose liveness is checked. */
+        private final ZookeeperClusterNode node;
+
+        /** Value returned by {@link #endTime()}: creation time + node session timeout + 1000 ms. */
+        private final long endTime;
+
+        /** Timeout object ID, derived from the node ID. */
+        private final IgniteUuid id;
+
+        /**
+         * @param node Node to ping.
+         */
+        PingFuture(ZookeeperClusterNode node) {
+            this.node = node;
+
+            id = IgniteUuid.fromUuid(node.id());
+
+            endTime = System.currentTimeMillis() + node.sessionTimeout() + 
1000;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteUuid id() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long endTime() {
+            return endTime;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            if (checkNodeAndState()) {
+                try {
+                    for (String path : 
zkClient().getChildren(zkPaths.aliveNodesDir)) {
+                        if (node.internalId() == 
ZkIgnitePaths.aliveInternalId(path)) {
+                            onDone(true);
+
+                            return;
+                        }
+                    }
+
+                    onDone(false); // No alive-node path carries this node's internal ID.
+                }
+                catch (Exception e) {
+                    if (checkNodeAndState()) // Report the error only if state checks did not already complete the future.
+                        onDone(e);
+                }
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onTimeout() {
+            if (checkNodeAndState())
+                runInWorkerThread(this); // Hands run() to runInWorkerThread (defined outside this view).
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean onDone(@Nullable Boolean res, @Nullable 
Throwable err) {
+            if (super.onDone(res, err)) {
+                pingFuts.remove(node.id(), this); // Unregister only if this future is still the mapped one.
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /**
+         * @return {@code True} if ping should proceed, {@code false} if future was completed (disconnected, stopped or node not in topology).
+         */
+        boolean checkNodeAndState() {
+            ConnectionState connState = ZookeeperDiscoveryImpl.this.connState;
+
+            if (connState == ConnectionState.DISCONNECTED) {
+                onDone(new IgniteClientDisconnectedException(null, "Client is 
disconnected."));
+
+                return false;
+            }
+            else if (connState == ConnectionState.STOPPED) {
+                onDone(new IgniteException("Node stopped."));
+
+                return false;
+            }
+
+            if (node(node.id()) == null) {
+                onDone(false); // Node is no longer in topology: ping result is negative.
+
+                return false;
+            }
+
+            return true;
+        }
+    }
+
+    /**
+     *
+     */
     enum ConnectionState {
         /** */
         STARTED,

http://git-wip-us.apache.org/repos/asf/ignite/blob/21a72646/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index 99f5089..8498c7c 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -1568,6 +1568,47 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testPing() throws Exception {
+        sesTimeout = 5000;
+
+        startGrids(3);
+
+        final ZookeeperDiscoverySpi spi = 
waitSpi(getTestIgniteInstanceName(1));
+
+        final UUID nodeId = ignite(2).cluster().localNode().id();
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new 
Runnable() {
+            @Override public void run() {
+                assertTrue(spi.pingNode(nodeId)); // Node 2 is running: every concurrent ping must succeed.
+            }
+        }, 32, "ping");
+
+        fut.get();
+
+        fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+            @Override public void run() {
+                spi.pingNode(nodeId); // Result is not asserted: node 2 is stopped while these pings run.
+            }
+        }, 32, "ping");
+
+        U.sleep(100); // Presumably lets the ping threads start before the node is stopped.
+
+        stopGrid(2);
+
+        fut.get();
+
+        fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+            @Override public void run() {
+                assertFalse(spi.pingNode(nodeId)); // Node 2 has been stopped: ping must fail.
+            }
+        }, 32, "ping");
+
+        fut.get();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testWithPersistence1() throws Exception {
         startWithPersistence(false);
     }

Reply via email to