Repository: ignite
Updated Branches:
  refs/heads/ignite-zk bbd5a889f -> d3a80fb03


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d3a80fb0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d3a80fb0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d3a80fb0

Branch: refs/heads/ignite-zk
Commit: d3a80fb03a931a05f869f0f84730a8b793fd64ee
Parents: bbd5a88
Author: sboikov <[email protected]>
Authored: Thu Dec 7 10:05:33 2017 +0300
Committer: sboikov <[email protected]>
Committed: Thu Dec 7 10:52:40 2017 +0300

----------------------------------------------------------------------
 .../internal/ZkDiscoveryNodeJoinEventData.java  |  7 +-
 .../discovery/zk/internal/ZkIgnitePaths.java    |  2 +-
 .../discovery/zk/internal/ZookeeperClient.java  |  4 +-
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 78 +++++++++++------
 .../zk/internal/ZookeeperClientTest.java        |  4 +-
 .../ZookeeperDiscoverySpiBasicTest.java         | 89 +++++++++++++++++---
 6 files changed, 142 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d3a80fb0/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
index fbf1fc8..5081a4d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
@@ -37,6 +37,9 @@ class ZkDiscoveryNodeJoinEventData extends ZkDiscoveryEventData {
     final int joinDataPartCnt;
 
     /** */
+    final int dataForJoinedPartCnt;
+
+    /** */
     final UUID joinDataPrefixId;
 
     /** */
@@ -53,7 +56,8 @@ class ZkDiscoveryNodeJoinEventData extends ZkDiscoveryEventData {
         UUID nodeId,
         int joinedInternalId,
         UUID joinDataPrefixId,
-        int joinDataPartCnt)
+        int joinDataPartCnt,
+        int dataForJoinedPartCnt)
     {
         super(evtId, EventType.EVT_NODE_JOINED, topVer);
 
@@ -61,6 +65,7 @@ class ZkDiscoveryNodeJoinEventData extends ZkDiscoveryEventData {
         this.joinedInternalId = joinedInternalId;
         this.joinDataPrefixId = joinDataPrefixId;
         this.joinDataPartCnt = joinDataPartCnt;
+        this.dataForJoinedPartCnt = dataForJoinedPartCnt;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d3a80fb0/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
index e52127a..c9c0281 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
@@ -214,7 +214,7 @@ class ZkIgnitePaths {
      * @return Event zk path.
      */
     String joinEventDataPathForJoined(long evtId) {
-        return evtsPath + "/joined-" + evtId;
+        return evtsPath + "/fj-" + evtId;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/d3a80fb0/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
index a806548..e2ec675 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -307,9 +307,11 @@ public class ZookeeperClient implements Watcher {
 
             byte[] part = new byte[partSize0];
 
-            System.arraycopy(data, i * partCnt, part, 0, part.length);
+            System.arraycopy(data, i * partSize, part, 0, part.length);
 
             remaining -= partSize0;
+
+            parts.add(part);
         }
 
         assert remaining == 0 : remaining;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d3a80fb0/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 366c162..9f405b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -585,6 +585,8 @@ public class ZookeeperDiscoveryImpl {
             for (int i = 0; i < partCnt; i++) {
                 byte[] part = zkClient.getData(multipartPathName(basePath, i));
 
+                parts.add(part);
+
                 totSize += part.length;
             }
 
@@ -606,7 +608,7 @@ public class ZookeeperDiscoveryImpl {
             return zkClient.getData(multipartPathName(basePath, 0));
     }
 
-    private void saveMultipleParts(ZookeeperClient zkClient, String basePath, List<byte[]> parts)
+    private int saveMultipleParts(ZookeeperClient zkClient, String basePath, List<byte[]> parts)
         throws ZookeeperClientFailedException, InterruptedException
     {
         assert parts.size() > 1;
@@ -618,6 +620,8 @@ public class ZookeeperDiscoveryImpl {
 
             zkClient.createIfNeeded(path, part, PERSISTENT);
         }
+
+        return parts.size();
     }
 
     private static String multipartPathName(String basePath, int part) {
@@ -628,11 +632,13 @@ public class ZookeeperDiscoveryImpl {
      * @param joinDataBytes Joining node data.
      * @throws InterruptedException If interrupted.
      */
-    private void startJoin(byte[] joinDataBytes) throws InterruptedException {
+    private void startJoin(final byte[] joinDataBytes) throws InterruptedException {
         if (!busyLock.enterBusy())
             return;
 
         try {
+            long startTime = System.currentTimeMillis();
+
             initZkNodes();
 
             String prefix = UUID.randomUUID().toString();
@@ -675,6 +681,8 @@ public class ZookeeperDiscoveryImpl {
             log.info("Node started join [nodeId=" + locNode.id() +
                 ", instanceName=" + locNode.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME) +
                 ", joinDataSize=" + joinDataBytes.length +
+                ", joinDataPartCnt=" + rtState.joinDataPartCnt +
+                ", initTime=" + (System.currentTimeMillis() - startTime) +
                 ", nodePath=" + rtState.locNodeZkPath + ']');
 
             rtState.internalOrder = ZkIgnitePaths.aliveInternalId(rtState.locNodeZkPath);
@@ -1191,6 +1199,8 @@ public class ZookeeperDiscoveryImpl {
         joinedNode.order(rtState.evtsData.topVer);
         joinedNode.internalId(internalId);
 
+        long evtId = rtState.evtsData.evtIdGen;
+
         DiscoveryDataBag joiningNodeBag = new DiscoveryDataBag(nodeId);
 
         joiningNodeBag.joiningNodeData(joiningNodeData.discoveryData());
@@ -1211,13 +1221,37 @@ public class ZookeeperDiscoveryImpl {
 
         assert old == null;
 
+        long addDataStart = System.currentTimeMillis();
+
+        byte[] dataForJoinedBytes = marshalZip(dataForJoined);
+
+        int overhead = 5;
+
+        String dataPathForJoined = zkPaths.joinEventDataPathForJoined(evtId);
+
+        int dataForJoinedPartCnt = 1;
+
+        if (rtState.zkClient.needSplitNodeData(dataPathForJoined, dataForJoinedBytes, overhead)) {
+            dataForJoinedPartCnt = saveMultipleParts(rtState.zkClient,
+                dataPathForJoined,
+                rtState.zkClient.splitNodeData(dataPathForJoined, dataForJoinedBytes, overhead));
+        }
+        else {
+            rtState.zkClient.createIfNeeded(multipartPathName(dataPathForJoined, 0),
+                dataForJoinedBytes,
+                PERSISTENT);
+        }
+
+        long addDataTime = System.currentTimeMillis() - addDataStart;
+
         ZkDiscoveryNodeJoinEventData evtData = new ZkDiscoveryNodeJoinEventData(
-            rtState.evtsData.evtIdGen,
+            evtId,
             rtState.evtsData.topVer,
             joinedNode.id(),
             joinedNode.internalId(),
             prefixId,
-            joiningNodeData.partCount());
+            joiningNodeData.partCount(),
+            dataForJoinedPartCnt);
 
         evtData.joiningNodeData = joiningNodeData;
 
@@ -1225,18 +1259,11 @@ public class ZookeeperDiscoveryImpl {
 
         evtData.addRemainingAck(joinedNode); // Topology for joined node does not contain joined node.
 
-        byte[] dataForJoinedBytes = marshalZip(dataForJoined);
-
-        long start = System.currentTimeMillis();
-
-        rtState.zkClient.createIfNeeded(zkPaths.joinEventDataPathForJoined(evtData.eventId()), dataForJoinedBytes, PERSISTENT);
-
-        long time = System.currentTimeMillis() - start;
-
         if (log.isInfoEnabled()) {
             log.info("Generated NODE_JOINED event [evt=" + evtData +
                 ", dataForJoinedSize=" + dataForJoinedBytes.length +
-                ", addDataTime=" + time + ']');
+                ", dataForJoinedPartCnt=" + dataForJoinedPartCnt +
+                ", addDataTime=" + addDataTime + ']');
         }
     }
 
@@ -1626,7 +1653,9 @@ public class ZookeeperDiscoveryImpl {
 
         String path = zkPaths.joinEventDataPathForJoined(evtData.eventId());
 
-        ZkJoinEventDataForJoined dataForJoined = unmarshalZip(rtState.zkClient.getData(path));
+        byte[] dataForJoinedBytes = readMultipleParts(rtState.zkClient, path, evtData.dataForJoinedPartCnt);
+
+        ZkJoinEventDataForJoined dataForJoined = unmarshalZip(dataForJoinedBytes);
 
         rtState.gridStartTime = evtsData.gridStartTime;
 
@@ -1678,10 +1707,7 @@ public class ZookeeperDiscoveryImpl {
 
         rtState.joined = true;
 
-        if (log.isDebugEnabled())
-            log.debug("Delete data for joined: " + path);
-
-        rtState.zkClient.deleteIfExistsAsync(path);
+        deleteDataForJoined(evtData);
     }
 
     /**
@@ -1973,12 +1999,7 @@ public class ZookeeperDiscoveryImpl {
 
         deleteJoiningNodeData(evtData.nodeId, evtData.joinDataPrefixId, evtData.joinDataPartCnt);
 
-        String dataForJoinedPath = zkPaths.joinEventDataPathForJoined(evtData.eventId());
-
-        if (log.isDebugEnabled())
-            log.debug("Delete data for joined node [path=" + dataForJoinedPath + ']');
-
-        rtState.zkClient.deleteIfExistsAsync(dataForJoinedPath);
+        deleteDataForJoined(evtData);
     }
 
     private void deleteJoiningNodeData(UUID nodeId, UUID joinDataPrefixId, int partCnt) throws Exception {
@@ -1993,6 +2014,15 @@ public class ZookeeperDiscoveryImpl {
             deleteMultiplePartsAsync(rtState.zkClient, evtDataPath + ":", partCnt);
     }
 
+    private void deleteDataForJoined(ZkDiscoveryNodeJoinEventData evtData) {
+        String dataForJoinedPath = zkPaths.joinEventDataPathForJoined(evtData.eventId());
+
+        if (log.isDebugEnabled())
+            log.debug("Delete data for joined node [path=" + dataForJoinedPath + ']');
+
+        deleteMultiplePartsAsync(rtState.zkClient, dataForJoinedPath, evtData.dataForJoinedPartCnt);
+    }
+
+
     /**
      * @param evtData Event data.
      * @throws Exception If failed.

http://git-wip-us.apache.org/repos/asf/ignite/blob/d3a80fb0/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
index 0c43f62..899b8e6 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
@@ -70,6 +70,8 @@ public class ZookeeperClientTest extends GridCommonAbstractTest {
 
         List<byte[]> parts = client.splitNodeData(basePath, data, 2);
 
+        assertTrue(parts.size() > 1);
+
         ZooKeeper zk = client.zk();
 
         for (int i = 0; i < parts.size(); i++) {
@@ -77,7 +79,7 @@ public class ZookeeperClientTest extends GridCommonAbstractTest {
 
             assertTrue(part.length > 0);
 
-            String path0 = basePath + ":" + 1;
+            String path0 = basePath + ":" + i;
 
             zk.create(path0, part, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d3a80fb0/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index 6c32a4e..c2e4aba 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -63,11 +63,13 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.logger.java.JavaLogger;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.spi.discovery.DiscoverySpi;
 import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.zookeeper.ZKUtil;
 import org.apache.zookeeper.ZkTestClientCnxnSocketNIO;
 import org.apache.zookeeper.ZooKeeper;
 import org.jetbrains.annotations.Nullable;
@@ -1214,17 +1216,22 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
      */
     public void testRandomTopologyChanges() throws Exception {
         randomTopologyChanges(false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void printZkNodes() throws Exception {
+        ZookeeperClient zkClient = new ZookeeperClient(new JavaLogger(), zkCluster.getConnectString(), 10_000, null);
+
+        List<String> children = ZKUtil.listSubTreeBFS(zkClient.zk(), IGNITE_ZK_ROOT);
+
+        info("Zookeeper nodes:");
 
-//        ZookeeperClient zkClient = new ZookeeperClient(new JavaLogger(), zkCluster.getConnectString(), 10_000, null);
-//
-//        List<String> children = ZKUtil.listSubTreeBFS(zkClient.zk(), IGNITE_ZK_ROOT);
-//
-//        info("Children after test:");
-//
-//        for (String s : children)
-//            info(s);
-//
-//        zkClient.close();
+        for (String s : children)
+            info(s);
+
+        zkClient.close();
     }
 
     /**
@@ -1283,17 +1290,71 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testLargeUserAttribute() throws Exception {
+    public void testLargeUserAttribute1() throws Exception {
+        initLargeAttribute();
+
+        startGrid(0);
+
+        printZkNodes();
+
+        userAttrs = null;
+
+        startGrid(1);
+
+        waitForEventsAcks(ignite(0));
+
+        printZkNodes();
+
+        waitForTopology(2);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLargeUserAttribute2() throws Exception {
+        startGrid(0);
+
+        initLargeAttribute();
+
+        startGrid(1);
+
+        waitForEventsAcks(ignite(0));
+
+        printZkNodes();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLargeUserAttribute3() throws Exception {
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        for (int i = 0; i < 25; i++) {
+            if (rnd.nextBoolean())
+                initLargeAttribute();
+            else
+                userAttrs = null;
+
+            client = i > 5;
+
+            startGrid(i);
+        }
+
+        waitForTopology(25);
+    }
+
+    /**
+     *
+     */
+    private void initLargeAttribute() {
         userAttrs = new HashMap<>();
 
-        int[] attr = new int[1024 * 1024];
+        int[] attr = new int[1024 * 1024 + ThreadLocalRandom.current().nextInt(1024)];
 
         for (int i = 0; i < attr.length; i++)
             attr[i] = i;
 
         userAttrs.put("testAttr", attr);
-
-        startGrid(0);
     }
 
     /**

Reply via email to