Repository: ignite
Updated Branches:
  refs/heads/ignite-zk 0918da57c -> b5fe175f9


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b5fe175f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b5fe175f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b5fe175f

Branch: refs/heads/ignite-zk
Commit: b5fe175f92d77e167a74a03baca756852dced34b
Parents: 0918da5
Author: sboikov <[email protected]>
Authored: Fri Dec 29 16:35:42 2017 +0300
Committer: sboikov <[email protected]>
Committed: Fri Dec 29 17:30:16 2017 +0300

----------------------------------------------------------------------
 .../zk/internal/ZkDiscoveryEventsData.java      | 11 +---
 .../discovery/zk/internal/ZkRuntimeState.java   |  8 ---
 .../discovery/zk/internal/ZookeeperClient.java  | 10 +++
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 68 +++++++++++++-------
 .../zk/internal/ZookeeperDiscoverySpiTest.java  |  4 ++
 5 files changed, 60 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b5fe175f/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
index 70e6ba2..dce861b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
@@ -45,9 +45,6 @@ class ZkDiscoveryEventsData implements Serializable {
     /** Max node internal order in cluster. */
     long maxInternalOrder;
 
-    /** Min internal order in cluster. */
-    final long startInternalOrder;
-
     /** Cluster start time (recorded when first node in cluster starts). */
     final long clusterStartTime;
 
@@ -58,15 +55,12 @@ class ZkDiscoveryEventsData implements Serializable {
     private UUID commErrFutId;
 
     /**
-     * @param startInternalOrder Starting internal order for cluster (znodes 
having lower order belong
-     *      to previous cluster and should be ignored).
      * @param clusterStartTime Start time of first node in cluster.
      * @return Events.
      */
-    static ZkDiscoveryEventsData createForNewCluster(long startInternalOrder, 
long clusterStartTime) {
+    static ZkDiscoveryEventsData createForNewCluster(long clusterStartTime) {
         return new ZkDiscoveryEventsData(
             UUID.randomUUID(),
-            startInternalOrder,
             clusterStartTime,
             1L,
             new TreeMap<Long, ZkDiscoveryEventData>()
@@ -75,20 +69,17 @@ class ZkDiscoveryEventsData implements Serializable {
 
     /**
      * @param clusterId Cluster ID.
-     * @param startInternalOrder Starting internal order for cluster.
      * @param topVer Current topology version.
      * @param clusterStartTime Cluster start time.
      * @param evts Events history.
      */
     private ZkDiscoveryEventsData(
         UUID clusterId,
-        long startInternalOrder,
         long clusterStartTime,
         long topVer,
         TreeMap<Long, ZkDiscoveryEventData> evts)
     {
         this.clusterId = clusterId;
-        this.startInternalOrder = startInternalOrder;
         this.clusterStartTime = clusterStartTime;
         this.topVer = topVer;
         this.evts = evts;

http://git-wip-us.apache.org/repos/asf/ignite/blob/b5fe175f/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
index 774c2a9..6792154 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
@@ -103,14 +103,6 @@ class ZkRuntimeState {
     }
 
     /**
-     * @param internalOrder Node internal order.
-     * @return {@code True} if node belongs to previous cluster and should be 
ignored.
-     */
-    boolean ignoreAliveNode(long internalOrder) {
-        return evtsData != null && internalOrder < evtsData.startInternalOrder;
-    }
-
-    /**
      * @param err Error.
      */
     void onCloseStart(Exception err) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/b5fe175f/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
index f5ecf52..9600d58 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -462,6 +462,16 @@ public class ZookeeperClient implements Watcher {
 
     /**
      * @param path Path.
+     * @throws InterruptedException If interrupted.
+     * @throws KeeperException In case of error.
+     * @return {@code True} if given path exists.
+     */
+    boolean existsNoRetry(String path) throws InterruptedException, 
KeeperException {
+        return zk.exists(path, false) != null;
+    }
+
+    /**
+     * @param path Path.
      * @param ver Expected version.
      * @throws InterruptedException If interrupted.
      * @throws KeeperException In case of error.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b5fe175f/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 10d8061..c4756fe 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -93,6 +93,7 @@ import static 
org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
 import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SECURITY_SUBJECT_V2;
@@ -524,7 +525,7 @@ public class ZookeeperDiscoveryImpl {
         if (nodes.isEmpty())
             nodes = Collections.singletonList((ClusterNode)locNode);
 
-        lsnr.onDiscovery(EventType.EVT_NODE_SEGMENTED,
+        lsnr.onDiscovery(EVT_NODE_SEGMENTED,
             rtState.evtsData != null ? rtState.evtsData.topVer : 1L,
             locNode,
             nodes,
@@ -1264,9 +1265,6 @@ public class ZookeeperDiscoveryImpl {
 
             Long internalId = ZkIgnitePaths.aliveInternalId(aliveNodePath);
 
-            if (rtState.ignoreAliveNode(internalId))
-                continue;
-
             aliveSrvs.put(internalId, aliveNodePath);
         }
 
@@ -1314,9 +1312,6 @@ public class ZookeeperDiscoveryImpl {
         for (String aliveNodePath : aliveNodes) {
             Long internalId = ZkIgnitePaths.aliveInternalId(aliveNodePath);
 
-            if (rtState.ignoreAliveNode(internalId))
-                continue;
-
             if (ZkIgnitePaths.aliveNodeClientFlag(aliveNodePath))
                 aliveClients.put(internalId, aliveNodePath);
             else {
@@ -1550,16 +1545,6 @@ public class ZookeeperDiscoveryImpl {
         for (String child : aliveNodes) {
             Long internalId = ZkIgnitePaths.aliveInternalId(child);
 
-            if (rtState.ignoreAliveNode(internalId)) {
-                if (log.isInfoEnabled()) {
-                    LT.info(log, "Ignore node from previous cluster 
[startOrder=" + rtState.evtsData.startInternalOrder +
-                        ", nodeOrder=" + internalId +
-                        ", znode=" + child + ']');
-                }
-
-                continue;
-            }
-
             Object old = alives.put(internalId, child);
 
             assert old == null;
@@ -1594,6 +1579,8 @@ public class ZookeeperDiscoveryImpl {
                             ", totalEvts=" + rtState.evtsData.evts.size() + 
']');
                     }
 
+                    handleProcessedEventsOnNodesFail(failedNodes);
+
                     throttleNewEventsGeneration();
 
                     rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, 
rtState.watcher, rtState.watcher);
@@ -1603,6 +1590,17 @@ public class ZookeeperDiscoveryImpl {
             }
         }
 
+        // Process failures before processing join, otherwise conflicts are 
possible in case of fast node stop/re-start.
+        if (newEvts > 0) {
+            saveAndProcessNewEvents();
+
+            handleProcessedEventsOnNodesFail(failedNodes);
+
+            rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, 
rtState.watcher, rtState.watcher);
+
+            return;
+        }
+
         for (Map.Entry<Long, String> e : alives.entrySet()) {
             Long internalId = e.getKey();
 
@@ -2095,14 +2093,12 @@ public class ZookeeperDiscoveryImpl {
 
         spi.getSpiContext().removeTimeoutObject(rtState.joinErrTo);
 
-        cleanupPreviousClusterData();
+        cleanupPreviousClusterData(prevEvts != null ? 
prevEvts.maxInternalOrder + 1 : -1L);
 
         rtState.joined = true;
         rtState.gridStartTime = System.currentTimeMillis();
 
-        rtState.evtsData = ZkDiscoveryEventsData.createForNewCluster(
-            prevEvts != null ? prevEvts.maxInternalOrder + 1 : -1L,
-            rtState.gridStartTime);
+        rtState.evtsData = 
ZkDiscoveryEventsData.createForNewCluster(rtState.gridStartTime);
 
         if (log.isInfoEnabled()) {
             log.info("New cluster started [locId=" + locNode.id() +
@@ -2119,22 +2115,26 @@ public class ZookeeperDiscoveryImpl {
 
         final List<ClusterNode> topSnapshot = 
Collections.singletonList((ClusterNode)locNode);
 
-        lsnr.onDiscovery(EventType.EVT_NODE_JOINED,
+        lsnr.onDiscovery(EVT_NODE_JOINED,
             1L,
             locNode,
             topSnapshot,
             Collections.<Long, Collection<ClusterNode>>emptyMap(),
             null);
 
+        // Reset events (this is also notification for clients left from 
previous cluster).
         rtState.zkClient.setData(zkPaths.evtsPath, 
marshalZip(rtState.evtsData), -1);
 
         joinFut.onDone();
     }
 
     /**
+     * @param startInternalOrder Starting internal order for cluster (znodes 
having lower order belong
+     *      to clients from previous cluster and should be removed).
+
      * @throws Exception If failed.
      */
-    private void cleanupPreviousClusterData() throws Exception {
+    private void cleanupPreviousClusterData(long startInternalOrder) throws 
Exception {
         long start = System.currentTimeMillis();
 
         ZookeeperClient client = rtState.zkClient;
@@ -2162,6 +2162,13 @@ public class ZookeeperDiscoveryImpl {
             rtState.zkClient.getChildren(zkPaths.customEvtsAcksDir),
             -1);
 
+        if (startInternalOrder > 0) {
+            for (String alive : 
rtState.zkClient.getChildren(zkPaths.aliveNodesDir)) {
+                if (ZkIgnitePaths.aliveInternalId(alive) < startInternalOrder)
+                    rtState.zkClient.deleteIfExists(zkPaths.aliveNodesDir + 
"/" + alive, -1);
+            }
+        }
+
         long time = System.currentTimeMillis() - start;
 
         if (time > 0) {
@@ -3292,6 +3299,21 @@ public class ZookeeperDiscoveryImpl {
     private Exception localNodeFail(String msg, boolean clientReconnect) {
         U.warn(log, msg);
 
+//        if (locNode.isClient() && rtState.zkClient.connected()) {
+//            String path = 
rtState.locNodeZkPath.substring(rtState.locNodeZkPath.lastIndexOf('/') + 1);
+//
+//            String joinDataPath = zkPaths.joiningNodeDataPath(locNode.id(), 
ZkIgnitePaths.aliveNodePrefixId(path));
+//
+//            try {
+//                if (rtState.zkClient.existsNoRetry(joinDataPath))
+//                    rtState.zkClient.deleteIfExistsNoRetry(joinDataPath, -1);
+//            }
+//            catch (Exception e) {
+//                if (log.isDebugEnabled())
+//                    log.debug("Failed to clean local node's join data on 
stop: " + e);
+//            }
+//        }
+
         if (clientReconnect && clientReconnectEnabled) {
             assert locNode.isClient() : locNode;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b5fe175f/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
index 0e7141a..af73535 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
@@ -1984,6 +1984,10 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
                             if (!znode.contains("/")) // Ignore roots.
                                 continue;
 
+                            // TODO ZK
+                            if (znode.startsWith("jd/"))
+                                continue;
+
                             log.info("Found unexpected znode: " + znode);
 
                             return false;

Reply via email to