Repository: ignite Updated Branches: refs/heads/ignite-zk b8979efad -> 391ec5b55
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/391ec5b5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/391ec5b5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/391ec5b5 Branch: refs/heads/ignite-zk Commit: 391ec5b551bd4f9e2915b93ccc4d71ebb1b29242 Parents: b8979ef Author: sboikov <[email protected]> Authored: Wed Dec 20 16:13:15 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Dec 20 17:00:14 2017 +0300 ---------------------------------------------------------------------- .../internal/ZkDiscoveryNodeJoinEventData.java | 3 + .../ZkDistributedCollectDataFuture.java | 2 + .../discovery/zk/internal/ZkIgnitePaths.java | 8 ++ .../zk/internal/ZkJoiningNodeData.java | 6 + .../zk/internal/ZkNodeValidateResult.java | 45 +++++++ .../zk/internal/ZookeeperDiscoveryImpl.java | 133 +++++++++++-------- 6 files changed, 141 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/391ec5b5/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java index 5081a4d..5967e1c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java @@ -50,6 +50,9 @@ class ZkDiscoveryNodeJoinEventData extends ZkDiscoveryEventData { * @param topVer Topology version. * @param nodeId Joined node ID. * @param joinedInternalId Joined node internal ID. + * @param joinDataPrefixId Join data unique prefix. + * @param joinDataPartCnt Join data part count. + * @param dataForJoinedPartCnt Data for joined part count. */ ZkDiscoveryNodeJoinEventData(long evtId, long topVer, http://git-wip-us.apache.org/repos/asf/ignite/blob/391ec5b5/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java index 2467928..5645791 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java @@ -68,6 +68,8 @@ class ZkDistributedCollectDataFuture extends GridFutureAdapter<Void> { ZkClusterNodes top = rtState.top; + // Assume new nodes can not join while future is in progress. + remainingNodes = U.newHashSet(top.nodesByOrder.size()); for (ZookeeperClusterNode node : top.nodesByInternalId.values()) http://git-wip-us.apache.org/repos/asf/ignite/blob/391ec5b5/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java index 588a5ca..818df75 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java @@ -290,6 +290,14 @@ class ZkIgnitePaths { /** * @param evtId Event ID. + * @return Event zk path. + */ + String joinEventSecuritySubject(long evtId) { + return evtsPath + "/s-" + evtId; + } + + /** + * @param evtId Event ID. * @return Path for custom event ack. */ String ackEventDataPath(long evtId) { http://git-wip-us.apache.org/repos/asf/ignite/blob/391ec5b5/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java index 284cbff..ff8311d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java @@ -40,6 +40,9 @@ class ZkJoiningNodeData implements Serializable { @GridToStringInclude private Map<Integer, Serializable> discoData; + /** + * @param partCnt Number of parts in multi-parts message. + */ ZkJoiningNodeData(int partCnt) { this.partCnt = partCnt; } @@ -56,6 +59,9 @@ class ZkJoiningNodeData implements Serializable { this.discoData = discoData; } + /** + * @return Number of parts in multi-parts message. + */ int partCount() { return partCnt; } http://git-wip-us.apache.org/repos/asf/ignite/blob/391ec5b5/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNodeValidateResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNodeValidateResult.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNodeValidateResult.java new file mode 100644 index 0000000..52383d7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNodeValidateResult.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.io.Serializable; + +/** + * + */ +class ZkNodeValidateResult { + /** */ + String err; + + /** */ + Serializable secSubj; + + /** + * @param err Error. + */ + ZkNodeValidateResult(String err) { + this.err = err; + } + + /** + * @param secSubj Node security subject. + */ + ZkNodeValidateResult(Serializable secSubj) { + this.secSubj = secSubj; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/391ec5b5/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 7ea544f..f79e3f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -46,6 +46,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CommunicationProblemResolver; import org.apache.ignite.events.EventType; import org.apache.ignite.internal.ClusterMetricsSnapshot; +import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteNodeAttributes; @@ -1182,7 +1183,7 @@ public class ZookeeperDiscoveryImpl { if (evtData instanceof ZkDiscoveryCustomEventData) { ZkDiscoveryCustomEventData evtData0 = (ZkDiscoveryCustomEventData)evtData; - // It is possible previous coordinator failed before finished message processing. + // It is possible previous coordinator failed before finished cleanup. if (evtData0.msg instanceof ZkCommunicationErrorResolveFinishMessage) { try { ZkCommunicationErrorResolveFinishMessage msg = @@ -1207,8 +1208,6 @@ public class ZookeeperDiscoveryImpl { * @throws Exception If failed. */ private void onBecomeCoordinator(List<String> aliveNodes, int locInternalId) throws Exception { - long topVer0 = rtState.evtsData != null ? rtState.evtsData.topVer : -1L; - byte[] evtsDataBytes = rtState.zkClient.getData(zkPaths.evtsPath); if (evtsDataBytes.length > 0) @@ -1225,9 +1224,6 @@ public class ZookeeperDiscoveryImpl { previousCoordinatorCleanup(rtState.evtsData); - if (topVer0 > rtState.evtsData.topVer) - rtState.evtsData.topVer = topVer0; - UUID futId = rtState.evtsData.communicationErrorResolveFutureId(); if (futId != null) { @@ -1508,90 +1504,108 @@ public class ZookeeperDiscoveryImpl { Object data = unmarshalJoinDataOnCoordinator(nodeId, prefixId, aliveNodePath); - ZkInternalJoinErrorMessage joinErr = null; - ZkJoiningNodeData joiningNodeData = null; - if (data instanceof ZkJoiningNodeData) { - joiningNodeData = (ZkJoiningNodeData)data; + ZkJoiningNodeData joiningNodeData = (ZkJoiningNodeData)data; - String err = validateJoiningNode(joiningNodeData.node()); + ZkNodeValidateResult validateRes = validateJoiningNode(joiningNodeData.node()); - if (err != null) - joinErr = new ZkInternalJoinErrorMessage(ZkIgnitePaths.aliveInternalId(aliveNodePath), err); - } - else { - assert data instanceof ZkInternalJoinErrorMessage : data; + if (validateRes.err == null) { + ZookeeperClusterNode joinedNode = joiningNodeData.node(); - joinErr = (ZkInternalJoinErrorMessage)data; - } + assert nodeId.equals(joinedNode.id()) : joiningNodeData.node(); - if (joinErr == null) { - ZookeeperClusterNode joinedNode = joiningNodeData.node(); + generateNodeJoin(curTop, joiningNodeData, internalId, prefixId); - assert nodeId.equals(joinedNode.id()) : joiningNodeData.node(); + watchAliveNodeData(aliveNodePath); - generateNodeJoin(curTop, joiningNodeData, internalId, prefixId); + return true; + } + else { + ZkInternalJoinErrorMessage joinErr = new ZkInternalJoinErrorMessage( + ZkIgnitePaths.aliveInternalId(aliveNodePath), + validateRes.err); - watchAliveNodeData(aliveNodePath); + processJoinError(aliveNodePath, nodeId, prefixId, joinErr); - return true; + return false; + } } else { - ZookeeperClient client = rtState.zkClient; + assert data instanceof ZkInternalJoinErrorMessage : data; - if (joinErr.notifyNode) { - String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, prefixId); + processJoinError(aliveNodePath, nodeId, prefixId, (ZkInternalJoinErrorMessage)data); - client.setData(joinDataPath, marshalZip(joinErr), -1); + return false; + } + } - client.deleteIfExists(zkPaths.aliveNodesDir + "/" + aliveNodePath, -1); - } - else { - if (log.isInfoEnabled()) - log.info("Ignore join data, node was failed by previous coordinator: " + aliveNodePath); + /** + * @param aliveNodePath Joined node path. + * @param nodeId Node ID. + * @param prefixId Path prefix ID. + * @param joinErr Join error message. + * @throws Exception If failed. + */ + private void processJoinError(String aliveNodePath, + UUID nodeId, + UUID prefixId, + ZkInternalJoinErrorMessage joinErr) throws Exception { + ZookeeperClient client = rtState.zkClient; - client.deleteIfExists(zkPaths.aliveNodesDir + "/" + aliveNodePath, -1); - } + if (joinErr.notifyNode) { + String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, prefixId); - return false; + client.setData(joinDataPath, marshalZip(joinErr), -1); + + client.deleteIfExists(zkPaths.aliveNodesDir + "/" + aliveNodePath, -1); + } + else { + if (log.isInfoEnabled()) + log.info("Ignore join data, node was failed by previous coordinator: " + aliveNodePath); + + client.deleteIfExists(zkPaths.aliveNodesDir + "/" + aliveNodePath, -1); } } /** * @param node Joining node. - * @return Non null error message if validation failed. + * @return Validation result. */ - @Nullable private String validateJoiningNode(ZookeeperClusterNode node) { + private ZkNodeValidateResult validateJoiningNode(ZookeeperClusterNode node) { ZookeeperClusterNode node0 = rtState.top.nodesById.get(node.id()); if (node0 != null) { U.error(log, "Failed to include node in cluster, node with the same ID already exists [joiningNode=" + node + ", existingNode=" + node0 + ']'); - return "Node with the same ID already exists: " + node0; + return new ZkNodeValidateResult("Node with the same ID already exists: " + node0); } - String authErr = authenticateNode(node); + ZkNodeValidateResult res = authenticateNode(node); - if (authErr != null) - return null; + if (res.err != null) + return res; IgniteNodeValidationResult err = spi.getSpiContext().validateNode(node); if (err != null) { LT.warn(log, err.message()); - return err.sendMessage(); + res.err = err.sendMessage(); } - return null; + return res; } - @Nullable private String authenticateNode(ZookeeperClusterNode node) { + /** + * @param node Node. + * @return Validation result. + */ + private ZkNodeValidateResult authenticateNode(ZookeeperClusterNode node) { DiscoverySpiNodeAuthenticator nodeAuth = spi.getAuthenticator(); if (nodeAuth == null) - return null; + return new ZkNodeValidateResult(null); SecurityCredentials cred; @@ -1601,7 +1615,7 @@ public class ZookeeperDiscoveryImpl { catch (Exception e) { U.error(log, "Failed to unmarshal node credentials: " + e, e); - return "Failed to unmarshal node credentials"; + return new ZkNodeValidateResult("Failed to unmarshal node credentials"); } SecurityContext subj = nodeAuth.authenticateNode(node, cred); @@ -1612,7 +1626,7 @@ public class ZookeeperDiscoveryImpl { "Authentication failed [nodeId=" + U.id8(node.id()) + ", addrs=" + U.addressesAsString(node) + ']'); - return "Authentication failed"; + return new ZkNodeValidateResult("Authentication failed"); } if (!(subj instanceof Serializable)) { @@ -1622,10 +1636,10 @@ public class ZookeeperDiscoveryImpl { ", addrs=" + U.addressesAsString(node) + ']'); - return "Authentication subject is not serializable"; + return new ZkNodeValidateResult("Authentication subject is not serializable"); } - return null; + return new ZkNodeValidateResult((Serializable)subj); } /** @@ -3105,12 +3119,22 @@ public class ZookeeperDiscoveryImpl { if (zkClient != null) zkClient.close(); + finishFutures(new IgniteCheckedException("Node stopped.")); + + IgniteUtils.shutdownNow(ZookeeperDiscoveryImpl.class, utilityPool, log); + } + + /** + * @param err Error. + */ + private void finishFutures(IgniteCheckedException err) { ZkCommunicationErrorProcessFuture commErrFut = commErrProcFut.get(); if (commErrFut != null) - commErrFut.onError(new IgniteCheckedException("Node stopped.")); + commErrFut.onError(err); - IgniteUtils.shutdownNow(ZookeeperDiscoveryImpl.class, utilityPool, log); + for (PingFuture fut : pingFuts.values()) + fut.onDone(err); } /** @@ -3193,10 +3217,7 @@ public class ZookeeperDiscoveryImpl { /** {@inheritDoc} */ @Override public void run() { - ZkCommunicationErrorProcessFuture commErrFut = commErrProcFut.get(); - - if (commErrFut != null) - commErrFut.onError(new IgniteCheckedException("Client node disconnected.")); + finishFutures(new IgniteClientDisconnectedCheckedException(null, "Client node disconnected.")); rtState.closing = true;
