zk

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2fc690e6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2fc690e6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2fc690e6

Branch: refs/heads/ignite-zk-join
Commit: 2fc690e64c5a75607eb4a8542cf800a8290cda3c
Parents: ec75bba
Author: sboikov <sboi...@gridgain.com>
Authored: Thu Jan 11 17:52:53 2018 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Thu Jan 11 17:52:53 2018 +0300

----------------------------------------------------------------------
 .../zk/internal/ZkBulkJoinContext.java          |  50 ++
 .../internal/ZkDiscoveryNodeJoinEventData.java  |  44 +-
 .../discovery/zk/internal/ZkIgnitePaths.java    |   6 +-
 .../zk/internal/ZkJoinEventDataForJoined.java   |  42 +-
 .../zk/internal/ZkJoinedNodeEvtData.java        |  79 ++++
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 469 +++++++++++--------
 .../zk/internal/ZookeeperDiscoverySpiTest.java  |  38 +-
 7 files changed, 491 insertions(+), 237 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2fc690e6/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java
new file mode 100644
index 0000000..a186aed
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.internal.util.typedef.T2;
+
+/**
+ * Collects joined nodes, together with their discovery data, for one bulk join event.
+ */
+class ZkBulkJoinContext {
+    /** Joined nodes paired with their discovery data; {@code null} until the first node is added. */
+    List<T2<ZkJoinedNodeEvtData, Map<Integer, Serializable>>> nodes;
+
+    /**
+     * @param nodeEvtData Node event data.
+     * @param discoData Discovery data for node.
+     */
+    void addJoinedNode(ZkJoinedNodeEvtData nodeEvtData, Map<Integer, Serializable> discoData) {
+        if (nodes == null)
+            nodes = new ArrayList<>();
+
+        nodes.add(new T2<>(nodeEvtData, discoData));
+    }
+
+    /**
+     * @return Number of joined nodes, {@code 0} if no node was added yet.
+     */
+    int nodes() {
+        return nodes != null ? nodes.size() : 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2fc690e6/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
index ff75d22..e46d52d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.spi.discovery.zk.internal;
 
-import java.util.UUID;
+import java.util.List;
 
 /**
  *
@@ -27,53 +27,27 @@ class ZkDiscoveryNodeJoinEventData extends 
ZkDiscoveryEventData {
     private static final long serialVersionUID = 0L;
 
     /** */
-    final long joinedInternalId;
-
-    /** */
-    final UUID nodeId;
-
-    /** */
-    final int joinDataPartCnt;
+    final List<ZkJoinedNodeEvtData> joinedNodes;
 
     /** */
     final int dataForJoinedPartCnt;
 
-    /** */
-    final int secSubjPartCnt;
-
-    /** */
-    final UUID joinDataPrefixId;
-
-    /** */
-    transient ZkJoiningNodeData joiningNodeData;
-
     /**
      * @param evtId Event ID.
      * @param topVer Topology version.
-     * @param nodeId Joined node ID.
-     * @param joinedInternalId Joined node internal ID.
-     * @param joinDataPrefixId Join data unique prefix.
-     * @param joinDataPartCnt Join data part count.
+     * @param joinedNodes Joined nodes data.
      * @param dataForJoinedPartCnt Data for joined part count.
-     * @param secSubjPartCnt Security subject part count.
      */
-    ZkDiscoveryNodeJoinEventData(long evtId,
+    ZkDiscoveryNodeJoinEventData(
+        long evtId,
         long topVer,
-        UUID nodeId,
-        long joinedInternalId,
-        UUID joinDataPrefixId,
-        int joinDataPartCnt,
-        int dataForJoinedPartCnt,
-        int secSubjPartCnt)
+        List<ZkJoinedNodeEvtData> joinedNodes,
+        int dataForJoinedPartCnt)
     {
         super(evtId, ZK_EVT_NODE_JOIN, topVer);
 
-        this.nodeId = nodeId;
-        this.joinedInternalId = joinedInternalId;
-        this.joinDataPrefixId = joinDataPrefixId;
-        this.joinDataPartCnt = joinDataPartCnt;
+        this.joinedNodes = joinedNodes;
         this.dataForJoinedPartCnt = dataForJoinedPartCnt;
-        this.secSubjPartCnt = secSubjPartCnt;
     }
 
     /** {@inheritDoc} */
@@ -81,6 +55,6 @@ class ZkDiscoveryNodeJoinEventData extends 
ZkDiscoveryEventData {
         return "ZkDiscoveryNodeJoinEventData [" +
             "evtId=" + eventId() +
             ", topVer=" + topologyVersion() +
-            ", node=" + nodeId + ']';
+            ", nodes=" + joinedNodes + ']';
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2fc690e6/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
index 44b247c..642183b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
@@ -314,11 +314,11 @@ class ZkIgnitePaths {
     }
 
     /**
-     * @param evtId Event ID.
+     * @param topVer Event topology version.
      * @return Event zk path.
      */
-    String joinEventSecuritySubjectPath(long evtId) {
-        return evtsPath + "/s-" + evtId;
+    String joinEventSecuritySubjectPath(long topVer) {
+        return evtsPath + "/s-" + topVer;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/2fc690e6/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
index eb24f27..e4ae4ba0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
@@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery.zk.internal;
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
+import org.jetbrains.annotations.Nullable;
 
 /**
  *
@@ -32,28 +33,51 @@ class ZkJoinEventDataForJoined implements Serializable {
     private final List<ZookeeperClusterNode> top;
 
     /** */
-    private final Map<Integer, Serializable> discoData;
+    private final Map<Long, byte[]> discoData;
+
+    /** */
+    private final Map<Long, Long> dupDiscoData;
 
     /**
      * @param top Topology.
      * @param discoData Discovery data.
      */
-    ZkJoinEventDataForJoined(List<ZookeeperClusterNode> top, Map<Integer, 
Serializable> discoData) {
+    ZkJoinEventDataForJoined(List<ZookeeperClusterNode> top, Map<Long, byte[]> 
discoData, @Nullable Map<Long, Long> dupDiscoData) {
+        assert top != null;
+        assert discoData != null && !discoData.isEmpty();
+
         this.top = top;
         this.discoData = discoData;
+        this.dupDiscoData = dupDiscoData;
+    }
+
+    byte[] discoveryDataForNode(long nodeOrder) {
+        assert discoData != null;
+
+        byte[] dataBytes = discoData.get(nodeOrder);
+
+        if (dataBytes != null)
+            return dataBytes;
+
+        assert dupDiscoData != null;
+
+        Long dupDataNode = dupDiscoData.get(nodeOrder);
+
+        assert dupDataNode != null;
+
+        dataBytes = discoData.get(dupDataNode);
+
+        assert dataBytes != null;
+
+        return dataBytes;
     }
 
     /**
      * @return Current topology.
      */
     List<ZookeeperClusterNode> topology() {
-        return top;
-    }
+        assert top != null;
 
-    /**
-     * @return Discovery data.
-     */
-    Map<Integer, Serializable> discoveryData() {
-        return discoData;
+        return top;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2fc690e6/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinedNodeEvtData.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinedNodeEvtData.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinedNodeEvtData.java
new file mode 100644
index 0000000..8149afc
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinedNodeEvtData.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import java.util.UUID;
+
+/**
+ * Data of a single joined node carried inside a node join event.
+ */
+public class ZkJoinedNodeEvtData implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Topology version for node join event (printed as the node order by {@link #toString()}). */
+    final long topVer;
+
+    /** Joined node internal ID. */
+    final long joinedInternalId;
+
+    /** Joined node ID. */
+    final UUID nodeId;
+
+    /** Join data part count. */
+    final int joinDataPartCnt;
+
+    /** Security subject part count. */
+    final int secSubjPartCnt;
+
+    /** Join data unique prefix. */
+    final UUID joinDataPrefixId;
+
+    /** Joining node data; transient, so it is not part of the serialized form. */
+    transient ZkJoiningNodeData joiningNodeData;
+
+    /**
+     * @param topVer Topology version for node join event.
+     * @param nodeId Joined node ID.
+     * @param joinedInternalId Joined node internal ID.
+     * @param joinDataPrefixId Join data unique prefix.
+     * @param joinDataPartCnt Join data part count.
+     * @param secSubjPartCnt Security subject part count.
+     */
+    ZkJoinedNodeEvtData(
+        long topVer,
+        UUID nodeId,
+        long joinedInternalId,
+        UUID joinDataPrefixId,
+        int joinDataPartCnt,
+        int secSubjPartCnt)
+    {
+        this.topVer = topVer;
+        this.nodeId = nodeId;
+        this.joinedInternalId = joinedInternalId;
+        this.joinDataPrefixId = joinDataPrefixId;
+        this.joinDataPartCnt = joinDataPartCnt;
+        this.secSubjPartCnt = secSubjPartCnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "ZkJoinedNodeData [id=" + nodeId + ", order=" + topVer + ']';
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2fc690e6/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 75363e3..20dba12 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery.zk.internal;
 import java.io.ByteArrayInputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
@@ -66,6 +67,7 @@ import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -850,19 +852,11 @@ public class ZookeeperDiscoveryImpl {
      * @param zkClient Client.
      * @param basePath Base path.
      * @param partCnt Parts count.
-     * @param checkExists If {@code true} checks path exists before calling 
delete (this check added to avoid errors
-     *      in ZooKeeper log).
-     * @throws Exception If failed.
      */
-    private void deleteMultiplePartsAsync(ZookeeperClient zkClient, String 
basePath, int partCnt, boolean checkExists)
-        throws Exception
-    {
+    private void deleteMultiplePartsAsync(ZookeeperClient zkClient, String 
basePath, int partCnt) {
         for (int i = 0; i < partCnt; i++) {
             String path = multipartPathName(basePath, i);
 
-            if (checkExists && !zkClient.exists(path))
-                continue;
-
             zkClient.deleteIfExistsAsync(path);
         }
     }
@@ -1619,47 +1613,143 @@ public class ZookeeperDiscoveryImpl {
             return;
         }
 
-        for (Map.Entry<Long, String> e : alives.entrySet()) {
-            Long internalId = e.getKey();
+        generateJoinEvents(curTop, alives, MAX_NEW_EVTS);
 
-            if (!rtState.top.nodesByInternalId.containsKey(internalId)) {
-                UUID rslvFutId = 
rtState.evtsData.communicationErrorResolveFutureId();
+        if (failedNodes != null)
+            handleProcessedEventsOnNodesFail(failedNodes);
+    }
 
-                if (rslvFutId != null) {
-                    if (log.isInfoEnabled()) {
-                        log.info("Delay alive nodes change process while 
communication error resolve " +
-                            "is in progress [reqId=" + rslvFutId + ']');
-                    }
+   private void generateJoinEvents(TreeMap<Long, ZookeeperClusterNode> curTop,
+        TreeMap<Long, String> alives,
+        final int MAX_NEW_EVTS) throws Exception
+   {
+       ZkBulkJoinContext joinCtx = new ZkBulkJoinContext();
 
-                    break;
-                }
+       for (Map.Entry<Long, String> e : alives.entrySet()) {
+           Long internalId = e.getKey();
 
-                if (processJoinOnCoordinator(curTop, internalId, 
e.getValue())) {
-                    newEvts++;
+           if (!rtState.top.nodesByInternalId.containsKey(internalId)) {
+               UUID rslvFutId = 
rtState.evtsData.communicationErrorResolveFutureId();
 
-                    if (newEvts == MAX_NEW_EVTS) {
-                        saveAndProcessNewEvents();
+               if (rslvFutId != null) {
+                   if (log.isInfoEnabled()) {
+                       log.info("Delay alive nodes change process while 
communication error resolve " +
+                           "is in progress [reqId=" + rslvFutId + ']');
+                   }
 
-                        if (log.isInfoEnabled()) {
-                            log.info("Delay alive nodes change process, max 
event threshold reached [newEvts=" + newEvts +
-                                ", totalEvts=" + rtState.evtsData.evts.size() 
+ ']');
-                        }
+                   break;
+               }
 
-                        throttleNewEventsGeneration();
+               processJoinOnCoordinator(joinCtx, curTop, internalId, 
e.getValue());
 
-                        
rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, rtState.watcher, 
rtState.watcher);
+               if (joinCtx.nodes() == MAX_NEW_EVTS) {
+                   generateBulkJoinEvent(curTop, joinCtx);
 
-                        return;
-                    }
+                   if (log.isInfoEnabled()) {
+                       log.info("Delay alive nodes change process, max event 
threshold reached [" +
+                           "newEvts=" + joinCtx.nodes() +
+                           ", totalEvts=" + rtState.evtsData.evts.size() + 
']');
+                   }
+
+                   throttleNewEventsGeneration();
+
+                   rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, 
rtState.watcher, rtState.watcher);
+
+                   return;
+               }
+           }
+       }
+
+       if (joinCtx.nodes() > 0)
+           generateBulkJoinEvent(curTop, joinCtx);
+   }
+
+    private void generateBulkJoinEvent(TreeMap<Long, ZookeeperClusterNode> 
curTop, ZkBulkJoinContext joinCtx)
+        throws Exception
+    {
+        rtState.evtsData.evtIdGen++;
+
+        long evtId = rtState.evtsData.evtIdGen;
+
+        List<T2<ZkJoinedNodeEvtData, Map<Integer, Serializable>>> nodes = 
joinCtx.nodes;
+
+        assert nodes != null && nodes.size() > 0;
+
+        int nodeCnt = nodes.size();
+
+        List<ZkJoinedNodeEvtData> joinedNodes = new ArrayList<>(nodeCnt);
+
+        Map<Long, byte[]> discoDataMap = U.newHashMap(nodeCnt);
+        Map<Long, Long> dupDiscoData = null;
+
+        for (int i = 0; i < nodeCnt; i++) {
+            T2<ZkJoinedNodeEvtData, Map<Integer, Serializable>> nodeEvtData = 
nodes.get(i);
+
+            Map<Integer, Serializable> discoData = nodeEvtData.get2();
+
+            byte[] discoDataBytes = U.marshal(marsh, discoData);
+
+            Long dupDataNode = null;
+
+            for (Map.Entry<Long, byte[]> e : discoDataMap.entrySet()) {
+                if (Arrays.equals(discoDataBytes, e.getValue())) {
+                    dupDataNode = e.getKey();
+
+                    break;
                 }
             }
+
+            long nodeTopVer = nodeEvtData.get1().topVer;
+
+            if (dupDataNode != null) {
+                if (dupDiscoData == null)
+                    dupDiscoData = new HashMap<>();
+
+                Long old = dupDiscoData.put(nodeTopVer, dupDataNode);
+
+                assert old == null : old;
+            }
+            else
+                discoDataMap.put(nodeTopVer, discoDataBytes);
+
+            joinedNodes.add(nodeEvtData.get1());
         }
 
-        if (newEvts > 0)
-            saveAndProcessNewEvents();
+        int overhead = 5;
 
-        if (failedNodes != null)
-            handleProcessedEventsOnNodesFail(failedNodes);
+        ZkJoinEventDataForJoined dataForJoined = new ZkJoinEventDataForJoined(
+            new ArrayList<>(curTop.values()),
+            discoDataMap,
+            dupDiscoData);
+
+        byte[] dataForJoinedBytes = marshalZip(dataForJoined);
+
+        long addDataStart = System.currentTimeMillis();
+
+        int dataForJoinedPartCnt = 
saveData(zkPaths.joinEventDataPathForJoined(evtId),
+            dataForJoinedBytes,
+            overhead);
+
+        long addDataTime = System.currentTimeMillis() - addDataStart;
+
+        ZkDiscoveryNodeJoinEventData evtData = new 
ZkDiscoveryNodeJoinEventData(
+            evtId,
+            rtState.evtsData.topVer,
+            joinedNodes,
+            dataForJoinedPartCnt);
+
+        rtState.evtsData.addEvent(curTop.values(), evtData);
+
+        if (log.isInfoEnabled()) {
+            log.info("Generated NODE_JOINED event [" +
+                "nodeCnt=" + nodeCnt +
+                ", dataForJoinedSize=" + dataForJoinedBytes.length +
+                ", dataForJoinedPartCnt=" + dataForJoinedPartCnt +
+                ", addDataTime=" + addDataTime +
+                ", evt=" + evtData + ']');
+        }
+
+        saveAndProcessNewEvents();
     }
 
     /**
@@ -1766,6 +1856,7 @@ public class ZookeeperDiscoveryImpl {
      * @return {@code True} if new join event was added.
      */
     private boolean processJoinOnCoordinator(
+        ZkBulkJoinContext joinCtx,
         TreeMap<Long, ZookeeperClusterNode> curTop,
         long internalId,
         String aliveNodePath)
@@ -1786,7 +1877,9 @@ public class ZookeeperDiscoveryImpl {
 
                 assert nodeId.equals(joinedNode.id()) : joiningNodeData.node();
 
-                generateNodeJoin(curTop,
+                addJoinedNode(
+                    joinCtx,
+                    curTop,
                     joiningNodeData,
                     internalId,
                     prefixId,
@@ -1985,7 +2078,8 @@ public class ZookeeperDiscoveryImpl {
      * @param secSubjZipBytes Marshalled security subject.
      * @throws Exception If failed.
      */
-    private void generateNodeJoin(
+    private void addJoinedNode(
+        ZkBulkJoinContext joinCtx,
         TreeMap<Long, ZookeeperClusterNode> curTop,
         ZkJoiningNodeData joiningNodeData,
         long internalId,
@@ -1998,13 +2092,10 @@ public class ZookeeperDiscoveryImpl {
         UUID nodeId = joinedNode.id();
 
         rtState.evtsData.topVer++;
-        rtState.evtsData.evtIdGen++;
 
         joinedNode.order(rtState.evtsData.topVer);
         joinedNode.internalId(internalId);
 
-        long evtId = rtState.evtsData.evtIdGen;
-
         DiscoveryDataBag joiningNodeBag = new DiscoveryDataBag(nodeId);
 
         joiningNodeBag.joiningNodeData(joiningNodeData.discoveryData());
@@ -2017,60 +2108,37 @@ public class ZookeeperDiscoveryImpl {
 
         Map<Integer, Serializable> commonData = collectBag.commonData();
 
-        ZkJoinEventDataForJoined dataForJoined = new ZkJoinEventDataForJoined(
-            new ArrayList<>(curTop.values()),
-            commonData);
-
         Object old = curTop.put(joinedNode.order(), joinedNode);
 
         assert old == null;
 
-        long addDataStart = System.currentTimeMillis();
-
-        byte[] dataForJoinedBytes = marshalZip(dataForJoined);
-
         int overhead = 5;
 
-        int dataForJoinedPartCnt = 
saveData(zkPaths.joinEventDataPathForJoined(evtId),
-            dataForJoinedBytes,
-            overhead);
-
         int secSubjPartCnt = 0;
 
         if (secSubjZipBytes != null) {
-            secSubjPartCnt = 
saveData(zkPaths.joinEventSecuritySubjectPath(evtId), secSubjZipBytes, 
overhead);
+            secSubjPartCnt = 
saveData(zkPaths.joinEventSecuritySubjectPath(joinedNode.order()),
+                secSubjZipBytes,
+                overhead);
 
             assert secSubjPartCnt > 0 : secSubjPartCnt;
 
             setNodeSecuritySubject(joinedNode, secSubjZipBytes);
         }
 
-        long addDataTime = System.currentTimeMillis() - addDataStart;
-
-        ZkDiscoveryNodeJoinEventData evtData = new 
ZkDiscoveryNodeJoinEventData(
-            evtId,
+        ZkJoinedNodeEvtData nodeEvtData = new ZkJoinedNodeEvtData(
             rtState.evtsData.topVer,
             joinedNode.id(),
             joinedNode.internalId(),
             prefixId,
             joiningNodeData.partCount(),
-            dataForJoinedPartCnt,
             secSubjPartCnt);
 
-        rtState.evtsData.onNodeJoin(joinedNode);
-
-        evtData.joiningNodeData = joiningNodeData;
-
-        rtState.evtsData.addEvent(dataForJoined.topology(), evtData);
+        nodeEvtData.joiningNodeData = joiningNodeData;
 
-        evtData.addRemainingAck(joinedNode); // Topology for joined node does 
not contain joined node.
+        joinCtx.addJoinedNode(nodeEvtData, commonData);
 
-        if (log.isInfoEnabled()) {
-            log.info("Generated NODE_JOINED event [evt=" + evtData +
-                ", dataForJoinedSize=" + dataForJoinedBytes.length +
-                ", dataForJoinedPartCnt=" + dataForJoinedPartCnt +
-                ", addDataTime=" + addDataTime + ']');
-        }
+        rtState.evtsData.onNodeJoin(joinedNode);
     }
 
     /**
@@ -2509,116 +2577,82 @@ public class ZookeeperDiscoveryImpl {
 
         try {
             for (ZkDiscoveryEventData evtData : 
evts.tailMap(rtState.locNodeInfo.lastProcEvt, false).values()) {
-                if (!rtState.joined) {
-                    if (evtData.eventType() != 
ZkDiscoveryEventData.ZK_EVT_NODE_JOIN)
-                        continue;
-
-                    ZkDiscoveryNodeJoinEventData evtData0 = 
(ZkDiscoveryNodeJoinEventData)evtData;
-
-                    UUID joinedId = evtData0.nodeId;
-
-                    boolean locJoin = evtData0.joinedInternalId == 
rtState.internalOrder;
-
-                    if (locJoin) {
-                        assert locNode.id().equals(joinedId);
+                if (log.isDebugEnabled())
+                    log.debug("New discovery event data [evt=" + evtData + ", 
evtsHist=" + evts.size() + ']');
 
-                        processLocalJoin(evtsData, evtData0);
+                switch (evtData.eventType()) {
+                    case ZkDiscoveryEventData.ZK_EVT_NODE_JOIN: {
+                        evtProcessed = processBulkJoin(evtsData, 
(ZkDiscoveryNodeJoinEventData)evtData);
 
-                        evtProcessed = true;
+                        break;
                     }
-                }
-                else {
-                    if (log.isDebugEnabled())
-                        log.debug("New discovery event data [evt=" + evtData + 
", evtsHist=" + evts.size() + ']');
-
-                    switch (evtData.eventType()) {
-                        case ZkDiscoveryEventData.ZK_EVT_NODE_JOIN: {
-                            ZkDiscoveryNodeJoinEventData evtData0 = 
(ZkDiscoveryNodeJoinEventData)evtData;
-
-                            ZkJoiningNodeData joiningData;
-
-                            if (rtState.crd) {
-                                assert evtData0.joiningNodeData != null;
 
-                                joiningData = evtData0.joiningNodeData;
-                            }
-                            else {
-                                joiningData = 
unmarshalJoinData(evtData0.nodeId, evtData0.joinDataPrefixId);
-
-                                DiscoveryDataBag dataBag = new 
DiscoveryDataBag(evtData0.nodeId);
-
-                                
dataBag.joiningNodeData(joiningData.discoveryData());
+                    case ZkDiscoveryEventData.ZK_EVT_NODE_FAILED: {
+                        if (!rtState.joined)
+                            break;
 
-                                exchange.onExchange(dataBag);
-                            }
+                        evtProcessed = true;
 
-                            if (evtData0.secSubjPartCnt > 0 && 
joiningData.node().attribute(ATTR_SECURITY_SUBJECT_V2) == null)
-                                readAndInitSecuritySubject(joiningData.node(), 
evtData0);
+                        notifyNodeFail((ZkDiscoveryNodeFailEventData)evtData);
 
-                            notifyNodeJoin(evtData0, joiningData);
+                        break;
+                    }
 
+                    case ZkDiscoveryEventData.ZK_EVT_CUSTOM_EVT: {
+                        if (!rtState.joined)
                             break;
-                        }
-
-                        case ZkDiscoveryEventData.ZK_EVT_NODE_FAILED: {
-                            
notifyNodeFail((ZkDiscoveryNodeFailEventData)evtData);
 
-                            break;
-                        }
+                        evtProcessed = true;
 
-                        case ZkDiscoveryEventData.ZK_EVT_CUSTOM_EVT: {
-                            ZkDiscoveryCustomEventData evtData0 = 
(ZkDiscoveryCustomEventData)evtData;
+                        ZkDiscoveryCustomEventData evtData0 = 
(ZkDiscoveryCustomEventData)evtData;
 
-                            if (evtData0.ackEvent() && 
evtData0.topologyVersion() < locNode.order())
-                                break;
+                        if (evtData0.ackEvent() && evtData0.topologyVersion() 
< locNode.order())
+                            break;
 
-                            DiscoverySpiCustomMessage msg;
+                        DiscoverySpiCustomMessage msg;
 
-                            if (rtState.crd) {
-                                assert evtData0.resolvedMsg != null : evtData0;
+                        if (rtState.crd) {
+                            assert evtData0.resolvedMsg != null : evtData0;
 
-                                msg = evtData0.resolvedMsg;
-                            }
-                            else {
-                                if (evtData0.msg == null) {
-                                    if (evtData0.ackEvent()) {
-                                        String path = 
zkPaths.ackEventDataPath(evtData0.origEvtId);
+                            msg = evtData0.resolvedMsg;
+                        }
+                        else {
+                            if (evtData0.msg == null) {
+                                if (evtData0.ackEvent()) {
+                                    String path = 
zkPaths.ackEventDataPath(evtData0.origEvtId);
 
-                                        msg = 
unmarshalZip(zkClient.getData(path));
-                                    }
-                                    else {
-                                        assert evtData0.evtPath != null : 
evtData0;
+                                    msg = unmarshalZip(zkClient.getData(path));
+                                }
+                                else {
+                                    assert evtData0.evtPath != null : evtData0;
 
-                                        byte[] msgBytes = 
readCustomEventData(zkClient,
-                                            evtData0.evtPath,
-                                            evtData0.sndNodeId);
+                                    byte[] msgBytes = 
readCustomEventData(zkClient,
+                                        evtData0.evtPath,
+                                        evtData0.sndNodeId);
 
-                                        msg = unmarshalZip(msgBytes);
-                                    }
+                                    msg = unmarshalZip(msgBytes);
                                 }
-                                else
-                                    msg = evtData0.msg;
-
-                                evtData0.resolvedMsg = msg;
                             }
+                            else
+                                msg = evtData0.msg;
 
-                            if (msg instanceof ZkInternalMessage)
-                                processInternalMessage(evtData0, 
(ZkInternalMessage)msg);
-                            else {
-                                notifyCustomEvent(evtData0, msg);
+                            evtData0.resolvedMsg = msg;
+                        }
 
-                                if (!evtData0.ackEvent())
-                                    updateNodeInfo = true;
-                            }
+                        if (msg instanceof ZkInternalMessage)
+                            processInternalMessage(evtData0, 
(ZkInternalMessage)msg);
+                        else {
+                            notifyCustomEvent(evtData0, msg);
 
-                            break;
+                            if (!evtData0.ackEvent())
+                                updateNodeInfo = true;
                         }
 
-                        default:
-                            assert false : "Invalid event: " + evtData;
+                        break;
                     }
 
-                    evtProcessed = true;
+                    default:
+                        assert false : "Invalid event: " + evtData;
                 }
 
                 if (rtState.joined) {
@@ -2666,6 +2700,55 @@ public class ZookeeperDiscoveryImpl {
             commErrFut.onTopologyChange(rtState.top); // This can add new 
event, notify out of event process loop.
     }
 
+    private boolean processBulkJoin(ZkDiscoveryEventsData evtsData, 
ZkDiscoveryNodeJoinEventData evtData)
+        throws Exception
+    {
+        boolean evtProcessed = false;
+
+        for (int i = 0; i < evtData.joinedNodes.size(); i++) {
+            ZkJoinedNodeEvtData joinedEvtData = evtData.joinedNodes.get(i);
+
+            if (!rtState.joined) {
+                UUID joinedId = joinedEvtData.nodeId;
+
+                boolean locJoin = joinedEvtData.joinedInternalId == 
rtState.internalOrder;
+
+                if (locJoin) {
+                    assert locNode.id().equals(joinedId);
+
+                    processLocalJoin(evtsData, joinedEvtData, evtData);
+
+                    evtProcessed = true;
+                }
+            }
+            else {
+                ZkJoiningNodeData joiningData;
+
+                if (rtState.crd) {
+                    assert joinedEvtData.joiningNodeData != null;
+
+                    joiningData = joinedEvtData.joiningNodeData;
+                }
+                else {
+                    joiningData = unmarshalJoinData(joinedEvtData.nodeId, 
joinedEvtData.joinDataPrefixId);
+
+                    DiscoveryDataBag dataBag = new 
DiscoveryDataBag(joinedEvtData.nodeId);
+
+                    dataBag.joiningNodeData(joiningData.discoveryData());
+
+                    exchange.onExchange(dataBag);
+                }
+
+                if (joinedEvtData.secSubjPartCnt > 0 && 
joiningData.node().attribute(ATTR_SECURITY_SUBJECT_V2) == null)
+                    readAndInitSecuritySubject(joiningData.node(), 
joinedEvtData);
+
+                notifyNodeJoin(joinedEvtData, joiningData);
+            }
+        }
+
+        return evtProcessed;
+    }
+
     /**
      * @param rtState Runtime state.
      * @param updateNodeInfo {@code True} if need update processed events 
without delay.
@@ -2745,14 +2828,14 @@ public class ZookeeperDiscoveryImpl {
 
     /**
      * @param node Node.
-     * @param evtData Node join event data.
+     * @param joinedEvtData Joined node information.
      * @throws Exception If failed.
      */
-    private void readAndInitSecuritySubject(ZookeeperClusterNode node, 
ZkDiscoveryNodeJoinEventData evtData) throws Exception {
-        if (evtData.secSubjPartCnt > 0) {
+    private void readAndInitSecuritySubject(ZookeeperClusterNode node, 
ZkJoinedNodeEvtData joinedEvtData) throws Exception {
+        if (joinedEvtData.secSubjPartCnt > 0) {
             byte[] zipBytes = readMultipleParts(rtState.zkClient,
-                zkPaths.joinEventSecuritySubjectPath(evtData.eventId()),
-                evtData.secSubjPartCnt);
+                zkPaths.joinEventSecuritySubjectPath(joinedEvtData.topVer),
+                joinedEvtData.secSubjPartCnt);
 
             setNodeSecuritySubject(node, zipBytes);
         }
@@ -2764,7 +2847,9 @@ public class ZookeeperDiscoveryImpl {
      * @throws Exception If failed.
      */
     @SuppressWarnings("unchecked")
-    private void processLocalJoin(ZkDiscoveryEventsData evtsData, final 
ZkDiscoveryNodeJoinEventData evtData)
+    private void processLocalJoin(ZkDiscoveryEventsData evtsData,
+        ZkJoinedNodeEvtData joinedEvtData,
+        ZkDiscoveryNodeJoinEventData evtData)
         throws Exception
     {
         synchronized (stateMux) {
@@ -2781,7 +2866,7 @@ public class ZookeeperDiscoveryImpl {
             spi.getSpiContext().removeTimeoutObject(rtState.joinErrTo);
 
             if (log.isInfoEnabled())
-                log.info("Local join event data: " + evtData + ']');
+                log.info("Local join event data: " + joinedEvtData + ']');
 
             String path = 
zkPaths.joinEventDataPathForJoined(evtData.eventId());
 
@@ -2791,14 +2876,19 @@ public class ZookeeperDiscoveryImpl {
 
             rtState.gridStartTime = evtsData.clusterStartTime;
 
-            locNode.internalId(evtData.joinedInternalId);
-            locNode.order(evtData.topologyVersion());
+            locNode.internalId(joinedEvtData.joinedInternalId);
+            locNode.order(joinedEvtData.topVer);
 
-            readAndInitSecuritySubject(locNode, evtData);
+            readAndInitSecuritySubject(locNode, joinedEvtData);
+
+            byte[] discoDataBytes = 
dataForJoined.discoveryDataForNode(locNode.order());
+
+            Map<Integer, Serializable> commonDiscoData =
+                marsh.unmarshal(discoDataBytes, 
U.resolveClassLoader(spi.ignite().configuration()));
 
             DiscoveryDataBag dataBag = new DiscoveryDataBag(locNode.id());
 
-            dataBag.commonData(dataForJoined.discoveryData());
+            dataBag.commonData(commonDiscoData);
 
             exchange.onExchange(dataBag);
 
@@ -2807,6 +2897,10 @@ public class ZookeeperDiscoveryImpl {
             for (int i = 0; i < allNodes.size(); i++) {
                 ZookeeperClusterNode node = allNodes.get(i);
 
+                // Need to filter since ZkJoinEventDataForJoined contains a single 
topology snapshot for all joined nodes.
+                if (node.order() >= locNode.order())
+                    break;
+
                 node.setMetrics(new ClusterMetricsSnapshot());
 
                 rtState.top.addNode(node);
@@ -2817,7 +2911,7 @@ public class ZookeeperDiscoveryImpl {
             final List<ClusterNode> topSnapshot = 
rtState.top.topologySnapshot();
 
             lsnr.onDiscovery(EVT_NODE_JOINED,
-                evtData.topologyVersion(),
+                joinedEvtData.topVer,
                 locNode,
                 topSnapshot,
                 Collections.<Long, Collection<ClusterNode>>emptyMap(),
@@ -2825,7 +2919,7 @@ public class ZookeeperDiscoveryImpl {
 
             if (rtState.prevJoined) {
                 lsnr.onDiscovery(EVT_CLIENT_NODE_RECONNECTED,
-                    evtData.topologyVersion(),
+                    joinedEvtData.topVer,
                     locNode,
                     topSnapshot,
                     Collections.<Long, Collection<ClusterNode>>emptyMap(),
@@ -2839,8 +2933,6 @@ public class ZookeeperDiscoveryImpl {
 
         joinFut.onDone();
 
-        deleteDataForJoinedAsync(evtData);
-
         if (locNode.isClient())
             rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new 
CheckClientsStatusCallback(rtState));
     }
@@ -3302,15 +3394,15 @@ public class ZookeeperDiscoveryImpl {
     }
 
     /**
-     * @param evtData Event data.
+     * @param joinedEvtData Event data.
      * @param joiningData Joining node data.
      */
     @SuppressWarnings("unchecked")
-    private void notifyNodeJoin(final ZkDiscoveryNodeJoinEventData evtData, 
ZkJoiningNodeData joiningData) {
+    private void notifyNodeJoin(ZkJoinedNodeEvtData joinedEvtData, 
ZkJoiningNodeData joiningData) {
         final ZookeeperClusterNode joinedNode = joiningData.node();
 
-        joinedNode.order(evtData.topologyVersion());
-        joinedNode.internalId(evtData.joinedInternalId);
+        joinedNode.order(joinedEvtData.topVer);
+        joinedNode.internalId(joinedEvtData.joinedInternalId);
 
         joinedNode.setMetrics(new ClusterMetricsSnapshot());
 
@@ -3319,7 +3411,7 @@ public class ZookeeperDiscoveryImpl {
         final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot();
 
         lsnr.onDiscovery(EVT_NODE_JOINED,
-            evtData.topologyVersion(),
+            joinedEvtData.topVer,
             joinedNode,
             topSnapshot,
             Collections.<Long, Collection<ClusterNode>>emptyMap(),
@@ -3571,25 +3663,27 @@ public class ZookeeperDiscoveryImpl {
         if (log.isDebugEnabled())
             log.debug("All nodes processed node join [evtData=" + evtData + 
']');
 
-        deleteJoiningNodeData(evtData.nodeId, evtData.joinDataPrefixId, 
evtData.joinDataPartCnt);
+        for (int i = 0; i < evtData.joinedNodes.size(); i++) {
+            ZkJoinedNodeEvtData joinedEvtData = evtData.joinedNodes.get(i);
 
-        deleteDataForJoinedAsync(evtData);
+            deleteJoiningNodeData(joinedEvtData.nodeId, 
joinedEvtData.joinDataPrefixId, joinedEvtData.joinDataPartCnt);
 
-        if (evtData.secSubjPartCnt > 0) {
-            deleteMultiplePartsAsync(rtState.zkClient,
-                zkPaths.joinEventSecuritySubjectPath(evtData.eventId()),
-                evtData.secSubjPartCnt,
-                false);
+            if (joinedEvtData.secSubjPartCnt > 0) {
+                deleteMultiplePartsAsync(rtState.zkClient,
+                    zkPaths.joinEventSecuritySubjectPath(evtData.eventId()),
+                    joinedEvtData.secSubjPartCnt);
+            }
         }
+
+        deleteDataForJoinedAsync(evtData);
     }
 
     /**
      * @param nodeId Node ID.
      * @param joinDataPrefixId Path prefix.
      * @param partCnt Parts count.
-     * @throws Exception If failed.
      */
-    private void deleteJoiningNodeData(UUID nodeId, UUID joinDataPrefixId, int 
partCnt) throws Exception {
+    private void deleteJoiningNodeData(UUID nodeId, UUID joinDataPrefixId, int 
partCnt) {
         String evtDataPath = zkPaths.joiningNodeDataPath(nodeId, 
joinDataPrefixId);
 
         if (log.isDebugEnabled())
@@ -3598,20 +3692,19 @@ public class ZookeeperDiscoveryImpl {
         rtState.zkClient.deleteIfExistsAsync(evtDataPath);
 
         if (partCnt > 1)
-            deleteMultiplePartsAsync(rtState.zkClient, evtDataPath + ":", 
partCnt, true);
+            deleteMultiplePartsAsync(rtState.zkClient, evtDataPath + ":", 
partCnt);
     }
 
     /**
      * @param evtData Event data.
-     * @throws Exception If failed.
      */
-    private void deleteDataForJoinedAsync(ZkDiscoveryNodeJoinEventData 
evtData) throws Exception {
+    private void deleteDataForJoinedAsync(ZkDiscoveryNodeJoinEventData 
evtData) {
         String dataForJoinedPath = 
zkPaths.joinEventDataPathForJoined(evtData.eventId());
 
         if (log.isDebugEnabled())
             log.debug("Delete data for joined node [path=" + dataForJoinedPath 
+ ']');
 
-        deleteMultiplePartsAsync(rtState.zkClient, dataForJoinedPath, 
evtData.dataForJoinedPartCnt, true);
+        deleteMultiplePartsAsync(rtState.zkClient, dataForJoinedPath, 
evtData.dataForJoinedPartCnt);
     }
 
     /**
@@ -3800,7 +3893,7 @@ public class ZookeeperDiscoveryImpl {
     byte[] marshalZip(Object obj) throws IgniteCheckedException {
         assert obj != null;
 
-        return zip(marsh.marshal(obj));
+        return zip(U.marshal(marsh, obj));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/2fc690e6/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
index 75ecb8c..273200a 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
@@ -380,8 +380,7 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
                     throw new IgniteException("Failed to create directory for 
test Zookeeper server: " + file.getAbsolutePath());
             }
 
-
-            specs.add(new InstanceSpec(file, -1, -1, -1, true, -1, 1000, -1));
+            specs.add(new InstanceSpec(file, -1, -1, -1, true, -1, 1000, 
10_000));
         }
 
         return new TestingCluster(specs);
@@ -1471,6 +1470,41 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testConcurrentStart() throws Exception {
+        final int NODES = 20;
+
+        for (int i = 0; i < 3; i++) {
+            info("Iteration: " + i);
+
+            final AtomicInteger idx = new AtomicInteger();
+
+            final CyclicBarrier b = new CyclicBarrier(NODES);
+
+            GridTestUtils.runMultiThreaded(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    b.await();
+
+                    int threadIdx = idx.getAndIncrement();
+
+                    startGrid(threadIdx);
+
+                    return null;
+                }
+            }, NODES, "start-node");
+
+            waitForTopology(NODES);
+
+            stopAllGrids();
+
+            checkEventsConsistency();
+
+            evts.clear();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testConcurrentStartStop1() throws Exception {
        concurrentStartStop(1);
     }

Reply via email to