Repository: ignite
Updated Branches:
  refs/heads/ignite-zk b361a7802 -> 804c84171


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/804c8417
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/804c8417
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/804c8417

Branch: refs/heads/ignite-zk
Commit: 804c84171cc75e53bed549d13d5af6858786d9a7
Parents: b361a78
Author: sboikov <[email protected]>
Authored: Mon Nov 13 15:25:25 2017 +0300
Committer: sboikov <[email protected]>
Committed: Mon Nov 13 15:25:25 2017 +0300

----------------------------------------------------------------------
 .../spi/discovery/zk/ZookeeperDiscoverySpi.java |  64 +++++-----
 .../zk/ZookeeperDiscoverySpiBasicTest.java      | 116 +++++++++++++++++++
 2 files changed, 149 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/804c8417/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index 85b0aa5..4059b0b 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -849,53 +849,55 @@ public class ZookeeperDiscoverySpi extends 
IgniteSpiAdapter implements Discovery
                         assert lastEvt == null || lastEvt.topVer + 1 == 
e.topVer : "lastEvt=" + lastEvt + ", nextEvt=" + e;
 
                         if (!crd) {
-                            if (locJoin) {
-                                for (ZookeeperClusterNode node : e.allNodes) {
-                                    assert node.order() > 0 : node;
+                            synchronized (curTop) {
+                                if (locJoin) {
+                                    for (ZookeeperClusterNode node : 
e.allNodes) {
+                                        assert node.order() > 0 : node;
 
-                                    Object old = curTop.put(node.order(), 
node);
+                                        Object old = curTop.put(node.order(), 
node);
 
-                                    assert old == null : node;
-                                }
+                                        assert old == null : node;
+                                    }
 
-                                DiscoveryDataBag dataBag = new 
DiscoveryDataBag(e.node.id());
+                                    DiscoveryDataBag dataBag = new 
DiscoveryDataBag(e.node.id());
 
-                                dataBag.joiningNodeData(e.joiningNodeData);
-                                dataBag.commonData(e.commonData);
+                                    dataBag.joiningNodeData(e.joiningNodeData);
+                                    dataBag.commonData(e.commonData);
 
-                                exchange.onExchange(dataBag);
-                            }
-                            else {
-                                switch (e.evtType) {
-                                    case EventType.EVT_NODE_JOINED: {
-                                        ZookeeperClusterNode node = e.node;
+                                    exchange.onExchange(dataBag);
+                                }
+                                else {
+                                    switch (e.evtType) {
+                                        case EventType.EVT_NODE_JOINED: {
+                                            ZookeeperClusterNode node = e.node;
 
-                                        DiscoveryDataBag dataBag = new 
DiscoveryDataBag(e.node.id());
+                                            DiscoveryDataBag dataBag = new 
DiscoveryDataBag(e.node.id());
 
-                                        
dataBag.joiningNodeData(e.joiningNodeData);
-                                        dataBag.commonData(e.commonData);
+                                            
dataBag.joiningNodeData(e.joiningNodeData);
+                                            dataBag.commonData(e.commonData);
 
-                                        exchange.onExchange(dataBag);
+                                            exchange.onExchange(dataBag);
 
-                                        Object old = curTop.put(node.order(), 
node);
+                                            Object old = 
curTop.put(node.order(), node);
 
-                                        assert old == null : node;
+                                            assert old == null : node;
 
-                                        break;
-                                    }
+                                            break;
+                                        }
 
-                                    case EventType.EVT_NODE_FAILED: {
-                                        ZookeeperClusterNode node = e.node;
+                                        case EventType.EVT_NODE_FAILED: {
+                                            ZookeeperClusterNode node = e.node;
 
-                                        Object failedNode = 
curTop.remove(node.order());
+                                            Object failedNode = 
curTop.remove(node.order());
 
-                                        assert failedNode != null : node;
+                                            assert failedNode != null : node;
 
-                                        break;
-                                    }
+                                            break;
+                                        }
 
-                                    default:
-                                        assert false : e;
+                                        default:
+                                            assert false : e;
+                                    }
                                 }
                             }
                         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/804c8417/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
index 5e1a200..f32a7e6 100644
--- 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
+++ 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
@@ -17,21 +17,36 @@
 
 package org.apache.ignite.spi.discovery.zk;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.curator.test.TestingCluster;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+
 /**
  *
  */
@@ -45,6 +60,12 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
     /** */
     private boolean client;
 
+    /** */
+    private static ConcurrentHashMap<UUID, Map<Long, DiscoveryEvent>> evts = 
new ConcurrentHashMap<>();
+
+    /** */
+    private static volatile boolean err;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -73,6 +94,47 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
 
         cfg.setClientMode(client);
 
+        Map<IgnitePredicate<? extends Event>, int[]> lsnrs = new HashMap<>();
+
+        lsnrs.put(new IgnitePredicate<Event>() {
+            /** */
+            @IgniteInstanceResource
+            private Ignite ignite;
+
+            @Override public boolean apply(Event evt) {
+                try {
+                    DiscoveryEvent discoveryEvt = (DiscoveryEvent)evt;
+
+                    UUID locId = ignite.cluster().localNode().id();
+
+                    Map<Long, DiscoveryEvent> nodeEvts = evts.get(locId);
+
+                    if (nodeEvts == null) {
+                        Object old = evts.put(locId, nodeEvts = new 
TreeMap<>());
+
+                        assertNull(old);
+
+                        DiscoveryLocalJoinData locJoin = 
((IgniteKernal)ignite).context().discovery().localJoin();
+
+                        nodeEvts.put(locJoin.event().topologyVersion(), 
locJoin.event());
+                    }
+
+                    DiscoveryEvent old = 
nodeEvts.put(discoveryEvt.topologyVersion(), discoveryEvt);
+
+                    assertNull(old);
+                }
+                catch (Throwable e) {
+                    err = true;
+
+                    info("Unexpected error: " + e);
+                }
+
+                return true;
+            }
+        }, new int[]{EVT_NODE_JOINED, EVT_NODE_FAILED, EVT_NODE_LEFT});
+
+        cfg.setLocalEventListeners(lsnrs);
+
         return cfg;
     }
 
@@ -80,6 +142,10 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
 
+        err = false;
+
+        evts.clear();
+
         if (USE_TEST_CLUSTER) {
             zkCluster = new TestingCluster(1);
             zkCluster.start();
@@ -96,6 +162,54 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
         }
 
         super.afterTest();
+
+        assertFalse("Unexpected error, see log for details", err);
+
+        checkEventsConsistency();
+
+        evts.clear();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkEventsConsistency() throws Exception {
+        for (Map.Entry<UUID, Map<Long, DiscoveryEvent>> nodeEvtEntry : 
evts.entrySet()) {
+            UUID nodeId = nodeEvtEntry.getKey();
+            Map<Long, DiscoveryEvent> nodeEvts = nodeEvtEntry.getValue();
+
+            for (Map.Entry<UUID, Map<Long, DiscoveryEvent>> nodeEvtEntry0 : 
evts.entrySet()) {
+                if (!nodeId.equals(nodeEvtEntry0.getKey())) {
+                    Map<Long, DiscoveryEvent> nodeEvts0 = 
nodeEvtEntry0.getValue();
+
+                    checkEventsConsistency(nodeEvts, nodeEvts0);
+                }
+            }
+        }
+    }
+
+    /**
+     * @param evts1 Received events.
+     * @param evts2 Received events.
+     */
+    private void checkEventsConsistency(Map<Long, DiscoveryEvent> evts1, 
Map<Long, DiscoveryEvent> evts2) {
+        for (Map.Entry<Long, DiscoveryEvent> e1 : evts1.entrySet()) {
+            DiscoveryEvent evt1 = e1.getValue();
+            DiscoveryEvent evt2 = evts2.get(e1.getKey());
+
+            if (evt2 != null) {
+                assertEquals(evt1.topologyVersion(), evt2.topologyVersion());
+                assertEquals(evt1.eventNode(), evt2.eventNode());
+                assertEquals(evt1.topologyNodes(), evt2.topologyNodes());
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore1() throws Exception {
+
     }
 
     /**
@@ -224,6 +338,8 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
             }, THREADS, "stop-node");
 
             waitForTopology(SRVS);
+
+            checkEventsConsistency();
         }
     }
 

Reply via email to