Repository: ignite
Updated Branches:
  refs/heads/ignite-zk b8979efad -> 391ec5b55


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/391ec5b5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/391ec5b5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/391ec5b5

Branch: refs/heads/ignite-zk
Commit: 391ec5b551bd4f9e2915b93ccc4d71ebb1b29242
Parents: b8979ef
Author: sboikov <[email protected]>
Authored: Wed Dec 20 16:13:15 2017 +0300
Committer: sboikov <[email protected]>
Committed: Wed Dec 20 17:00:14 2017 +0300

----------------------------------------------------------------------
 .../internal/ZkDiscoveryNodeJoinEventData.java  |   3 +
 .../ZkDistributedCollectDataFuture.java         |   2 +
 .../discovery/zk/internal/ZkIgnitePaths.java    |   8 ++
 .../zk/internal/ZkJoiningNodeData.java          |   6 +
 .../zk/internal/ZkNodeValidateResult.java       |  45 +++++++
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 133 +++++++++++--------
 6 files changed, 141 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/391ec5b5/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
index 5081a4d..5967e1c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
@@ -50,6 +50,9 @@ class ZkDiscoveryNodeJoinEventData extends 
ZkDiscoveryEventData {
      * @param topVer Topology version.
      * @param nodeId Joined node ID.
      * @param joinedInternalId Joined node internal ID.
+     * @param joinDataPrefixId Join data unique prefix.
+     * @param joinDataPartCnt Join data part count.
+     * @param dataForJoinedPartCnt Data for joined part count.
      */
     ZkDiscoveryNodeJoinEventData(long evtId,
         long topVer,

http://git-wip-us.apache.org/repos/asf/ignite/blob/391ec5b5/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
index 2467928..5645791 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
@@ -68,6 +68,8 @@ class ZkDistributedCollectDataFuture extends 
GridFutureAdapter<Void> {
 
         ZkClusterNodes top = rtState.top;
 
+        // Assume new nodes can not join while future is in progress.
+
         remainingNodes = U.newHashSet(top.nodesByOrder.size());
 
         for (ZookeeperClusterNode node : top.nodesByInternalId.values())

http://git-wip-us.apache.org/repos/asf/ignite/blob/391ec5b5/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
index 588a5ca..818df75 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
@@ -290,6 +290,14 @@ class ZkIgnitePaths {
 
     /**
      * @param evtId Event ID.
+     * @return Event zk path.
+     */
+    String joinEventSecuritySubject(long evtId) {
+        return evtsPath + "/s-" + evtId;
+    }
+
+    /**
+     * @param evtId Event ID.
      * @return Path for custom event ack.
      */
     String ackEventDataPath(long evtId) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/391ec5b5/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
index 284cbff..ff8311d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
@@ -40,6 +40,9 @@ class ZkJoiningNodeData implements Serializable {
     @GridToStringInclude
     private Map<Integer, Serializable> discoData;
 
+    /**
+     * @param partCnt Number of parts in multi-parts message.
+     */
     ZkJoiningNodeData(int partCnt) {
         this.partCnt = partCnt;
     }
@@ -56,6 +59,9 @@ class ZkJoiningNodeData implements Serializable {
         this.discoData = discoData;
     }
 
+    /**
+     * @return Number of parts in multi-parts message.
+     */
     int partCount() {
         return partCnt;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/391ec5b5/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNodeValidateResult.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNodeValidateResult.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNodeValidateResult.java
new file mode 100644
index 0000000..52383d7
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNodeValidateResult.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+
+/**
+ *
+ */
+class ZkNodeValidateResult {
+    /** */
+    String err;
+
+    /** */
+    Serializable secSubj;
+
+    /**
+     * @param err Error.
+     */
+    ZkNodeValidateResult(String err) {
+        this.err = err;
+    }
+
+    /**
+     * @param secSubj Node security subject.
+     */
+    ZkNodeValidateResult(Serializable secSubj) {
+        this.secSubj = secSubj;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/391ec5b5/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 7ea544f..f79e3f5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -46,6 +46,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CommunicationProblemResolver;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.ClusterMetricsSnapshot;
+import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteNodeAttributes;
@@ -1182,7 +1183,7 @@ public class ZookeeperDiscoveryImpl {
             if (evtData instanceof ZkDiscoveryCustomEventData) {
                 ZkDiscoveryCustomEventData evtData0 = 
(ZkDiscoveryCustomEventData)evtData;
 
-                // It is possible previous coordinator failed before finished 
message processing.
+                // It is possible previous coordinator failed before finished 
cleanup.
                 if (evtData0.msg instanceof 
ZkCommunicationErrorResolveFinishMessage) {
                     try {
                         ZkCommunicationErrorResolveFinishMessage msg =
@@ -1207,8 +1208,6 @@ public class ZookeeperDiscoveryImpl {
      * @throws Exception If failed.
      */
     private void onBecomeCoordinator(List<String> aliveNodes, int 
locInternalId) throws Exception {
-        long topVer0 = rtState.evtsData != null ? rtState.evtsData.topVer : 
-1L;
-
         byte[] evtsDataBytes = rtState.zkClient.getData(zkPaths.evtsPath);
 
         if (evtsDataBytes.length > 0)
@@ -1225,9 +1224,6 @@ public class ZookeeperDiscoveryImpl {
 
             previousCoordinatorCleanup(rtState.evtsData);
 
-            if (topVer0 > rtState.evtsData.topVer)
-                rtState.evtsData.topVer = topVer0;
-
             UUID futId = rtState.evtsData.communicationErrorResolveFutureId();
 
             if (futId != null) {
@@ -1508,90 +1504,108 @@ public class ZookeeperDiscoveryImpl {
 
         Object data = unmarshalJoinDataOnCoordinator(nodeId, prefixId, 
aliveNodePath);
 
-        ZkInternalJoinErrorMessage joinErr = null;
-        ZkJoiningNodeData joiningNodeData = null;
-
         if (data instanceof ZkJoiningNodeData) {
-            joiningNodeData = (ZkJoiningNodeData)data;
+            ZkJoiningNodeData joiningNodeData = (ZkJoiningNodeData)data;
 
-            String err = validateJoiningNode(joiningNodeData.node());
+            ZkNodeValidateResult validateRes = 
validateJoiningNode(joiningNodeData.node());
 
-            if (err != null)
-                joinErr = new 
ZkInternalJoinErrorMessage(ZkIgnitePaths.aliveInternalId(aliveNodePath), err);
-        }
-        else {
-            assert data instanceof ZkInternalJoinErrorMessage : data;
+            if (validateRes.err == null) {
+                ZookeeperClusterNode joinedNode = joiningNodeData.node();
 
-            joinErr = (ZkInternalJoinErrorMessage)data;
-        }
+                assert nodeId.equals(joinedNode.id()) : joiningNodeData.node();
 
-        if (joinErr == null) {
-            ZookeeperClusterNode joinedNode = joiningNodeData.node();
+                generateNodeJoin(curTop, joiningNodeData, internalId, 
prefixId);
 
-            assert nodeId.equals(joinedNode.id()) : joiningNodeData.node();
+                watchAliveNodeData(aliveNodePath);
 
-            generateNodeJoin(curTop, joiningNodeData, internalId, prefixId);
+                return true;
+            }
+            else {
+                ZkInternalJoinErrorMessage joinErr = new 
ZkInternalJoinErrorMessage(
+                    ZkIgnitePaths.aliveInternalId(aliveNodePath),
+                    validateRes.err);
 
-            watchAliveNodeData(aliveNodePath);
+                processJoinError(aliveNodePath, nodeId, prefixId, joinErr);
 
-            return true;
+                return false;
+            }
         }
         else {
-            ZookeeperClient client = rtState.zkClient;
+            assert data instanceof ZkInternalJoinErrorMessage : data;
 
-            if (joinErr.notifyNode) {
-                String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, 
prefixId);
+            processJoinError(aliveNodePath, nodeId, prefixId, 
(ZkInternalJoinErrorMessage)data);
 
-                client.setData(joinDataPath, marshalZip(joinErr), -1);
+            return false;
+        }
+    }
 
-                client.deleteIfExists(zkPaths.aliveNodesDir + "/" + 
aliveNodePath, -1);
-            }
-            else {
-                if (log.isInfoEnabled())
-                    log.info("Ignore join data, node was failed by previous 
coordinator: " + aliveNodePath);
+    /**
+     * @param aliveNodePath Joined node path.
+     * @param nodeId Node ID.
+     * @param prefixId Path prefix ID.
+     * @param joinErr Join error message.
+     * @throws Exception If failed.
+     */
+    private void processJoinError(String aliveNodePath,
+        UUID nodeId,
+        UUID prefixId,
+        ZkInternalJoinErrorMessage joinErr) throws Exception {
+        ZookeeperClient client = rtState.zkClient;
 
-                client.deleteIfExists(zkPaths.aliveNodesDir + "/" + 
aliveNodePath, -1);
-            }
+        if (joinErr.notifyNode) {
+            String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, 
prefixId);
 
-            return false;
+            client.setData(joinDataPath, marshalZip(joinErr), -1);
+
+            client.deleteIfExists(zkPaths.aliveNodesDir + "/" + aliveNodePath, 
-1);
+        }
+        else {
+            if (log.isInfoEnabled())
+                log.info("Ignore join data, node was failed by previous 
coordinator: " + aliveNodePath);
+
+            client.deleteIfExists(zkPaths.aliveNodesDir + "/" + aliveNodePath, 
-1);
         }
     }
 
     /**
      * @param node Joining node.
-     * @return Non null error message if validation failed.
+     * @return Validation result.
      */
-    @Nullable private String validateJoiningNode(ZookeeperClusterNode node) {
+    private ZkNodeValidateResult validateJoiningNode(ZookeeperClusterNode 
node) {
         ZookeeperClusterNode node0 = rtState.top.nodesById.get(node.id());
 
         if (node0 != null) {
             U.error(log, "Failed to include node in cluster, node with the 
same ID already exists [joiningNode=" + node +
                 ", existingNode=" + node0 + ']');
 
-            return "Node with the same ID already exists: " + node0;
+            return new ZkNodeValidateResult("Node with the same ID already 
exists: " + node0);
         }
 
-        String authErr = authenticateNode(node);
+        ZkNodeValidateResult res = authenticateNode(node);
 
-        if (authErr != null)
-            return null;
+        if (res.err != null)
+            return res;
 
         IgniteNodeValidationResult err = 
spi.getSpiContext().validateNode(node);
 
         if (err != null) {
             LT.warn(log, err.message());
 
-            return err.sendMessage();
+            res.err = err.sendMessage();
         }
 
-        return null;
+        return res;
     }
 
-    @Nullable private String authenticateNode(ZookeeperClusterNode node) {
+    /**
+     * @param node Node.
+     * @return Validation result.
+     */
+    private ZkNodeValidateResult authenticateNode(ZookeeperClusterNode node) {
         DiscoverySpiNodeAuthenticator nodeAuth = spi.getAuthenticator();
 
         if (nodeAuth == null)
-            return null;
+            return new ZkNodeValidateResult(null);
 
         SecurityCredentials cred;
 
@@ -1601,7 +1615,7 @@ public class ZookeeperDiscoveryImpl {
         catch (Exception e) {
             U.error(log, "Failed to unmarshal node credentials: " + e, e);
 
-            return "Failed to unmarshal node credentials";
+            return new ZkNodeValidateResult("Failed to unmarshal node 
credentials");
         }
 
         SecurityContext subj = nodeAuth.authenticateNode(node, cred);
@@ -1612,7 +1626,7 @@ public class ZookeeperDiscoveryImpl {
                 "Authentication failed [nodeId=" + U.id8(node.id()) + ", 
addrs=" +
                     U.addressesAsString(node) + ']');
 
-            return "Authentication failed";
+            return new ZkNodeValidateResult("Authentication failed");
         }
 
         if (!(subj instanceof Serializable)) {
@@ -1622,10 +1636,10 @@ public class ZookeeperDiscoveryImpl {
                     ", addrs=" +
                     U.addressesAsString(node) + ']');
 
-            return "Authentication subject is not serializable";
+            return new ZkNodeValidateResult("Authentication subject is not 
serializable");
         }
 
-        return null;
+        return new ZkNodeValidateResult((Serializable)subj);
     }
 
     /**
@@ -3105,12 +3119,22 @@ public class ZookeeperDiscoveryImpl {
         if (zkClient != null)
             zkClient.close();
 
+        finishFutures(new IgniteCheckedException("Node stopped."));
+
+        IgniteUtils.shutdownNow(ZookeeperDiscoveryImpl.class, utilityPool, 
log);
+    }
+
+    /**
+     * @param err Error.
+     */
+    private void finishFutures(IgniteCheckedException err) {
         ZkCommunicationErrorProcessFuture commErrFut = commErrProcFut.get();
 
         if (commErrFut != null)
-            commErrFut.onError(new IgniteCheckedException("Node stopped."));
+            commErrFut.onError(err);
 
-        IgniteUtils.shutdownNow(ZookeeperDiscoveryImpl.class, utilityPool, 
log);
+        for (PingFuture fut : pingFuts.values())
+            fut.onDone(err);
     }
 
     /**
@@ -3193,10 +3217,7 @@ public class ZookeeperDiscoveryImpl {
 
         /** {@inheritDoc} */
         @Override public void run() {
-            ZkCommunicationErrorProcessFuture commErrFut = 
commErrProcFut.get();
-
-            if (commErrFut != null)
-                commErrFut.onError(new IgniteCheckedException("Client node 
disconnected."));
+            finishFutures(new IgniteClientDisconnectedCheckedException(null, 
"Client node disconnected."));
 
             rtState.closing = true;
 

Reply via email to