zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2fc690e6 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2fc690e6 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2fc690e6 Branch: refs/heads/ignite-zk-join Commit: 2fc690e64c5a75607eb4a8542cf800a8290cda3c Parents: ec75bba Author: sboikov <sboi...@gridgain.com> Authored: Thu Jan 11 17:52:53 2018 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Jan 11 17:52:53 2018 +0300 ---------------------------------------------------------------------- .../zk/internal/ZkBulkJoinContext.java | 50 ++ .../internal/ZkDiscoveryNodeJoinEventData.java | 44 +- .../discovery/zk/internal/ZkIgnitePaths.java | 6 +- .../zk/internal/ZkJoinEventDataForJoined.java | 42 +- .../zk/internal/ZkJoinedNodeEvtData.java | 79 ++++ .../zk/internal/ZookeeperDiscoveryImpl.java | 469 +++++++++++-------- .../zk/internal/ZookeeperDiscoverySpiTest.java | 38 +- 7 files changed, 491 insertions(+), 237 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2fc690e6/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java new file mode 100644 index 0000000..a186aed --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.ignite.internal.util.typedef.T2; + +/** + * + */ +class ZkBulkJoinContext { + /** */ + List<T2<ZkJoinedNodeEvtData, Map<Integer, Serializable>>> nodes; + + /** + * @param nodeEvtData Node event data. + * @param discoData Discovery data for node. + */ + void addJoinedNode(ZkJoinedNodeEvtData nodeEvtData, Map<Integer, Serializable> discoData) { + if (nodes == null) + nodes = new ArrayList<>(); + + nodes.add(new T2<>(nodeEvtData, discoData)); + } + + /** + * @return Number of joined nodes. + */ + int nodes() { + return nodes != null ? nodes.size() : 0; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2fc690e6/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java index ff75d22..e46d52d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java @@ -17,7 +17,7 @@ package org.apache.ignite.spi.discovery.zk.internal; -import java.util.UUID; +import java.util.List; /** * @@ -27,53 +27,27 @@ class ZkDiscoveryNodeJoinEventData extends ZkDiscoveryEventData { private static final long serialVersionUID = 0L; /** */ - final long joinedInternalId; - - /** */ - final UUID nodeId; - - /** */ - final int joinDataPartCnt; + final List<ZkJoinedNodeEvtData> joinedNodes; /** */ final int dataForJoinedPartCnt; - /** */ - final int secSubjPartCnt; - - /** */ - final UUID joinDataPrefixId; - - /** */ - transient ZkJoiningNodeData joiningNodeData; - /** * @param evtId Event ID. * @param topVer Topology version. - * @param nodeId Joined node ID. - * @param joinedInternalId Joined node internal ID. - * @param joinDataPrefixId Join data unique prefix. - * @param joinDataPartCnt Join data part count. + * @param joinedNodes Joined nodes data. * @param dataForJoinedPartCnt Data for joined part count. - * @param secSubjPartCnt Security subject part count. */ - ZkDiscoveryNodeJoinEventData(long evtId, + ZkDiscoveryNodeJoinEventData( + long evtId, long topVer, - UUID nodeId, - long joinedInternalId, - UUID joinDataPrefixId, - int joinDataPartCnt, - int dataForJoinedPartCnt, - int secSubjPartCnt) + List<ZkJoinedNodeEvtData> joinedNodes, + int dataForJoinedPartCnt) { super(evtId, ZK_EVT_NODE_JOIN, topVer); - this.nodeId = nodeId; - this.joinedInternalId = joinedInternalId; - this.joinDataPrefixId = joinDataPrefixId; - this.joinDataPartCnt = joinDataPartCnt; + this.joinedNodes = joinedNodes; this.dataForJoinedPartCnt = dataForJoinedPartCnt; - this.secSubjPartCnt = secSubjPartCnt; } /** {@inheritDoc} */ @@ -81,6 +55,6 @@ class ZkDiscoveryNodeJoinEventData extends ZkDiscoveryEventData { return "ZkDiscoveryNodeJoinEventData [" + "evtId=" + eventId() + ", topVer=" + topologyVersion() + - ", node=" + nodeId + ']'; + ", nodes=" + joinedNodes + ']'; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/2fc690e6/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java index 44b247c..642183b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java @@ -314,11 +314,11 @@ class ZkIgnitePaths { } /** - * @param evtId Event ID. + * @param topVer Event topology version. * @return Event zk path. */ - String joinEventSecuritySubjectPath(long evtId) { - return evtsPath + "/s-" + evtId; + String joinEventSecuritySubjectPath(long topVer) { + return evtsPath + "/s-" + topVer; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/2fc690e6/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java index eb24f27..e4ae4ba0 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java @@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery.zk.internal; import java.io.Serializable; import java.util.List; import java.util.Map; +import org.jetbrains.annotations.Nullable; /** * @@ -32,28 +33,51 @@ class ZkJoinEventDataForJoined implements Serializable { private final List<ZookeeperClusterNode> top; /** */ - private final Map<Integer, Serializable> discoData; + private final Map<Long, byte[]> discoData; + + /** */ + private final Map<Long, Long> dupDiscoData; /** * @param top Topology. * @param discoData Discovery data. */ - ZkJoinEventDataForJoined(List<ZookeeperClusterNode> top, Map<Integer, Serializable> discoData) { + ZkJoinEventDataForJoined(List<ZookeeperClusterNode> top, Map<Long, byte[]> discoData, @Nullable Map<Long, Long> dupDiscoData) { + assert top != null; + assert discoData != null && !discoData.isEmpty(); + this.top = top; this.discoData = discoData; + this.dupDiscoData = dupDiscoData; + } + + byte[] discoveryDataForNode(long nodeOrder) { + assert discoData != null; + + byte[] dataBytes = discoData.get(nodeOrder); + + if (dataBytes != null) + return dataBytes; + + assert dupDiscoData != null; + + Long dupDataNode = dupDiscoData.get(nodeOrder); + + assert dupDataNode != null; + + dataBytes = discoData.get(dupDataNode); + + assert dataBytes != null; + + return dataBytes; } /** * @return Current topology. */ List<ZookeeperClusterNode> topology() { - return top; - } + assert top != null; - /** - * @return Discovery data. - */ - Map<Integer, Serializable> discoveryData() { - return discoData; + return top; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/2fc690e6/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinedNodeEvtData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinedNodeEvtData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinedNodeEvtData.java new file mode 100644 index 0000000..8149afc --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinedNodeEvtData.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.io.Serializable; +import java.util.UUID; + +/** + * + */ +public class ZkJoinedNodeEvtData implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + final long topVer; + + /** */ + final long joinedInternalId; + + /** */ + final UUID nodeId; + + /** */ + final int joinDataPartCnt; + + /** */ + final int secSubjPartCnt; + + /** */ + final UUID joinDataPrefixId; + + /** */ + transient ZkJoiningNodeData joiningNodeData; + + /** + * @param topVer Topology version for node join event. + * @param nodeId Joined node ID. + * @param joinedInternalId Joined node internal ID. + * @param joinDataPrefixId Join data unique prefix. + * @param joinDataPartCnt Join data part count. + * @param secSubjPartCnt Security subject part count. + */ + ZkJoinedNodeEvtData( + long topVer, + UUID nodeId, + long joinedInternalId, + UUID joinDataPrefixId, + int joinDataPartCnt, + int secSubjPartCnt) + { + this.topVer = topVer; + this.nodeId = nodeId; + this.joinedInternalId = joinedInternalId; + this.joinDataPrefixId = joinDataPrefixId; + this.joinDataPartCnt = joinDataPartCnt; + this.secSubjPartCnt = secSubjPartCnt; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "ZkJoinedNodeData [id=" + nodeId + ", order=" + topVer + ']'; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2fc690e6/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 75363e3..20dba12 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery.zk.internal; import java.io.ByteArrayInputStream; import java.io.Serializable; import java.util.ArrayList; +import java.util.Arrays; import java.util.BitSet; import java.util.Collection; import java.util.Collections; @@ -66,6 +67,7 @@ import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.io.GridByteArrayOutputStream; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; @@ -850,19 +852,11 @@ public class ZookeeperDiscoveryImpl { * @param zkClient Client. * @param basePath Base path. * @param partCnt Parts count. - * @param checkExists If {@code true} checks path exists before calling delete (this check added to avoid errors - * in ZooKeeper log). - * @throws Exception If failed. */ - private void deleteMultiplePartsAsync(ZookeeperClient zkClient, String basePath, int partCnt, boolean checkExists) - throws Exception - { + private void deleteMultiplePartsAsync(ZookeeperClient zkClient, String basePath, int partCnt) { for (int i = 0; i < partCnt; i++) { String path = multipartPathName(basePath, i); - if (checkExists && !zkClient.exists(path)) - continue; - zkClient.deleteIfExistsAsync(path); } } @@ -1619,47 +1613,143 @@ public class ZookeeperDiscoveryImpl { return; } - for (Map.Entry<Long, String> e : alives.entrySet()) { - Long internalId = e.getKey(); + generateJoinEvents(curTop, alives, MAX_NEW_EVTS); - if (!rtState.top.nodesByInternalId.containsKey(internalId)) { - UUID rslvFutId = rtState.evtsData.communicationErrorResolveFutureId(); + if (failedNodes != null) + handleProcessedEventsOnNodesFail(failedNodes); + } - if (rslvFutId != null) { - if (log.isInfoEnabled()) { - log.info("Delay alive nodes change process while communication error resolve " + - "is in progress [reqId=" + rslvFutId + ']'); - } + private void generateJoinEvents(TreeMap<Long, ZookeeperClusterNode> curTop, + TreeMap<Long, String> alives, + final int MAX_NEW_EVTS) throws Exception + { + ZkBulkJoinContext joinCtx = new ZkBulkJoinContext(); - break; - } + for (Map.Entry<Long, String> e : alives.entrySet()) { + Long internalId = e.getKey(); - if (processJoinOnCoordinator(curTop, internalId, e.getValue())) { - newEvts++; + if (!rtState.top.nodesByInternalId.containsKey(internalId)) { + UUID rslvFutId = rtState.evtsData.communicationErrorResolveFutureId(); - if (newEvts == MAX_NEW_EVTS) { - saveAndProcessNewEvents(); + if (rslvFutId != null) { + if (log.isInfoEnabled()) { + log.info("Delay alive nodes change process while communication error resolve " + + "is in progress [reqId=" + rslvFutId + ']'); + } - if (log.isInfoEnabled()) { - log.info("Delay alive nodes change process, max event threshold reached [newEvts=" + newEvts + - ", totalEvts=" + rtState.evtsData.evts.size() + ']'); - } + break; + } - throttleNewEventsGeneration(); + processJoinOnCoordinator(joinCtx, curTop, internalId, e.getValue()); - rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, rtState.watcher, rtState.watcher); + if (joinCtx.nodes() == MAX_NEW_EVTS) { + generateBulkJoinEvent(curTop, joinCtx); - return; - } + if (log.isInfoEnabled()) { + log.info("Delay alive nodes change process, max event threshold reached [" + + "newEvts=" + joinCtx.nodes() + + ", totalEvts=" + rtState.evtsData.evts.size() + ']'); + } + + throttleNewEventsGeneration(); + + rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, rtState.watcher, rtState.watcher); + + return; + } + } + } + + if (joinCtx.nodes() > 0) + generateBulkJoinEvent(curTop, joinCtx); + } + + private void generateBulkJoinEvent(TreeMap<Long, ZookeeperClusterNode> curTop, ZkBulkJoinContext joinCtx) + throws Exception + { + rtState.evtsData.evtIdGen++; + + long evtId = rtState.evtsData.evtIdGen; + + List<T2<ZkJoinedNodeEvtData, Map<Integer, Serializable>>> nodes = joinCtx.nodes; + + assert nodes != null && nodes.size() > 0; + + int nodeCnt = nodes.size(); + + List<ZkJoinedNodeEvtData> joinedNodes = new ArrayList<>(nodeCnt); + + Map<Long, byte[]> discoDataMap = U.newHashMap(nodeCnt); + Map<Long, Long> dupDiscoData = null; + + for (int i = 0; i < nodeCnt; i++) { + T2<ZkJoinedNodeEvtData, Map<Integer, Serializable>> nodeEvtData = nodes.get(i); + + Map<Integer, Serializable> discoData = nodeEvtData.get2(); + + byte[] discoDataBytes = U.marshal(marsh, discoData); + + Long dupDataNode = null; + + for (Map.Entry<Long, byte[]> e : discoDataMap.entrySet()) { + if (Arrays.equals(discoDataBytes, e.getValue())) { + dupDataNode = e.getKey(); + + break; } } + + long nodeTopVer = nodeEvtData.get1().topVer; + + if (dupDataNode != null) { + if (dupDiscoData == null) + dupDiscoData = new HashMap<>(); + + Long old = dupDiscoData.put(nodeTopVer, dupDataNode); + + assert old == null : old; + } + else + discoDataMap.put(nodeTopVer, discoDataBytes); + + joinedNodes.add(nodeEvtData.get1()); } - if (newEvts > 0) - saveAndProcessNewEvents(); + int overhead = 5; - if (failedNodes != null) - handleProcessedEventsOnNodesFail(failedNodes); + ZkJoinEventDataForJoined dataForJoined = new ZkJoinEventDataForJoined( + new ArrayList<>(curTop.values()), + discoDataMap, + dupDiscoData); + + byte[] dataForJoinedBytes = marshalZip(dataForJoined); + + long addDataStart = System.currentTimeMillis(); + + int dataForJoinedPartCnt = saveData(zkPaths.joinEventDataPathForJoined(evtId), + dataForJoinedBytes, + overhead); + + long addDataTime = System.currentTimeMillis() - addDataStart; + + ZkDiscoveryNodeJoinEventData evtData = new ZkDiscoveryNodeJoinEventData( + evtId, + rtState.evtsData.topVer, + joinedNodes, + dataForJoinedPartCnt); + + rtState.evtsData.addEvent(curTop.values(), evtData); + + if (log.isInfoEnabled()) { + log.info("Generated NODE_JOINED event [" + + "nodeCnt=" + nodeCnt + + ", dataForJoinedSize=" + dataForJoinedBytes.length + + ", dataForJoinedPartCnt=" + dataForJoinedPartCnt + + ", addDataTime=" + addDataTime + + ", evt=" + evtData + ']'); + } + + saveAndProcessNewEvents(); } /** @@ -1766,6 +1856,7 @@ public class ZookeeperDiscoveryImpl { * @return {@code True} if new join event was added. */ private boolean processJoinOnCoordinator( + ZkBulkJoinContext joinCtx, TreeMap<Long, ZookeeperClusterNode> curTop, long internalId, String aliveNodePath) @@ -1786,7 +1877,9 @@ public class ZookeeperDiscoveryImpl { assert nodeId.equals(joinedNode.id()) : joiningNodeData.node(); - generateNodeJoin(curTop, + addJoinedNode( + joinCtx, + curTop, joiningNodeData, internalId, prefixId, @@ -1985,7 +2078,8 @@ public class ZookeeperDiscoveryImpl { * @param secSubjZipBytes Marshalled security subject. * @throws Exception If failed. */ - private void generateNodeJoin( + private void addJoinedNode( + ZkBulkJoinContext joinCtx, TreeMap<Long, ZookeeperClusterNode> curTop, ZkJoiningNodeData joiningNodeData, long internalId, @@ -1998,13 +2092,10 @@ public class ZookeeperDiscoveryImpl { UUID nodeId = joinedNode.id(); rtState.evtsData.topVer++; - rtState.evtsData.evtIdGen++; joinedNode.order(rtState.evtsData.topVer); joinedNode.internalId(internalId); - long evtId = rtState.evtsData.evtIdGen; - DiscoveryDataBag joiningNodeBag = new DiscoveryDataBag(nodeId); joiningNodeBag.joiningNodeData(joiningNodeData.discoveryData()); @@ -2017,60 +2108,37 @@ public class ZookeeperDiscoveryImpl { Map<Integer, Serializable> commonData = collectBag.commonData(); - ZkJoinEventDataForJoined dataForJoined = new ZkJoinEventDataForJoined( - new ArrayList<>(curTop.values()), - commonData); - Object old = curTop.put(joinedNode.order(), joinedNode); assert old == null; - long addDataStart = System.currentTimeMillis(); - - byte[] dataForJoinedBytes = marshalZip(dataForJoined); - int overhead = 5; - int dataForJoinedPartCnt = saveData(zkPaths.joinEventDataPathForJoined(evtId), - dataForJoinedBytes, - overhead); - int secSubjPartCnt = 0; if (secSubjZipBytes != null) { - secSubjPartCnt = saveData(zkPaths.joinEventSecuritySubjectPath(evtId), secSubjZipBytes, overhead); + secSubjPartCnt = saveData(zkPaths.joinEventSecuritySubjectPath(joinedNode.order()), + secSubjZipBytes, + overhead); assert secSubjPartCnt > 0 : secSubjPartCnt; setNodeSecuritySubject(joinedNode, secSubjZipBytes); } - long addDataTime = System.currentTimeMillis() - addDataStart; - - ZkDiscoveryNodeJoinEventData evtData = new ZkDiscoveryNodeJoinEventData( - evtId, + ZkJoinedNodeEvtData nodeEvtData = new ZkJoinedNodeEvtData( rtState.evtsData.topVer, joinedNode.id(), joinedNode.internalId(), prefixId, joiningNodeData.partCount(), - dataForJoinedPartCnt, secSubjPartCnt); - rtState.evtsData.onNodeJoin(joinedNode); - - evtData.joiningNodeData = joiningNodeData; - - rtState.evtsData.addEvent(dataForJoined.topology(), evtData); + nodeEvtData.joiningNodeData = joiningNodeData; - evtData.addRemainingAck(joinedNode); // Topology for joined node does not contain joined node. + joinCtx.addJoinedNode(nodeEvtData, commonData); - if (log.isInfoEnabled()) { - log.info("Generated NODE_JOINED event [evt=" + evtData + - ", dataForJoinedSize=" + dataForJoinedBytes.length + - ", dataForJoinedPartCnt=" + dataForJoinedPartCnt + - ", addDataTime=" + addDataTime + ']'); - } + rtState.evtsData.onNodeJoin(joinedNode); } /** @@ -2509,116 +2577,82 @@ public class ZookeeperDiscoveryImpl { try { for (ZkDiscoveryEventData evtData : evts.tailMap(rtState.locNodeInfo.lastProcEvt, false).values()) { - if (!rtState.joined) { - if (evtData.eventType() != ZkDiscoveryEventData.ZK_EVT_NODE_JOIN) - continue; - - ZkDiscoveryNodeJoinEventData evtData0 = (ZkDiscoveryNodeJoinEventData)evtData; - - UUID joinedId = evtData0.nodeId; - - boolean locJoin = evtData0.joinedInternalId == rtState.internalOrder; - - if (locJoin) { - assert locNode.id().equals(joinedId); + if (log.isDebugEnabled()) + log.debug("New discovery event data [evt=" + evtData + ", evtsHist=" + evts.size() + ']'); - processLocalJoin(evtsData, evtData0); + switch (evtData.eventType()) { + case ZkDiscoveryEventData.ZK_EVT_NODE_JOIN: { + evtProcessed = processBulkJoin(evtsData, (ZkDiscoveryNodeJoinEventData)evtData); - evtProcessed = true; + break; } - } - else { - if (log.isDebugEnabled()) - log.debug("New discovery event data [evt=" + evtData + ", evtsHist=" + evts.size() + ']'); - - switch (evtData.eventType()) { - case ZkDiscoveryEventData.ZK_EVT_NODE_JOIN: { - ZkDiscoveryNodeJoinEventData evtData0 = (ZkDiscoveryNodeJoinEventData)evtData; - - ZkJoiningNodeData joiningData; - - if (rtState.crd) { - assert evtData0.joiningNodeData != null; - joiningData = evtData0.joiningNodeData; - } - else { - joiningData = unmarshalJoinData(evtData0.nodeId, evtData0.joinDataPrefixId); - - DiscoveryDataBag dataBag = new DiscoveryDataBag(evtData0.nodeId); - - dataBag.joiningNodeData(joiningData.discoveryData()); + case ZkDiscoveryEventData.ZK_EVT_NODE_FAILED: { + if (!rtState.joined) + break; - exchange.onExchange(dataBag); - } + evtProcessed = true; - if (evtData0.secSubjPartCnt > 0 && joiningData.node().attribute(ATTR_SECURITY_SUBJECT_V2) == null) - readAndInitSecuritySubject(joiningData.node(), evtData0); + notifyNodeFail((ZkDiscoveryNodeFailEventData)evtData); - notifyNodeJoin(evtData0, joiningData); + break; + } + case ZkDiscoveryEventData.ZK_EVT_CUSTOM_EVT: { + if (!rtState.joined) break; - } - - case ZkDiscoveryEventData.ZK_EVT_NODE_FAILED: { - notifyNodeFail((ZkDiscoveryNodeFailEventData)evtData); - break; - } + evtProcessed = true; - case ZkDiscoveryEventData.ZK_EVT_CUSTOM_EVT: { - ZkDiscoveryCustomEventData evtData0 = (ZkDiscoveryCustomEventData)evtData; + ZkDiscoveryCustomEventData evtData0 = (ZkDiscoveryCustomEventData)evtData; - if (evtData0.ackEvent() && evtData0.topologyVersion() < locNode.order()) - break; + if (evtData0.ackEvent() && evtData0.topologyVersion() < locNode.order()) + break; - DiscoverySpiCustomMessage msg; + DiscoverySpiCustomMessage msg; - if (rtState.crd) { - assert evtData0.resolvedMsg != null : evtData0; + if (rtState.crd) { + assert evtData0.resolvedMsg != null : evtData0; - msg = evtData0.resolvedMsg; - } - else { - if (evtData0.msg == null) { - if (evtData0.ackEvent()) { - String path = zkPaths.ackEventDataPath(evtData0.origEvtId); + msg = evtData0.resolvedMsg; + } + else { + if (evtData0.msg == null) { + if (evtData0.ackEvent()) { + String path = zkPaths.ackEventDataPath(evtData0.origEvtId); - msg = unmarshalZip(zkClient.getData(path)); - } - else { - assert evtData0.evtPath != null : evtData0; + msg = unmarshalZip(zkClient.getData(path)); + } + else { + assert evtData0.evtPath != null : evtData0; - byte[] msgBytes = readCustomEventData(zkClient, - evtData0.evtPath, - evtData0.sndNodeId); + byte[] msgBytes = readCustomEventData(zkClient, + evtData0.evtPath, + evtData0.sndNodeId); - msg = unmarshalZip(msgBytes); - } + msg = unmarshalZip(msgBytes); } - else - msg = evtData0.msg; - - evtData0.resolvedMsg = msg; } + else + msg = evtData0.msg; - if (msg instanceof ZkInternalMessage) - processInternalMessage(evtData0, (ZkInternalMessage)msg); - else { - notifyCustomEvent(evtData0, msg); + evtData0.resolvedMsg = msg; + } - if (!evtData0.ackEvent()) - updateNodeInfo = true; - } + if (msg instanceof ZkInternalMessage) + processInternalMessage(evtData0, (ZkInternalMessage)msg); + else { + notifyCustomEvent(evtData0, msg); - break; + if (!evtData0.ackEvent()) + updateNodeInfo = true; } - default: - assert false : "Invalid event: " + evtData; + break; } - evtProcessed = true; + default: + assert false : "Invalid event: " + evtData; } if (rtState.joined) { @@ -2666,6 +2700,55 @@ public class ZookeeperDiscoveryImpl { commErrFut.onTopologyChange(rtState.top); // This can add new event, notify out of event process loop. } + private boolean processBulkJoin(ZkDiscoveryEventsData evtsData, ZkDiscoveryNodeJoinEventData evtData) + throws Exception + { + boolean evtProcessed = false; + + for (int i = 0; i < evtData.joinedNodes.size(); i++) { + ZkJoinedNodeEvtData joinedEvtData = evtData.joinedNodes.get(i); + + if (!rtState.joined) { + UUID joinedId = joinedEvtData.nodeId; + + boolean locJoin = joinedEvtData.joinedInternalId == rtState.internalOrder; + + if (locJoin) { + assert locNode.id().equals(joinedId); + + processLocalJoin(evtsData, joinedEvtData, evtData); + + evtProcessed = true; + } + } + else { + ZkJoiningNodeData joiningData; + + if (rtState.crd) { + assert joinedEvtData.joiningNodeData != null; + + joiningData = joinedEvtData.joiningNodeData; + } + else { + joiningData = unmarshalJoinData(joinedEvtData.nodeId, joinedEvtData.joinDataPrefixId); + + DiscoveryDataBag dataBag = new DiscoveryDataBag(joinedEvtData.nodeId); + + dataBag.joiningNodeData(joiningData.discoveryData()); + + exchange.onExchange(dataBag); + } + + if (joinedEvtData.secSubjPartCnt > 0 && joiningData.node().attribute(ATTR_SECURITY_SUBJECT_V2) == null) + readAndInitSecuritySubject(joiningData.node(), joinedEvtData); + + notifyNodeJoin(joinedEvtData, joiningData); + } + } + + return evtProcessed; + } + /** * @param rtState Runtime state. * @param updateNodeInfo {@code True} if need update processed events without delay. @@ -2745,14 +2828,14 @@ public class ZookeeperDiscoveryImpl { /** * @param node Node. - * @param evtData Node join event data. + * @param joinedEvtData Joined node information. * @throws Exception If failed. */ - private void readAndInitSecuritySubject(ZookeeperClusterNode node, ZkDiscoveryNodeJoinEventData evtData) throws Exception { - if (evtData.secSubjPartCnt > 0) { + private void readAndInitSecuritySubject(ZookeeperClusterNode node, ZkJoinedNodeEvtData joinedEvtData) throws Exception { + if (joinedEvtData.secSubjPartCnt > 0) { byte[] zipBytes = readMultipleParts(rtState.zkClient, - zkPaths.joinEventSecuritySubjectPath(evtData.eventId()), - evtData.secSubjPartCnt); + zkPaths.joinEventSecuritySubjectPath(joinedEvtData.topVer), + joinedEvtData.secSubjPartCnt); setNodeSecuritySubject(node, zipBytes); } @@ -2764,7 +2847,9 @@ public class ZookeeperDiscoveryImpl { * @throws Exception If failed. */ @SuppressWarnings("unchecked") - private void processLocalJoin(ZkDiscoveryEventsData evtsData, final ZkDiscoveryNodeJoinEventData evtData) + private void processLocalJoin(ZkDiscoveryEventsData evtsData, + ZkJoinedNodeEvtData joinedEvtData, + ZkDiscoveryNodeJoinEventData evtData) throws Exception { synchronized (stateMux) { @@ -2781,7 +2866,7 @@ public class ZookeeperDiscoveryImpl { spi.getSpiContext().removeTimeoutObject(rtState.joinErrTo); if (log.isInfoEnabled()) - log.info("Local join event data: " + evtData + ']'); + log.info("Local join event data: " + joinedEvtData + ']'); String path = zkPaths.joinEventDataPathForJoined(evtData.eventId()); @@ -2791,14 +2876,19 @@ public class ZookeeperDiscoveryImpl { rtState.gridStartTime = evtsData.clusterStartTime; - locNode.internalId(evtData.joinedInternalId); - locNode.order(evtData.topologyVersion()); + locNode.internalId(joinedEvtData.joinedInternalId); + locNode.order(joinedEvtData.topVer); - readAndInitSecuritySubject(locNode, evtData); + readAndInitSecuritySubject(locNode, joinedEvtData); + + byte[] discoDataBytes = dataForJoined.discoveryDataForNode(locNode.order()); + + Map<Integer, Serializable> commonDiscoData = + marsh.unmarshal(discoDataBytes, U.resolveClassLoader(spi.ignite().configuration())); DiscoveryDataBag dataBag = new DiscoveryDataBag(locNode.id()); - dataBag.commonData(dataForJoined.discoveryData()); + dataBag.commonData(commonDiscoData); exchange.onExchange(dataBag); @@ -2807,6 +2897,10 @@ public class ZookeeperDiscoveryImpl { for (int i = 0; i < allNodes.size(); i++) { ZookeeperClusterNode node = allNodes.get(i); + // Need filter since ZkJoinEventDataForJoined contains single topology snapshot for all joined nodes. + if (node.order() >= locNode.order()) + break; + node.setMetrics(new ClusterMetricsSnapshot()); rtState.top.addNode(node); @@ -2817,7 +2911,7 @@ public class ZookeeperDiscoveryImpl { final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot(); lsnr.onDiscovery(EVT_NODE_JOINED, - evtData.topologyVersion(), + joinedEvtData.topVer, locNode, topSnapshot, Collections.<Long, Collection<ClusterNode>>emptyMap(), @@ -2825,7 +2919,7 @@ public class ZookeeperDiscoveryImpl { if (rtState.prevJoined) { lsnr.onDiscovery(EVT_CLIENT_NODE_RECONNECTED, - evtData.topologyVersion(), + joinedEvtData.topVer, locNode, topSnapshot, Collections.<Long, Collection<ClusterNode>>emptyMap(), @@ -2839,8 +2933,6 @@ public class ZookeeperDiscoveryImpl { joinFut.onDone(); - deleteDataForJoinedAsync(evtData); - if (locNode.isClient()) rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new CheckClientsStatusCallback(rtState)); } @@ -3302,15 +3394,15 @@ public class ZookeeperDiscoveryImpl { } /** - * @param evtData Event data. + * @param joinedEvtData Event data. * @param joiningData Joining node data. */ @SuppressWarnings("unchecked") - private void notifyNodeJoin(final ZkDiscoveryNodeJoinEventData evtData, ZkJoiningNodeData joiningData) { + private void notifyNodeJoin(ZkJoinedNodeEvtData joinedEvtData, ZkJoiningNodeData joiningData) { final ZookeeperClusterNode joinedNode = joiningData.node(); - joinedNode.order(evtData.topologyVersion()); - joinedNode.internalId(evtData.joinedInternalId); + joinedNode.order(joinedEvtData.topVer); + joinedNode.internalId(joinedEvtData.joinedInternalId); joinedNode.setMetrics(new ClusterMetricsSnapshot()); @@ -3319,7 +3411,7 @@ public class ZookeeperDiscoveryImpl { final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot(); lsnr.onDiscovery(EVT_NODE_JOINED, - evtData.topologyVersion(), + joinedEvtData.topVer, joinedNode, topSnapshot, Collections.<Long, Collection<ClusterNode>>emptyMap(), @@ -3571,25 +3663,27 @@ public class ZookeeperDiscoveryImpl { if (log.isDebugEnabled()) log.debug("All nodes processed node join [evtData=" + evtData + ']'); - deleteJoiningNodeData(evtData.nodeId, evtData.joinDataPrefixId, evtData.joinDataPartCnt); + for (int i = 0; i < evtData.joinedNodes.size(); i++) { + ZkJoinedNodeEvtData joinedEvtData = evtData.joinedNodes.get(i); - deleteDataForJoinedAsync(evtData); + deleteJoiningNodeData(joinedEvtData.nodeId, joinedEvtData.joinDataPrefixId, joinedEvtData.joinDataPartCnt); - if (evtData.secSubjPartCnt > 0) { - deleteMultiplePartsAsync(rtState.zkClient, - zkPaths.joinEventSecuritySubjectPath(evtData.eventId()), - evtData.secSubjPartCnt, - false); + if (joinedEvtData.secSubjPartCnt > 0) { + deleteMultiplePartsAsync(rtState.zkClient, + zkPaths.joinEventSecuritySubjectPath(evtData.eventId()), + joinedEvtData.secSubjPartCnt); + } } + + deleteDataForJoinedAsync(evtData); } /** * @param nodeId Node ID. * @param joinDataPrefixId Path prefix. * @param partCnt Parts count. - * @throws Exception If failed. */ - private void deleteJoiningNodeData(UUID nodeId, UUID joinDataPrefixId, int partCnt) throws Exception { + private void deleteJoiningNodeData(UUID nodeId, UUID joinDataPrefixId, int partCnt) { String evtDataPath = zkPaths.joiningNodeDataPath(nodeId, joinDataPrefixId); if (log.isDebugEnabled()) @@ -3598,20 +3692,19 @@ public class ZookeeperDiscoveryImpl { rtState.zkClient.deleteIfExistsAsync(evtDataPath); if (partCnt > 1) - deleteMultiplePartsAsync(rtState.zkClient, evtDataPath + ":", partCnt, true); + deleteMultiplePartsAsync(rtState.zkClient, evtDataPath + ":", partCnt); } /** * @param evtData Event data. - * @throws Exception If failed. */ - private void deleteDataForJoinedAsync(ZkDiscoveryNodeJoinEventData evtData) throws Exception { + private void deleteDataForJoinedAsync(ZkDiscoveryNodeJoinEventData evtData) { String dataForJoinedPath = zkPaths.joinEventDataPathForJoined(evtData.eventId()); if (log.isDebugEnabled()) log.debug("Delete data for joined node [path=" + dataForJoinedPath + ']'); - deleteMultiplePartsAsync(rtState.zkClient, dataForJoinedPath, evtData.dataForJoinedPartCnt, true); + deleteMultiplePartsAsync(rtState.zkClient, dataForJoinedPath, evtData.dataForJoinedPartCnt); } /** @@ -3800,7 +3893,7 @@ public class ZookeeperDiscoveryImpl { byte[] marshalZip(Object obj) throws IgniteCheckedException { assert obj != null; - return zip(marsh.marshal(obj)); + return zip(U.marshal(marsh, obj)); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/2fc690e6/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java index 75ecb8c..273200a 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java @@ -380,8 +380,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { throw new IgniteException("Failed to create directory for test Zookeeper server: " + file.getAbsolutePath()); } - - specs.add(new InstanceSpec(file, -1, -1, -1, true, -1, 1000, -1)); + specs.add(new InstanceSpec(file, -1, -1, -1, true, -1, 1000, 10_000)); } return new TestingCluster(specs); @@ -1471,6 +1470,41 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testConcurrentStart() throws Exception { + final int NODES = 20; + + for (int i = 0; i < 3; i++) { + info("Iteration: " + i); + + final AtomicInteger idx = new AtomicInteger(); + + final CyclicBarrier b = new CyclicBarrier(NODES); + + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override public Void call() throws Exception { + b.await(); + + int threadIdx = idx.getAndIncrement(); + + startGrid(threadIdx); + + return null; + } + }, NODES, "start-node"); + + waitForTopology(NODES); + + stopAllGrids(); + + checkEventsConsistency(); + + evts.clear(); + } + } + + /** + * @throws Exception If failed. + */ public void testConcurrentStartStop1() throws Exception { concurrentStartStop(1); }