Repository: ignite
Updated Branches:
  refs/heads/ignite-zk d9456dc12 -> ab47f191b


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ab47f191
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ab47f191
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ab47f191

Branch: refs/heads/ignite-zk
Commit: ab47f191b004974eabc0e817d94aaa8e2b8ccb42
Parents: d9456dc
Author: sboikov <[email protected]>
Authored: Fri Dec 1 13:00:53 2017 +0300
Committer: sboikov <[email protected]>
Committed: Fri Dec 1 13:00:53 2017 +0300

----------------------------------------------------------------------
 .../zk/internal/ZkDiscoveryEventData.java       |  6 +--
 .../zk/internal/ZkDiscoveryEventsData.java      |  7 +--
 .../discovery/zk/internal/ZookeeperClient.java  | 18 +++++++
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 51 ++++++++++++--------
 4 files changed, 52 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ab47f191/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
index f50c504..d8f9a3d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
@@ -64,18 +64,14 @@ abstract class ZkDiscoveryEventData implements Serializable 
{
     }
 
     /**
-     * @param alives Optional alives nodes for additional filtering.
      * @param nodes Current nodes in topology.
      */
-    void initRemainingAcks(Collection<ZookeeperClusterNode> nodes, @Nullable 
TreeMap<Integer, String> alives) {
+    void initRemainingAcks(Collection<ZookeeperClusterNode> nodes) {
         assert remainingAcks == null : this;
 
         remainingAcks = U.newHashSet(nodes.size());
 
         for (ZookeeperClusterNode node : nodes) {
-            if (alives != null && !alives.containsKey(node.internalId()))
-                continue;
-
             if (!node.isLocal() && node.order() <= topVer) {
                 boolean add = remainingAcks.add(node.internalId());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ab47f191/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
index a116a0d..b29d85e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
@@ -58,16 +58,13 @@ class ZkDiscoveryEventsData implements Serializable {
     /**
      * @param nodes Current nodes in topology (these nodes should ack that 
event processed).
      * @param evt Event.
-     * @param alives Optional alives nodes for additional filtering.
      */
-    void addEvent(Collection<ZookeeperClusterNode> nodes,
-        ZkDiscoveryEventData evt,
-        @Nullable TreeMap<Integer, String> alives)
+    void addEvent(Collection<ZookeeperClusterNode> nodes, ZkDiscoveryEventData 
evt)
     {
         Object old = evts.put(evt.eventId(), evt);
 
         assert old == null : old;
 
-        evt.initRemainingAcks(nodes, alives);
+        evt.initRemainingAcks(nodes);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ab47f191/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
index 3a4a45d..2ccc7ea 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -870,6 +870,9 @@ public class ZookeeperClient implements Watcher {
 
         /** {@inheritDoc} */
         @Override public void processResult(int rc, String path, Object ctx) {
+            if (closing)
+                return;
+
             if (rc == KeeperException.Code.NONODE.intValue())
                 return;
 
@@ -901,6 +904,9 @@ public class ZookeeperClient implements Watcher {
         }
 
         @Override public void processResult(int rc, String path, Object ctx, 
String name) {
+            if (closing)
+                return;
+
             if (rc == KeeperException.Code.NODEEXISTS.intValue())
                 return;
 
@@ -934,6 +940,9 @@ public class ZookeeperClient implements Watcher {
 
         /** {@inheritDoc} */
         @Override public void processResult(int rc, String path, Object ctx, 
List<String> children, Stat stat) {
+            if (closing)
+                return;
+
             if (needRetry(rc)) {
                 U.warn(log, "Failed to execute async operation, connection 
lost. Will retry after connection restore [path=" + path + ']');
 
@@ -962,6 +971,9 @@ public class ZookeeperClient implements Watcher {
 
         /** {@inheritDoc} */
         @Override public void processResult(int rc, String path, Object ctx, 
byte[] data, Stat stat) {
+            if (closing)
+                return;
+
             if (needRetry(rc)) {
                 U.warn(log, "Failed to execute async operation, connection 
lost. Will retry after connection restore [path=" + path + ']');
 
@@ -990,6 +1002,9 @@ public class ZookeeperClient implements Watcher {
 
         /** {@inheritDoc} */
         @Override public void processResult(int rc, String path, Object ctx, 
Stat stat) {
+            if (closing)
+                return;
+
             if (needRetry(rc)) {
                 U.warn(log, "Failed to execute async operation, connection 
lost. Will retry after connection restore [path=" + path + ']');
 
@@ -1021,6 +1036,9 @@ public class ZookeeperClient implements Watcher {
             boolean connLoss = false;
 
             synchronized (stateMux) {
+                if (closing)
+                    return;
+
                 if (state == ConnectionState.Disconnected &&
                     ZookeeperClient.this.connStartTime == connectStartTime) {
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ab47f191/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 35fff0a..88905b8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -648,9 +648,9 @@ public class ZookeeperDiscoveryImpl {
             assert state.evtsData != null;
 
             for (ZkDiscoveryEventData evtData : state.evtsData.evts.values())
-                evtData.initRemainingAcks(state.top.nodesByOrder.values(), 
null);
+                evtData.initRemainingAcks(state.top.nodesByOrder.values());
 
-            handleProcessedEvents("crd", null);
+            handleProcessedEvents("crd");
         }
         else {
             if (log.isInfoEnabled())
@@ -710,11 +710,16 @@ public class ZookeeperDiscoveryImpl {
             }
         }
 
+        List<ZookeeperClusterNode> failedNodes = null;
+
         for (Map.Entry<Integer, ZookeeperClusterNode> e : 
state.top.nodesByInternalId.entrySet()) {
             if (!alives.containsKey(e.getKey())) {
                 ZookeeperClusterNode failedNode = e.getValue();
 
-                handleProcessedEventsOnNodeFail(failedNode, alives);
+                if (failedNodes == null)
+                    failedNodes = new ArrayList<>();
+
+                failedNodes.add(failedNode);
 
                 generateNodeFail(curTop, failedNode);
 
@@ -724,6 +729,9 @@ public class ZookeeperDiscoveryImpl {
 
         if (newEvts)
             saveAndProcessNewEvents();
+
+        if (failedNodes != null)
+            handleProcessedEventsOnNodesFail(failedNodes);
     }
 
     /**
@@ -766,7 +774,7 @@ public class ZookeeperDiscoveryImpl {
             state.evtsData.topVer,
             failedNode.internalId());
 
-        state.evtsData.addEvent(curTop.values(), evtData, null);
+        state.evtsData.addEvent(curTop.values(), evtData);
 
         if (log.isInfoEnabled())
             log.info("Generated NODE_FAILED event [evt=" + evtData + ']');
@@ -838,7 +846,7 @@ public class ZookeeperDiscoveryImpl {
 
         evtData.joiningNodeData = joiningNodeData;
 
-        state.evtsData.addEvent(dataForJoined.topology(), evtData, null);
+        state.evtsData.addEvent(dataForJoined.topology(), evtData);
 
         evtData.addRemainingAck(joinedNode); // Topology for joined node does 
not contain joined node.
 
@@ -1013,7 +1021,7 @@ public class ZookeeperDiscoveryImpl {
 
                         evtData.msg = msg;
 
-                        
state.evtsData.addEvent(state.top.nodesByOrder.values(), evtData, null);
+                        
state.evtsData.addEvent(state.top.nodesByOrder.values(), evtData);
 
                         if (log.isDebugEnabled())
                             log.debug("Generated CUSTOM event [evt=" + evtData 
+ ", msg=" + msg + ']');
@@ -1180,7 +1188,7 @@ public class ZookeeperDiscoveryImpl {
         }
 
         if (state.crd)
-            handleProcessedEvents("procEvt", null);
+            handleProcessedEvents("procEvt");
         else if (updateNodeInfo) {
             assert state.locNodeZkPath != null;
 
@@ -1332,10 +1340,9 @@ public class ZookeeperDiscoveryImpl {
 
     /**
      * @param ctx Context for logging.
-     * @param alives Optional alives nodes for additional filtering.
      * @throws Exception If failed.
      */
-    private void handleProcessedEvents(String ctx, @Nullable TreeMap<Integer, 
String> alives) throws Exception {
+    private void handleProcessedEvents(String ctx) throws Exception {
         Iterator<ZkDiscoveryEventData> it = 
state.evtsData.evts.values().iterator();
 
         List<ZkDiscoveryCustomEventData> newEvts = null;
@@ -1428,17 +1435,17 @@ public class ZookeeperDiscoveryImpl {
             Collection<ZookeeperClusterNode> nodes = 
state.top.nodesByOrder.values();
 
             for (int i = 0; i < newEvts.size(); i++)
-                state.evtsData.addEvent(nodes, newEvts.get(i), alives);
+                state.evtsData.addEvent(nodes, newEvts.get(i));
 
             saveAndProcessNewEvents();
         }
     }
 
     /**
-     * @param failedNode Failed node.
+     * @param failedNodes Failed nodes.
      * @throws Exception If failed.
      */
-    private void handleProcessedEventsOnNodeFail(ZookeeperClusterNode 
failedNode, TreeMap<Integer, String> alives) throws Exception {
+    private void handleProcessedEventsOnNodesFail(List<ZookeeperClusterNode> 
failedNodes) throws Exception {
         boolean processed = false;
 
         for (Iterator<Map.Entry<Long, ZkDiscoveryEventData>> it = 
state.evtsData.evts.entrySet().iterator(); it.hasNext();) {
@@ -1446,12 +1453,16 @@ public class ZookeeperDiscoveryImpl {
 
             ZkDiscoveryEventData evtData = e.getValue();
 
-            if (evtData.onNodeFail(failedNode))
-                processed = true;
+            for (int i = 0; i < failedNodes.size(); i++) {
+                ZookeeperClusterNode failedNode = failedNodes.get(i);
+
+                if (evtData.onNodeFail(failedNode))
+                    processed = true;
+            }
         }
 
         if (processed)
-            handleProcessedEvents("fail-" + failedNode.id(), alives);
+            handleProcessedEvents("fail-" + U.nodeIds(failedNodes));
     }
 
     /**
@@ -1459,14 +1470,14 @@ public class ZookeeperDiscoveryImpl {
      * @throws Exception If failed.
      */
     private void handleProcessedJoinEvent(ZkDiscoveryNodeJoinEventData 
evtData) throws Exception {
-        if (log.isInfoEnabled())
-            log.info("All nodes processed node join [evtData=" + evtData + 
']');
+        if (log.isDebugEnabled())
+            log.debug("All nodes processed node join [evtData=" + evtData + 
']');
 
         String evtDataPath = zkPaths.joinEventDataPath(evtData.eventId());
         String dataForJoinedPath = 
zkPaths.joinEventDataPathForJoined(evtData.eventId());
 
-        if (log.isInfoEnabled())
-            log.info("Delete processed event data [path1=" + evtDataPath + ", 
path2=" + dataForJoinedPath + ']');
+        if (log.isDebugEnabled())
+            log.debug("Delete processed event data [path1=" + evtDataPath + ", 
path2=" + dataForJoinedPath + ']');
 
         state.zkClient.deleteIfExistsAsync(evtDataPath);
         state.zkClient.deleteIfExistsAsync(dataForJoinedPath);
@@ -1821,7 +1832,7 @@ public class ZookeeperDiscoveryImpl {
                 }
 
                 if (processed)
-                    handleProcessedEvents("ack-" + nodeInternalId, null);
+                    handleProcessedEvents("ack-" + nodeInternalId);
             }
         }
     }

Reply via email to