Repository: ignite Updated Branches: refs/heads/ignite-zk 93dd7ab79 -> 8bd1e077a
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f5f2060a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f5f2060a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f5f2060a Branch: refs/heads/ignite-zk Commit: f5f2060aa6978367d4bf160fd96dc4efa57a7a8c Parents: 4749d33 Author: sboikov <sboi...@gridgain.com> Authored: Wed Nov 22 16:13:18 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Nov 22 16:20:53 2017 +0300 ---------------------------------------------------------------------- .../zk/internal/ZkDiscoveryCustomEventData.java | 4 ++ .../zk/internal/ZookeeperDiscoveryImpl.java | 70 +++++++++++++------- 2 files changed, 51 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f5f2060a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java index 70e25c7..2e50831 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java @@ -43,6 +43,7 @@ class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData { * @param topVer Topology version. * @param sndNodeId Sender node ID. * @param evtPath Event path. + * @param ack Acknowledge event flag. 
*/ ZkDiscoveryCustomEventData(long evtId, long topVer, UUID sndNodeId, String evtPath, boolean ack) { super(evtId, DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT, topVer); @@ -57,6 +58,9 @@ class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData { flags |= CUSTOM_MSG_ACK_FLAG; } + /** + * @return {@code True} for custom event ack message. + */ boolean ackEvent() { return flagSet(CUSTOM_MSG_ACK_FLAG); } http://git-wip-us.apache.org/repos/asf/ignite/blob/f5f2060a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 1466872..1be4017 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -64,7 +64,7 @@ import static org.apache.zookeeper.CreateMode.PERSISTENT; */ public class ZookeeperDiscoveryImpl { /** */ - public static final String IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD = "IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD"; + static final String IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD = "IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD"; /** */ private final JdkMarshaller marsh = new JdkMarshaller(); @@ -124,12 +124,12 @@ public class ZookeeperDiscoveryImpl { private final int evtsAckThreshold; /** - * @param log - * @param basePath - * @param clusterName - * @param locNode - * @param lsnr - * @param exchange + * @param log Logger. + * @param basePath Zookeeper base path node for all nodes. + * @param clusterName Cluster name. + * @param locNode Local node instance. + * @param lsnr Discovery events listener. 
+ * @param exchange Discovery data exchange. */ public ZookeeperDiscoveryImpl(IgniteLogger log, String basePath, @@ -163,10 +163,16 @@ public class ZookeeperDiscoveryImpl { this.evtsAckThreshold = evtsAckThreshold; } + /** + * @return Logger. + */ IgniteLogger log() { return log; } + /** + * @return Local node instance. + */ public ClusterNode localNode() { return locNode; } @@ -181,6 +187,10 @@ public class ZookeeperDiscoveryImpl { return top.nodesById.get(nodeId); } + /** + * @param nodeId Node ID. + * @return Ping result. + */ public boolean pingNode(UUID nodeId) { // TODO ZK return node(nodeId) != null; @@ -193,6 +203,10 @@ public class ZookeeperDiscoveryImpl { return top.remoteNodes(); } + /** + * @param nodeId Node ID. + * @return {@code True} if node joined or joining topology. + */ public boolean knownNode(UUID nodeId) { try { List<String> children = zkClient.getChildren(zkPaths.aliveNodesDir); @@ -253,9 +267,9 @@ public class ZookeeperDiscoveryImpl { } /** - * @param igniteInstanceName - * @param connectString - * @param sesTimeout + * @param igniteInstanceName Ignite instance name. + * @param connectString Zookeeper connect string. + * @param sesTimeout Zookeeper session timeout. * @throws InterruptedException If interrupted. */ public void joinTopology(String igniteInstanceName, String connectString, int sesTimeout) @@ -333,6 +347,7 @@ public class ZookeeperDiscoveryImpl { } /** + * @param joinDataBytes Joining node data. * @throws InterruptedException If interrupted. */ private void startJoin(byte[] joinDataBytes) throws InterruptedException { @@ -372,7 +387,7 @@ public class ZookeeperDiscoveryImpl { * * @throws Exception If failed. */ - public void waitConnectStart() throws Exception { + void waitConnectStart() throws Exception { connStartLatch.await(); } @@ -386,6 +401,10 @@ public class ZookeeperDiscoveryImpl { checkIsCoordinator(rc, aliveNodes); } + /** + * @param rc Callback result code. + * @param aliveNodes Alive nodes. 
+ */ private void checkIsCoordinator(int rc, final List<String> aliveNodes) { try { assert rc == 0 : rc; @@ -479,6 +498,7 @@ public class ZookeeperDiscoveryImpl { } /** + * @param aliveNodes Alive nodes paths. * @param locInternalId Local node's internal ID. * @throws Exception If failed. */ @@ -497,13 +517,8 @@ public class ZookeeperDiscoveryImpl { assert locNode.order() > 0 : locNode; assert this.evtsData != null; - Iterator<ZkDiscoveryEventData> it = evtsData.evts.values().iterator(); - - while (it.hasNext()) { - ZkDiscoveryEventData evtData = it.next(); - + for (ZkDiscoveryEventData evtData : evtsData.evts.values()) evtData.remainingAcks(top.nodesByOrder.values()); - } handleProcessedEvents(); } @@ -682,6 +697,12 @@ public class ZookeeperDiscoveryImpl { } } + /** + * @param curTop Current nodes. + * @param internalId Joined node internal ID. + * @param aliveNodePath Joined node path. + * @throws Exception If failed. + */ private void generateNodeJoin(TreeMap<Long, ZookeeperClusterNode> curTop, int internalId, String aliveNodePath) @@ -938,11 +959,7 @@ public class ZookeeperDiscoveryImpl { boolean updateNodeInfo = false; - Iterator<ZkDiscoveryEventData> it = evts.tailMap(locNodeInfo.lastProcEvt, false).values().iterator(); - - while (it.hasNext()) { - ZkDiscoveryEventData evtData = it.next(); - + for (ZkDiscoveryEventData evtData : evts.tailMap(locNodeInfo.lastProcEvt, false).values()) { if (!joined) { if (evtData.eventType() != EventType.EVT_NODE_JOINED) continue; @@ -1127,6 +1144,12 @@ public class ZookeeperDiscoveryImpl { } } + /** + * @param evtsData Events data. + * @param evtData Local join event data. + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") private void processLocalJoin(ZkDiscoveryEventsData evtsData, ZkDiscoveryNodeJoinEventData evtData) throws Exception { @@ -1222,7 +1245,7 @@ public class ZookeeperDiscoveryImpl { * @param evtData Event data. 
*/ @SuppressWarnings("unchecked") - private void notifyNodeFail(ZkDiscoveryNodeFailEventData evtData) throws Exception { + private void notifyNodeFail(ZkDiscoveryNodeFailEventData evtData) { ZookeeperClusterNode failedNode = top.removeNode(evtData.failedNodeInternalId()); assert failedNode != null; @@ -1239,6 +1262,7 @@ public class ZookeeperDiscoveryImpl { /** * @param failedNode Failed node. + * @throws Exception If failed. */ private void processEventAcksOnNodeFail(ZookeeperClusterNode failedNode) throws Exception { boolean processed = false;