Repository: ignite
Updated Branches:
  refs/heads/ignite-zk a5edc0e83 -> 03799c9c1


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/03799c9c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/03799c9c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/03799c9c

Branch: refs/heads/ignite-zk
Commit: 03799c9c1ac9758e3c15f6cd360b30fee4fad054
Parents: a5edc0e
Author: sboikov <[email protected]>
Authored: Tue Nov 28 11:52:05 2017 +0300
Committer: sboikov <[email protected]>
Committed: Tue Nov 28 13:50:44 2017 +0300

----------------------------------------------------------------------
 .../zk/internal/ZkDiscoveryCustomEventData.java |  11 +-
 .../zk/internal/ZkDiscoveryEventData.java       |  15 ++-
 .../zk/internal/ZkDiscoveryEventsData.java      |   9 +-
 .../internal/ZkDiscoveryNodeFailEventData.java  |   5 +-
 .../internal/ZkDiscoveryNodeJoinEventData.java  |   5 +-
 .../discovery/zk/internal/ZkIgnitePaths.java    |   4 +
 .../discovery/zk/internal/ZookeeperClient.java  |   7 ++
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 115 ++++++++++++++-----
 .../ZookeeperDiscoverySpiBasicTest.java         |  17 ++-
 9 files changed, 147 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/03799c9c/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
