zk

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0bddcdef
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0bddcdef
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0bddcdef

Branch: refs/heads/ignite-zk
Commit: 0bddcdef32646c407e879fbe39765f877928d44c
Parents: 74526d1
Author: sboikov <[email protected]>
Authored: Mon Dec 4 18:09:43 2017 +0300
Committer: sboikov <[email protected]>
Committed: Mon Dec 4 18:09:43 2017 +0300

----------------------------------------------------------------------
 .../zk/internal/ZkInternalFailNodeMessage.java  |  55 --------
 .../ZkInternalForceNodeFailMessage.java         |  55 ++++++++
 .../zk/internal/ZkInternalJoinErrorMessage.java |  43 +++++++
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 129 +++++++++++++------
 .../ZookeeperDiscoverySpiBasicTest.java         |  17 +++
 5 files changed, 204 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0bddcdef/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalFailNodeMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalFailNodeMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalFailNodeMessage.java
deleted file mode 100644
index a97289d..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalFailNodeMessage.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import java.util.UUID;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
-import org.jetbrains.annotations.Nullable;
-
-/**
- *
- */
-public class ZkInternalFailNodeMessage implements ZkInternalMessage {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    final UUID nodeId;
-
-    /** */
-    final String warning;
-
-    /**
-     * @param nodeId Node ID.
-     * @param warning Warning to be displayed on all nodes.
-     */
-    ZkInternalFailNodeMessage(UUID nodeId, String warning) {
-        this.nodeId = nodeId;
-        this.warning = warning;
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public DiscoverySpiCustomMessage ackMessage() {
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isMutable() {
-        return false;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0bddcdef/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalForceNodeFailMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalForceNodeFailMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalForceNodeFailMessage.java
new file mode 100644
index 0000000..fafcafc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalForceNodeFailMessage.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.UUID;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class ZkInternalForceNodeFailMessage implements ZkInternalMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    final UUID nodeId;
+
+    /** */
+    final String warning;
+
+    /**
+     * @param nodeId Node ID.
+     * @param warning Warning to be displayed on all nodes.
+     */
+    ZkInternalForceNodeFailMessage(UUID nodeId, String warning) {
+        this.nodeId = nodeId;
+        this.warning = warning;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoverySpiCustomMessage ackMessage() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0bddcdef/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java
new file mode 100644
index 0000000..7e06858
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+
+/**
+ *
+ */
+class ZkInternalJoinErrorMessage implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private final int nodeInternalId;
+
+    /** */
+    private final String err;
+
+    /**
+     * @param nodeInternalId Joining node internal ID.
+     * @param err Error message.
+     */
+    ZkInternalJoinErrorMessage(int nodeInternalId, String err) {
+        this.nodeInternalId = nodeInternalId;
+        this.err = err;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0bddcdef/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 796310f..ef67ec4 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -243,7 +243,7 @@ public class ZookeeperDiscoveryImpl {
             return;
         }
 
-        sendCustomMessage(new ZkInternalFailNodeMessage(nodeId, warning));
+        sendCustomMessage(new ZkInternalForceNodeFailMessage(nodeId, warning));
     }
 
     /**
@@ -618,27 +618,19 @@ public class ZookeeperDiscoveryImpl {
 
         TreeMap<Integer, String> alives = new TreeMap<>();
 
-        Integer locInternalId = null;
+        int locInternalId = ZkIgnitePaths.aliveInternalId(state.locNodeZkPath);
 
         for (String aliveNodePath : aliveNodes) {
             Integer internalId = ZkIgnitePaths.aliveInternalId(aliveNodePath);
 
             alives.put(internalId, aliveNodePath);
-
-            if (locInternalId == null) {
-                UUID nodeId = ZkIgnitePaths.aliveNodeId(aliveNodePath);
-
-                if (locNode.id().equals(nodeId))
-                    locInternalId = internalId;
-            }
         }
 
         assert !alives.isEmpty();
-        assert locInternalId != null;
 
         Map.Entry<Integer, String> crdE = alives.firstEntry();
 
-        if (locInternalId.equals(crdE.getKey()))
+        if (locInternalId == crdE.getKey())
             onBecomeCoordinator(aliveNodes, locInternalId);
         else {
             assert alives.size() > 1;
@@ -746,9 +738,7 @@ public class ZookeeperDiscoveryImpl {
             assert old == null;
 
             if (!state.top.nodesByInternalId.containsKey(internalId)) {
-                generateNodeJoin(curTop, internalId, child);
-
-                watchAliveNodeData(child);
+                processJoinOnCoordinator(curTop, internalId, child);
 
                 newEvts = true;
             }
@@ -779,6 +769,81 @@ public class ZookeeperDiscoveryImpl {
     }
 
     /**
+     * @param curTop Current nodes.
+     * @param internalId Joined node internal ID.
+     * @param aliveNodePath Joined node path.
+     * @throws Exception If failed.
+     */
+    private void processJoinOnCoordinator(TreeMap<Long, ZookeeperClusterNode> curTop,
+        int internalId,
+        String aliveNodePath) throws Exception {
+        UUID nodeId = ZkIgnitePaths.aliveNodeId(aliveNodePath);
+
+        String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, aliveNodePath);
+        byte[] joinData;
+
+        try {
+            joinData = state.zkClient.getData(joinDataPath);
+        }
+        catch (KeeperException.NoNodeException e) {
+            U.warn(log, "Failed to read joining node data, node left before join process finished: " + nodeId);
+
+            return;
+        }
+
+        String err = null;
+
+        ZkJoiningNodeData joiningNodeData = null;
+
+        try {
+            joiningNodeData = unmarshalZip(joinData);
+        }
+        catch (Exception e) {
+            U.error(log, "Failed to unmarshal joining node data [nodePath=" + aliveNodePath + "']", e);
+
+            err = "Failed to unmarshal join data: " + e;
+        }
+
+        if (err == null) {
+            assert joiningNodeData != null;
+
+            err = validateJoiningNode(joiningNodeData.node());
+        }
+
+        if (err == null) {
+            ZookeeperClusterNode joinedNode = joiningNodeData.node();
+
+            assert nodeId.equals(joinedNode.id()) : joiningNodeData.node();
+
+            generateNodeJoin(curTop, joinData, joiningNodeData, internalId);
+
+            watchAliveNodeData(aliveNodePath);
+        }
+        else {
+            ZkInternalJoinErrorMessage msg = new ZkInternalJoinErrorMessage(internalId, err);
+
+            // IgniteNodeValidationResult err = spi.getSpiContext().validateNode(node);
+        }
+    }
+
+    /**
+     * @param node Joining node.
+     * @return
+     */
+    @Nullable private String validateJoiningNode(ZookeeperClusterNode node) {
+        ZookeeperClusterNode node0 = state.top.nodesById.get(node.id());
+
+        if (node0 != null) {
+            U.error(log, "Failed to include node in cluster, node with the same ID already exists [joiningNode=" + node +
+                ", existingNode=" + node0 + ']');
+
+            return "Node with the same ID already exists";
+        }
+
+        return null;
+    }
+
+    /**
      * @throws Exception If failed.
      */
     private void saveAndProcessNewEvents() throws Exception {
@@ -827,34 +892,18 @@ public class ZookeeperDiscoveryImpl {
     /**
      * @param curTop Current nodes.
      * @param internalId Joined node internal ID.
-     * @param aliveNodePath Joined node path.
      * @throws Exception If failed.
      */
-    private void generateNodeJoin(TreeMap<Long, ZookeeperClusterNode> curTop,
-        int internalId,
-        String aliveNodePath)
+    private void generateNodeJoin(
+        TreeMap<Long, ZookeeperClusterNode> curTop,
+        byte[] joinData,
+        ZkJoiningNodeData joiningNodeData,
+        int internalId)
         throws Exception
     {
-        UUID nodeId = ZkIgnitePaths.aliveNodeId(aliveNodePath);
-
-        String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, aliveNodePath);
-        byte[] joinData;
-
-        try {
-            joinData = state.zkClient.getData(joinDataPath);
-        }
-        catch (KeeperException.NoNodeException e) {
-            U.warn(log, "Failed to read joining node data, node left before join process finished: " + nodeId);
-
-            return;
-        }
-
-        // TODO ZK: fail node if can not unmarshal.
-        ZkJoiningNodeData joiningNodeData = unmarshalZip(joinData);
-
         ZookeeperClusterNode joinedNode = joiningNodeData.node();
 
-        assert nodeId.equals(joinedNode.id()) : joiningNodeData.node();
+        UUID nodeId = joinedNode.id();
 
         state.evtsData.topVer++;
         state.evtsData.evtIdGen++;
@@ -1061,8 +1110,8 @@ public class ZookeeperDiscoveryImpl {
 
                         state.evtsData.evtIdGen++;
 
-                        if (msg instanceof ZkInternalFailNodeMessage) {
-                            ZkInternalFailNodeMessage msg0 = (ZkInternalFailNodeMessage)msg;
+                        if (msg instanceof ZkInternalForceNodeFailMessage) {
+                            ZkInternalForceNodeFailMessage msg0 = (ZkInternalForceNodeFailMessage)msg;
 
                             if (alives == null)
                                 alives = new HashSet<>(state.top.nodesById.keySet());
@@ -1358,8 +1407,8 @@ public class ZookeeperDiscoveryImpl {
      * @param msg
      */
     private void processInternalMessage(ZkDiscoveryCustomEventData evtData, ZkInternalMessage msg) throws Exception {
-        if (msg instanceof ZkInternalFailNodeMessage) {
-            ZkInternalFailNodeMessage msg0 = (ZkInternalFailNodeMessage)msg;
+        if (msg instanceof ZkInternalForceNodeFailMessage) {
+            ZkInternalForceNodeFailMessage msg0 = (ZkInternalForceNodeFailMessage)msg;
 
             ClusterNode creatorNode = state.top.nodesById.get(evtData.sndNodeId);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0bddcdef/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index 875d264..2c6890f 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -120,6 +120,9 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     /** */
     private boolean dfltConsistenId;
 
+    /** */
+    private UUID nodeId;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         if (testSockNio)
@@ -127,6 +130,9 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
 
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
+        if (nodeId != null)
+            cfg.setNodeId(nodeId);
+
         if (!dfltConsistenId)
             cfg.setConsistentId(igniteInstanceName);
 
@@ -1397,6 +1403,17 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testDuplicatedNodeId() throws Exception {
+        nodeId = UUID.randomUUID();
+
+        startGrid(0);
+
+        startGrid(1);
+    }
+
+    /**
      * @param clients Clients.
      * @param c Closure to run.
      * @throws Exception If failed.

Reply via email to