Repository: ignite Updated Branches: refs/heads/ignite-zk af75c4273 -> 21a72646d
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/21a72646 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/21a72646 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/21a72646 Branch: refs/heads/ignite-zk Commit: 21a72646de4c39bce1b0a459dc091490aec117b9 Parents: af75c42 Author: sboikov <[email protected]> Authored: Mon Dec 11 11:42:39 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon Dec 11 12:52:54 2017 +0300 ---------------------------------------------------------------------- .../zk/internal/ZookeeperClusterNode.java | 7 + .../zk/internal/ZookeeperDiscoveryImpl.java | 250 +++++++++++++++++-- .../ZookeeperDiscoverySpiBasicTest.java | 41 +++ 3 files changed, 271 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/21a72646/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java index 2bb244f..859c105 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java @@ -266,6 +266,13 @@ public class ZookeeperClusterNode implements IgniteClusterNode, Serializable, Co id = newId; } + /** + * @return Session timeout. + */ + long sessionTimeout() { + return sesTimeout; + } + /** {@inheritDoc} */ @Override public IgniteProductVersion version() { return ver; http://git-wip-us.apache.org/repos/asf/ignite/blob/21a72646/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index c41a14d8..5808c7c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -68,6 +68,7 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.data.Stat; +import org.jboss.netty.util.internal.ConcurrentHashMap; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED; @@ -155,7 +156,12 @@ public class ZookeeperDiscoveryImpl { /** */ public volatile IgniteDiscoverySpiInternalListener internalLsnr; + /** */ + private final ConcurrentHashMap<UUID, PingFuture> pingFuts = new ConcurrentHashMap<>(); + /** + * @param spi Discovery SPI. + * @param igniteInstanceName Instance name. * @param log Logger. * @param zkRootPath Zookeeper base path node all nodes. * @param locNode Local node instance. @@ -218,7 +224,7 @@ public class ZookeeperDiscoveryImpl { * @param nodeId Node ID. * @return Node instance. */ - @Nullable public ClusterNode node(UUID nodeId) { + @Nullable public ZookeeperClusterNode node(UUID nodeId) { assert nodeId != null; return rtState.top.nodesById.get(nodeId); @@ -229,11 +235,37 @@ public class ZookeeperDiscoveryImpl { * @return Ping result. */ public boolean pingNode(UUID nodeId) { - // TODO ZK - if (connState == ConnectionState.DISCONNECTED) - throw new IgniteClientDisconnectedException(null, "Client is disconnected."); + ZookeeperClusterNode node = node(nodeId); + + if (node == null) + return false; + + if (node.isLocal()) + return true; + + PingFuture fut = pingFuts.get(nodeId); + + if (fut == null) { + fut = new PingFuture(node); + + PingFuture old = pingFuts.putIfAbsent(nodeId, fut); + + if (old == null) { + if (fut.checkNodeAndState()) + spi.getSpiContext().addTimeoutObject(fut); + else + assert fut.isDone(); + } + else + fut = old; + } - return node(nodeId) != null; + try { + return fut.get(); + } + catch (IgniteCheckedException e) { + throw new IgniteSpiException(e); + } } /** @@ -588,6 +620,11 @@ public class ZookeeperDiscoveryImpl { } } + /** + * @param zkClient Client. + * @param basePath Base path. + * @param partCnt Parts count. + */ private void deleteMultiplePartsAsync(ZookeeperClient zkClient, String basePath, int partCnt) { for (int i = 0; i < partCnt; i++) { String path = multipartPathName(basePath, i); @@ -597,6 +634,13 @@ public class ZookeeperDiscoveryImpl { } + /** + * @param zkClient Client. + * @param basePath Base path. + * @param partCnt Parts count. + * @return Read parts. + * @throws Exception If failed. + */ private byte[] readMultipleParts(ZookeeperClient zkClient, String basePath, int partCnt) throws Exception { assert partCnt >= 1; @@ -632,6 +676,14 @@ public class ZookeeperDiscoveryImpl { return zkClient.getData(multipartPathName(basePath, 0)); } + /** + * @param zkClient Client. + * @param basePath Base path. + * @param parts Data parts. + * @return Number of parts. + * @throws ZookeeperClientFailedException If client failed. + * @throws InterruptedException If interrupted. + */ private int saveMultipleParts(ZookeeperClient zkClient, String basePath, List<byte[]> parts) throws ZookeeperClientFailedException, InterruptedException { @@ -648,6 +700,11 @@ public class ZookeeperDiscoveryImpl { return parts.size(); } + /** + * @param basePath Base path. + * @param part Part number. + * @return Path. + */ private static String multipartPathName(String basePath, int part) { return basePath + String.format("%04d", part); } @@ -879,7 +936,10 @@ public class ZookeeperDiscoveryImpl { } } - private void onPreviousNodeFail() throws Exception { + /** + * + */ + private void onPreviousNodeFail() { // TODO ZK: // if (locInternalId == crdInternalId + 1) { // if (log.isInfoEnabled()) @@ -933,7 +993,7 @@ public class ZookeeperDiscoveryImpl { } /** - * @param alivePath + * @param alivePath Node path. */ private void watchAliveNodeData(String alivePath) { assert rtState.locNodeZkPath != null; @@ -997,6 +1057,12 @@ public class ZookeeperDiscoveryImpl { handleProcessedEventsOnNodesFail(failedNodes); } + /** + * @param nodeId Node ID. + * @param prefixId Path prefix. + * @return Join data. + * @throws Exception If failed. + */ private ZkJoiningNodeData unmarshalJoinData(UUID nodeId, UUID prefixId) throws Exception { String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, prefixId); @@ -1019,9 +1085,10 @@ public class ZookeeperDiscoveryImpl { } /** - * @param nodeId - * @param aliveNodePath - * @return + * @param nodeId Node ID. + * @param prefixId Path prefix. + * @param aliveNodePath Node path. + * @return Join data. * @throws Exception If failed. */ private Object unmarshalJoinDataOnCoordinator(UUID nodeId, UUID prefixId, String aliveNodePath) throws Exception { @@ -1072,6 +1139,7 @@ public class ZookeeperDiscoveryImpl { * @param internalId Joined node internal ID. * @param aliveNodePath Joined node path. * @throws Exception If failed. + * @return {@code True} if new join event was added. */ private boolean processJoinOnCoordinator( TreeMap<Long, ZookeeperClusterNode> curTop, @@ -1204,7 +1272,9 @@ public class ZookeeperDiscoveryImpl { /** * @param curTop Current nodes. + * @param joiningNodeData Join data. * @param internalId Joined node internal ID. + * @param prefixId Unique path prefix. * @throws Exception If failed. */ private void generateNodeJoin( @@ -1392,14 +1462,26 @@ public class ZookeeperDiscoveryImpl { rtState.zkClient.deleteAll(path, rtState.zkClient.getChildren(path), -1); } + /** + * @return Nodes. + */ ZkClusterNodes nodes() { return rtState.top; } + /** + * @return Client. + */ ZookeeperClient zkClient() { return rtState.zkClient; } + /** + * @param evtPath Event path. + * @param sndNodeId Sender node ID. + * @return Event data. + * @throws Exception If failed. + */ private byte[] readCustomEventData(String evtPath, UUID sndNodeId) throws Exception { int partCnt = ZkIgnitePaths.customEventPartsCount(evtPath); @@ -1504,13 +1586,13 @@ public class ZookeeperDiscoveryImpl { catch (IgniteCheckedException e) { U.error(log, "Failed to unmarshal custom discovery message: " + e, e); - deleteCustomEventData(rtState.zkClient, evtPath); + deleteCustomEventDataAsync(rtState.zkClient, evtPath); } } else { U.warn(log, "Ignore custom event from unknown node: " + sndNodeId); - deleteCustomEventData(rtState.zkClient, evtPath); + deleteCustomEventDataAsync(rtState.zkClient, evtPath); } rtState.evtsData.procCustEvt = evtE.getKey(); @@ -1520,7 +1602,11 @@ public class ZookeeperDiscoveryImpl { } } - private void deleteCustomEventData(ZookeeperClient zkClient, String evtPath) { + /** + * @param zkClient Client. + * @param evtPath Event path. + */ + private void deleteCustomEventDataAsync(ZookeeperClient zkClient, String evtPath) { if (log.isDebugEnabled()) log.debug("Delete custom event data: " + evtPath); @@ -1774,12 +1860,13 @@ public class ZookeeperDiscoveryImpl { rtState.joined = true; - deleteDataForJoined(evtData); + deleteDataForJoinedAsync(evtData); } /** - * @param evtData - * @param msg + * @param evtData Event daa. + * @param msg Message. + * @throws Exception If failed. */ private void processInternalMessage(ZkDiscoveryCustomEventData evtData, ZkInternalMessage msg) throws Exception { if (msg instanceof ZkInternalForceNodeFailMessage) { @@ -1918,6 +2005,11 @@ public class ZookeeperDiscoveryImpl { throw new ZookeeperClientFailedException("Received node failed event for local node."); } else { + PingFuture pingFut = pingFuts.get(failedNode.id()); + + if (pingFut != null) + pingFut.onDone(false); + final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot(); lsnr.onDiscovery(EVT_NODE_FAILED, @@ -1957,7 +2049,7 @@ public class ZookeeperDiscoveryImpl { switch (evtData.eventType()) { case EventType.EVT_NODE_JOINED: { - handleProcessedJoinEvent((ZkDiscoveryNodeJoinEventData)evtData); + handleProcessedJoinEventAsync((ZkDiscoveryNodeJoinEventData)evtData); break; } @@ -2058,18 +2150,22 @@ public class ZookeeperDiscoveryImpl { /** * @param evtData Event data. - * @throws Exception If failed. */ - private void handleProcessedJoinEvent(ZkDiscoveryNodeJoinEventData evtData) throws Exception { + private void handleProcessedJoinEventAsync(ZkDiscoveryNodeJoinEventData evtData) { if (log.isDebugEnabled()) log.debug("All nodes processed node join [evtData=" + evtData + ']'); deleteJoiningNodeData(evtData.nodeId, evtData.joinDataPrefixId, evtData.joinDataPartCnt); - deleteDataForJoined(evtData); + deleteDataForJoinedAsync(evtData); } - private void deleteJoiningNodeData(UUID nodeId, UUID joinDataPrefixId, int partCnt) throws Exception { + /** + * @param nodeId Node ID. + * @param joinDataPrefixId Path prefix. + * @param partCnt Parts count. + */ + private void deleteJoiningNodeData(UUID nodeId, UUID joinDataPrefixId, int partCnt) { String evtDataPath = zkPaths.joiningNodeDataPath(nodeId, joinDataPrefixId); if (log.isDebugEnabled()) @@ -2081,7 +2177,10 @@ public class ZookeeperDiscoveryImpl { deleteMultiplePartsAsync(rtState.zkClient, evtDataPath + ":", partCnt); } - private void deleteDataForJoined(ZkDiscoveryNodeJoinEventData evtData) { + /** + * @param evtData Event data. + */ + private void deleteDataForJoinedAsync(ZkDiscoveryNodeJoinEventData evtData) { String dataForJoinedPath = zkPaths.joinEventDataPathForJoined(evtData.eventId()); if (log.isDebugEnabled()) @@ -2091,18 +2190,16 @@ public class ZookeeperDiscoveryImpl { } /** + * @param ctx Context for log. * @param evtData Event data. - * @throws Exception If failed. * @return Ack message. */ - @Nullable private DiscoverySpiCustomMessage handleProcessedCustomEvent(String ctx, ZkDiscoveryCustomEventData evtData) - throws Exception - { + @Nullable private DiscoverySpiCustomMessage handleProcessedCustomEvent(String ctx, ZkDiscoveryCustomEventData evtData) { if (log.isDebugEnabled()) log.debug("All nodes processed custom event [ctx=" + ctx + ", evtData=" + evtData + ']'); if (!evtData.ackEvent()) { - deleteCustomEventData(rtState.zkClient, evtData.evtPath); + deleteCustomEventDataAsync(rtState.zkClient, evtData.evtPath); assert evtData.msg != null || locNode.order() > evtData.topologyVersion() : evtData; @@ -2526,6 +2623,105 @@ public class ZookeeperDiscoveryImpl { /** * */ + private class PingFuture extends GridFutureAdapter<Boolean> implements IgniteSpiTimeoutObject, Runnable { + /** */ + private final ZookeeperClusterNode node; + + /** */ + private final long endTime; + + /** */ + private final IgniteUuid id; + + /** + * @param node Node. + */ + PingFuture(ZookeeperClusterNode node) { + this.node = node; + + id = IgniteUuid.fromUuid(node.id()); + + endTime = System.currentTimeMillis() + node.sessionTimeout() + 1000; + } + + /** {@inheritDoc} */ + @Override public IgniteUuid id() { + return id; + } + + /** {@inheritDoc} */ + @Override public long endTime() { + return endTime; + } + + /** {@inheritDoc} */ + @Override public void run() { + if (checkNodeAndState()) { + try { + for (String path : zkClient().getChildren(zkPaths.aliveNodesDir)) { + if (node.internalId() == ZkIgnitePaths.aliveInternalId(path)) { + onDone(true); + + return; + } + } + + onDone(false); + } + catch (Exception e) { + if (checkNodeAndState()) + onDone(e); + } + } + } + + /** {@inheritDoc} */ + @Override public void onTimeout() { + if (checkNodeAndState()) + runInWorkerThread(this); + } + + /** {@inheritDoc} */ + @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) { + if (super.onDone(res, err)) { + pingFuts.remove(node.id(), this); + + return true; + } + + return false; + } + + /** + * @return {@code False} if future was completed. + */ + boolean checkNodeAndState() { + ConnectionState connState = ZookeeperDiscoveryImpl.this.connState; + + if (connState == ConnectionState.DISCONNECTED) { + onDone(new IgniteClientDisconnectedException(null, "Client is disconnected.")); + + return false; + } + else if (connState == ConnectionState.STOPPED) { + onDone(new IgniteException("Node stopped.")); + + return false; + } + + if (node(node.id()) == null) { + onDone(false); + + return false; + } + + return true; + } + } + + /** + * + */ enum ConnectionState { /** */ STARTED, http://git-wip-us.apache.org/repos/asf/ignite/blob/21a72646/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java index 99f5089..8498c7c 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java @@ -1568,6 +1568,47 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testPing() throws Exception { + sesTimeout = 5000; + + startGrids(3); + + final ZookeeperDiscoverySpi spi = waitSpi(getTestIgniteInstanceName(1)); + + final UUID nodeId = ignite(2).cluster().localNode().id(); + + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + assertTrue(spi.pingNode(nodeId)); + } + }, 32, "ping"); + + fut.get(); + + fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + spi.pingNode(nodeId); + } + }, 32, "ping"); + + U.sleep(100); + + stopGrid(2); + + fut.get(); + + fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + assertFalse(spi.pingNode(nodeId)); + } + }, 32, "ping"); + + fut.get(); + } + + /** + * @throws Exception If failed. + */ public void testWithPersistence1() throws Exception { startWithPersistence(false); }