index 8da2e56..0d2288c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
@@ -30,7 +30,7 @@ class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData {
     private static final long serialVersionUID = 0L;
 
     /** */
-    private static final int CUSTOM_MSG_ACK_FLAG = 1;
+    private static final int CUSTOM_MSG_ACK_FLAG = 0x01;
 
     /** */
     final UUID sndNodeId;
@@ -52,7 +52,7 @@ class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData {
         super(evtId, DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT, topVer);
 
         assert sndNodeId != null;
-        assert !F.isEmpty(evtPath);
+        assert ack || !F.isEmpty(evtPath);
 
         this.sndNodeId = sndNodeId;
         this.evtPath = evtPath;
@@ -70,6 +70,11 @@ class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData {
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return "ZkDiscoveryCustomEventData [topVer=" + topologyVersion() + ", evtId=" + eventId() + ", sndNode=" + sndNodeId + ']';
+        return "ZkDiscoveryCustomEventData [" +
+            "evtId=" + eventId() +
+            ", topVer=" + topologyVersion() +
+            ", sndNode=" + sndNodeId +
+            ", ack=" + ackEvent() +
+            ']';
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/03799c9c/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
index e7e8d31..f50c504 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
@@ -20,7 +20,9 @@ package org.apache.ignite.spi.discovery.zk.internal;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.Set;
+import java.util.TreeMap;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
@@ -62,14 +64,18 @@ abstract class ZkDiscoveryEventData implements Serializable {
     }
 
     /**
+     * @param alives Optional alives nodes for additional filtering.
      * @param nodes Current nodes in topology.
      */
-    void initRemainingAcks(Collection<ZookeeperClusterNode> nodes) {
+    void initRemainingAcks(Collection<ZookeeperClusterNode> nodes, @Nullable TreeMap<Integer, String> alives) {
         assert remainingAcks == null : this;
 
         remainingAcks = U.newHashSet(nodes.size());
 
         for (ZookeeperClusterNode node : nodes) {
+            if (alives != null && !alives.containsKey(node.internalId()))
+                continue;
+
             if (!node.isLocal() && node.order() <= topVer) {
                 boolean add = remainingAcks.add(node.internalId());
 
@@ -97,6 +103,13 @@ abstract class ZkDiscoveryEventData implements Serializable {
     }
 
     /**
+     * @return Remaining acks.
+     */
+    Set<Integer> remainingAcks() {
+        return remainingAcks;
+    }
+
+    /**
      * @param nodeInternalId Node ID.
      * @param ackEvtId Last event ID processed on node.
      * @return {@code True} if all nodes processed event.

http://git-wip-us.apache.org/repos/asf/ignite/blob/03799c9c/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
index 6625ec0..a116a0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
@@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery.zk.internal;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.TreeMap;
+import org.jetbrains.annotations.Nullable;
 
 /**
  *
@@ -57,12 +58,16 @@ class ZkDiscoveryEventsData implements Serializable {
     /**
      * @param nodes Current nodes in topology (these nodes should ack that event processed).
      * @param evt Event.
+     * @param alives Optional alives nodes for additional filtering.
      */
-    void addEvent(Collection<ZookeeperClusterNode> nodes, ZkDiscoveryEventData evt) {
+    void addEvent(Collection<ZookeeperClusterNode> nodes,
+        ZkDiscoveryEventData evt,
+        @Nullable TreeMap<Integer, String> alives)
+    {
         Object old = evts.put(evt.eventId(), evt);
 
         assert old == null : old;
 
-        evt.initRemainingAcks(nodes);
+        evt.initRemainingAcks(nodes, alives);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/03799c9c/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
index b25f39c..a5dbd02 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
@@ -49,8 +49,9 @@ class ZkDiscoveryNodeFailEventData extends ZkDiscoveryEventData {
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return "ZkDiscoveryNodeFailEventData [topVer=" + topologyVersion() +
-            ", evtId=" + eventId() +
+        return "ZkDiscoveryNodeFailEventData [" +
+            "evtId=" + eventId() +
+            ", topVer=" + topologyVersion() +
             ", nodeId=" + failedNodeInternalId + ']';
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/03799c9c/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
index e96f386..df4c137 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
@@ -51,8 +51,9 @@ class ZkDiscoveryNodeJoinEventData extends ZkDiscoveryEventData {
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return "ZkDiscoveryNodeJoinEventData [topVer=" + topologyVersion() +
-            ", evtId=" + eventId() +
+        return "ZkDiscoveryNodeJoinEventData [" +
+            "evtId=" + eventId() +
+            ", topVer=" + topologyVersion() +
             ", node=" + nodeId + ']';
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/03799c9c/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
index cd2ff0b..30138e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
@@ -266,6 +266,10 @@ class ZkIgnitePaths {
         return evtsPath + "/joined-" + evtId;
     }
 
+    String ackEventDataPath(long evtId) {
+        return customEventDataPath(true, String.valueOf(evtId));
+    }
+
     /**
      * @param ack Ack event flag.
      * @param child Event child path.

http://git-wip-us.apache.org/repos/asf/ignite/blob/03799c9c/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
index 0f39da6..ea9b289 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -130,6 +130,13 @@ public class ZookeeperClient implements Watcher {
         scheduleConnectionCheck();
     }
 
+    /**
+     * @return Zookeeper client.
+     */
+    ZooKeeper zk() {
+        return zk;
+    }
+
     /** {@inheritDoc} */
     @Override public void process(WatchedEvent evt) {
         if (closing)

http://git-wip-us.apache.org/repos/asf/ignite/blob/03799c9c/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index ef3da38..bf66ed0 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -574,9 +574,9 @@ public class ZookeeperDiscoveryImpl {
             assert state.evtsData != null;
 
             for (ZkDiscoveryEventData evtData : state.evtsData.evts.values())
-                evtData.initRemainingAcks(state.top.nodesByOrder.values());
+                evtData.initRemainingAcks(state.top.nodesByOrder.values(), null);
 
-            handleProcessedEvents();
+            handleProcessedEvents("crd", null);
         }
         else {
             if (log.isInfoEnabled())
@@ -612,7 +612,7 @@ public class ZookeeperDiscoveryImpl {
         assert state.crd;
 
         if (log.isInfoEnabled())
-            log.info("Process alive nodes change: " + aliveNodes);
+            log.info("Process alive nodes change: " + aliveNodes.size());
 
         TreeMap<Integer, String> alives = new TreeMap<>();
 
@@ -640,7 +640,7 @@ public class ZookeeperDiscoveryImpl {
             if (!alives.containsKey(e.getKey())) {
                 ZookeeperClusterNode failedNode = e.getValue();
 
-                handleProcessedEventsOnNodeFail(failedNode);
+                handleProcessedEventsOnNodeFail(failedNode, alives);
 
                 generateNodeFail(curTop, failedNode);
 
@@ -667,6 +667,8 @@ public class ZookeeperDiscoveryImpl {
         if (log.isInfoEnabled()) {
             log.info("Discovery coordinator saved new topology events [topVer=" + state.evtsData.topVer +
                 ", size=" + evtsBytes.length +
+                ", evts=" + state.evtsData.evts.size() +
+                ", lastEvt=" + state.evtsData.evtIdGen +
                 ", saveTime=" + time + ']');
         }
 
@@ -690,7 +692,7 @@ public class ZookeeperDiscoveryImpl {
             state.evtsData.topVer,
             failedNode.internalId());
 
-        state.evtsData.addEvent(curTop.values(), evtData);
+        state.evtsData.addEvent(curTop.values(), evtData, null);
 
         if (log.isInfoEnabled())
             log.info("Generated NODE_FAILED event [evt=" + evtData + ']');
@@ -762,7 +764,7 @@ public class ZookeeperDiscoveryImpl {
 
         evtData.joiningNodeData = joiningNodeData;
 
-        state.evtsData.addEvent(dataForJoined.topology(), evtData);
+        state.evtsData.addEvent(dataForJoined.topology(), evtData, null);
 
         evtData.addRemainingAck(joinedNode); // Topology for joined node does not contain joined node.
 
@@ -806,6 +808,9 @@ public class ZookeeperDiscoveryImpl {
 
         String joinDataPath = zkPaths.joiningNodeDataPath(locNode.id(), path);
 
+        if (log.isInfoEnabled())
+            log.info("Delete join data: " + joinDataPath);
+
         // TODO ZK async
         state.zkClient.deleteIfExists(joinDataPath, -1);
 
@@ -930,7 +935,7 @@ public class ZookeeperDiscoveryImpl {
 
                         evtData.msg = msg;
 
-                        state.evtsData.addEvent(state.top.nodesByOrder.values(), evtData);
+                        state.evtsData.addEvent(state.top.nodesByOrder.values(), evtData, null);
 
                         if (log.isInfoEnabled())
                             log.info("Generated CUSTOM event [evt=" + evtData + ", msg=" + msg + ']');
@@ -1063,8 +1068,12 @@ public class ZookeeperDiscoveryImpl {
                             msg = evtData0.msg;
                         }
                         else {
-                            String path = zkPaths.customEventDataPath(evtData0.ackEvent(),
-                                evtData0.evtPath);
+                            String path;
+
+                            if (evtData0.ackEvent())
+                                path = zkPaths.ackEventDataPath(evtData0.eventId());
+                            else
+                                path = zkPaths.customEventDataPath(false, evtData0.evtPath);
 
                             msg = unmarshal(state.zkClient.getData(path));
 
@@ -1094,12 +1103,14 @@ public class ZookeeperDiscoveryImpl {
             }
         }
 
-        if (state.crd) {
-            handleProcessedEvents();
-        }
+        if (state.crd)
+            handleProcessedEvents("procEvt", null);
         else if (updateNodeInfo) {
             assert state.locNodeZkPath != null;
 
+            if (log.isInfoEnabled())
+                log.info("Update processed events: " + state.locNodeInfo.lastProcEvt);
+
             state.zkClient.setData(state.locNodeZkPath, marshal(state.locNodeInfo), -1);
         }
     }
@@ -1174,6 +1185,9 @@ public class ZookeeperDiscoveryImpl {
 
         state.joined = true;
 
+        if (log.isInfoEnabled())
+            log.info("Delete data for joined: " + path);
+
         // TODO ZK: async
         state.zkClient.deleteIfExists(path, -1);
     }
@@ -1258,17 +1272,32 @@ public class ZookeeperDiscoveryImpl {
     }
 
     /**
+     * @param ctx Context for logging.
+     * @param alives Optional alives nodes for additional filtering.
      * @throws Exception If failed.
      */
-    private void handleProcessedEvents() throws Exception {
+    private void handleProcessedEvents(String ctx, @Nullable TreeMap<Integer, String> alives) throws Exception {
         Iterator<ZkDiscoveryEventData> it = state.evtsData.evts.values().iterator();
 
         List<ZkDiscoveryCustomEventData> newEvts = null;
 
+        ZkDiscoveryEventData prevEvtData = null;
+
         while (it.hasNext()) {
             ZkDiscoveryEventData evtData = it.next();
 
             if (evtData.allAcksReceived()) {
+                if (prevEvtData != null) {
+                    if (log.isInfoEnabled()) {
+                        log.info("Previous event is not acked [" +
+                            "evtId=" + evtData.eventId() +
+                            ", prevEvtData=" + prevEvtData +
+                            ", remaining=" + prevEvtData.remainingAcks() + ']');
+                    }
+                }
+
+                prevEvtData = null;
+
                 switch (evtData.eventType()) {
                     case EventType.EVT_NODE_JOINED: {
                         handleProcessedJoinEvent((ZkDiscoveryNodeJoinEventData)evtData);
@@ -1277,7 +1306,7 @@ public class ZookeeperDiscoveryImpl {
                     }
 
                     case DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT: {
-                        DiscoverySpiCustomMessage ack = handleProcessedCustomEvent((ZkDiscoveryCustomEventData)evtData);
+                        DiscoverySpiCustomMessage ack = handleProcessedCustomEvent(ctx, (ZkDiscoveryCustomEventData)evtData);
 
                         if (ack != null) {
                             state.evtsData.evtIdGen++;
@@ -1286,10 +1315,14 @@ public class ZookeeperDiscoveryImpl {
 
                             byte[] ackBytes = marshal(ack);
 
-                            String evtChildPath = String.valueOf(evtId);
+                            String path = zkPaths.ackEventDataPath(evtId);
+
+                            if (log.isInfoEnabled())
+                                log.info("Create ack event: " + path);
 
+                            // TODO ZK: delete is previous exists?
                             state.zkClient.createIfNeeded(
-                                zkPaths.customEventDataPath(true, evtChildPath),
+                                path,
                                 ackBytes,
                                 CreateMode.PERSISTENT);
 
@@ -1297,7 +1330,7 @@ public class ZookeeperDiscoveryImpl {
                                 evtId,
                                 evtData.topologyVersion(), // Use topology version from original event.
                                 locNode.id(),
-                                evtChildPath,
+                                null,
                                 true);
 
                             ackEvtData.msg = ack;
@@ -1307,8 +1340,11 @@ public class ZookeeperDiscoveryImpl {
 
                             newEvts.add(ackEvtData);
 
-                            if (log.isInfoEnabled())
-                                log.info("Generated CUSTOM event ack [evt=" + evtData + ", msg=" + ack + ']');
+                            if (log.isInfoEnabled()) {
+                                log.info("Generated CUSTOM event ack [baseEvtId=" + evtData.eventId() +
+                                    ", evt=" + ackEvtData +
+                                    ", msg=" + ack + ']');
+                            }
                         }
 
                         break;
@@ -1324,11 +1360,15 @@ public class ZookeeperDiscoveryImpl {
 
                 it.remove();
             }
+            else
+                prevEvtData = evtData;
         }
 
         if (newEvts != null) {
+            Collection<ZookeeperClusterNode> nodes = state.top.nodesByOrder.values();
+
             for (int i = 0; i < newEvts.size(); i++)
-                state.evtsData.addEvent(state.top.nodesByOrder.values(), newEvts.get(i));
+                state.evtsData.addEvent(nodes, newEvts.get(i), alives);
 
             saveAndProcessNewEvents();
         }
@@ -1338,7 +1378,7 @@ public class ZookeeperDiscoveryImpl {
      * @param failedNode Failed node.
      * @throws Exception If failed.
      */
-    private void handleProcessedEventsOnNodeFail(ZookeeperClusterNode failedNode) throws Exception {
+    private void handleProcessedEventsOnNodeFail(ZookeeperClusterNode failedNode, TreeMap<Integer, String> alives) throws Exception {
         boolean processed = false;
 
         for (Iterator<Map.Entry<Long, ZkDiscoveryEventData>> it = state.evtsData.evts.entrySet().iterator(); it.hasNext();) {
@@ -1351,7 +1391,7 @@ public class ZookeeperDiscoveryImpl {
         }
 
         if (processed)
-            handleProcessedEvents();
+            handleProcessedEvents("fail-" + failedNode.id(), alives);
     }
 
     /**
@@ -1362,8 +1402,14 @@ public class ZookeeperDiscoveryImpl {
         if (log.isInfoEnabled())
             log.info("All nodes processed node join [evtData=" + evtData + ']');
 
-        state.zkClient.deleteIfExists(zkPaths.joinEventDataPath(evtData.eventId()), -1);
-        state.zkClient.deleteIfExists(zkPaths.joinEventDataPathForJoined(evtData.eventId()), -1);
+        String evtDataPath = zkPaths.joinEventDataPath(evtData.eventId());
+        String dataForJoinedPath = zkPaths.joinEventDataPathForJoined(evtData.eventId());
+
+        if (log.isInfoEnabled())
+            log.info("Delete processed event data [path1=" + evtDataPath + ", path2=" + dataForJoinedPath + ']');
+
+        state.zkClient.deleteIfExists(evtDataPath, -1);
+        state.zkClient.deleteIfExists(dataForJoinedPath, -1);
     }
 
     /**
@@ -1371,22 +1417,31 @@ public class ZookeeperDiscoveryImpl {
      * @throws Exception If failed.
      * @return Ack message.
      */
-    @Nullable private DiscoverySpiCustomMessage handleProcessedCustomEvent(ZkDiscoveryCustomEventData evtData)
+    @Nullable private DiscoverySpiCustomMessage handleProcessedCustomEvent(String ctx, ZkDiscoveryCustomEventData evtData)
         throws Exception
     {
         if (log.isInfoEnabled())
-            log.info("All nodes processed custom event [evtData=" + evtData + ']');
+            log.info("All nodes processed custom event [ctx=" + ctx + ", evtData=" + evtData + ']');
 
         if (!evtData.ackEvent()) {
-            state.zkClient.deleteIfExists(zkPaths.customEventDataPath(false, evtData.evtPath), -1);
+            String path = zkPaths.customEventDataPath(false, evtData.evtPath);
+
+            log.info("Delete path: " + path);
+
+            state.zkClient.deleteIfExists(path, -1);
 
             assert evtData.msg != null || locNode.order() > evtData.topologyVersion() : evtData;
 
             if (evtData.msg != null)
                 return evtData.msg.ackMessage();
         }
-        else
-            state.zkClient.deleteIfExists(zkPaths.customEventDataPath(true, evtData.evtPath), -1);
+        else {
+            String path = zkPaths.ackEventDataPath(evtData.eventId());
+
+            log.info("Delete path: " + path);
+
+            state.zkClient.deleteIfExists(path, -1);
+        }
 
         return null;
     }
@@ -1806,7 +1861,7 @@ public class ZookeeperDiscoveryImpl {
                 }
 
                 if (processed)
-                    handleProcessedEvents();
+                    handleProcessedEvents("ack-" + nodeInternalId, null);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/03799c9c/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index b48cbe1..ab2dc32 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -62,6 +62,7 @@ import org.apache.ignite.spi.discovery.DiscoverySpi;
 import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.zookeeper.ZKUtil;
 import org.apache.zookeeper.ZkTestClientCnxnSocketNIO;
 import org.apache.zookeeper.ZooKeeper;
 import org.jetbrains.annotations.Nullable;
@@ -80,6 +81,9 @@ import static org.apache.zookeeper.ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET;
  */
 public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     /** */
+    private static final String IGNITE_ZK_ROOT = "/apacheIgnite/default";
+
+    /** */
     private static final int ZK_SRVS = 3;
 
     /** */
@@ -658,7 +662,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
             assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
                 @Override public boolean apply() {
                     try {
-                        List<String> c = zkClient.getChildren("/apacheIgnite/default/alive");
+                        List<String> c = zkClient.getChildren(IGNITE_ZK_ROOT + "/alive");
 
                         for (String failedZkNode : failedZkNodes) {
                             if (c.contains(failedZkNode))
@@ -1027,6 +1031,17 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
      */
     public void testRandomTopologyChanges() throws Exception {
         randomTopologyChanges(false, false);
+
+//        ZookeeperClient zkClient = new ZookeeperClient(new JavaLogger(), zkCluster.getConnectString(), 10_000, null);
+//
+//        List<String> children = ZKUtil.listSubTreeBFS(zkClient.zk(), IGNITE_ZK_ROOT);
+//
+//        info("Children after test:");
+//
+//        for (String s : children)
+//            info(s);
+//
+//        zkClient.close();
     }
 
     /**

Reply via email to