Repository: ignite Updated Branches: refs/heads/ignite-zk bbd5a889f -> d3a80fb03
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d3a80fb0 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d3a80fb0 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d3a80fb0 Branch: refs/heads/ignite-zk Commit: d3a80fb03a931a05f869f0f84730a8b793fd64ee Parents: bbd5a88 Author: sboikov <[email protected]> Authored: Thu Dec 7 10:05:33 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu Dec 7 10:52:40 2017 +0300 ---------------------------------------------------------------------- .../internal/ZkDiscoveryNodeJoinEventData.java | 7 +- .../discovery/zk/internal/ZkIgnitePaths.java | 2 +- .../discovery/zk/internal/ZookeeperClient.java | 4 +- .../zk/internal/ZookeeperDiscoveryImpl.java | 78 +++++++++++------ .../zk/internal/ZookeeperClientTest.java | 4 +- .../ZookeeperDiscoverySpiBasicTest.java | 89 +++++++++++++++++--- 6 files changed, 142 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d3a80fb0/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java index fbf1fc8..5081a4d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java @@ -37,6 +37,9 @@ class ZkDiscoveryNodeJoinEventData extends ZkDiscoveryEventData { final int joinDataPartCnt; /** */ + final int dataForJoinedPartCnt; + + /** */ final UUID joinDataPrefixId; /** */ @@ -53,7 +56,8 @@ class ZkDiscoveryNodeJoinEventData extends ZkDiscoveryEventData { UUID nodeId, int joinedInternalId, UUID joinDataPrefixId, - int joinDataPartCnt) + int joinDataPartCnt, + int dataForJoinedPartCnt) { super(evtId, EventType.EVT_NODE_JOINED, topVer); @@ -61,6 +65,7 @@ class ZkDiscoveryNodeJoinEventData extends ZkDiscoveryEventData { this.joinedInternalId = joinedInternalId; this.joinDataPrefixId = joinDataPrefixId; this.joinDataPartCnt = joinDataPartCnt; + this.dataForJoinedPartCnt = dataForJoinedPartCnt; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/d3a80fb0/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java index e52127a..c9c0281 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java @@ -214,7 +214,7 @@ class ZkIgnitePaths { * @return Event zk path. */ String joinEventDataPathForJoined(long evtId) { - return evtsPath + "/joined-" + evtId; + return evtsPath + "/fj-" + evtId; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/d3a80fb0/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java index a806548..e2ec675 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java @@ -307,9 +307,11 @@ public class ZookeeperClient implements Watcher { byte[] part = new byte[partSize0]; - System.arraycopy(data, i * partCnt, part, 0, part.length); + System.arraycopy(data, i * partSize, part, 0, part.length); remaining -= partSize0; + + parts.add(part); } assert remaining == 0 : remaining; http://git-wip-us.apache.org/repos/asf/ignite/blob/d3a80fb0/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 366c162..9f405b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -585,6 +585,8 @@ public class ZookeeperDiscoveryImpl { for (int i = 0; i < partCnt; i++) { byte[] part = zkClient.getData(multipartPathName(basePath, i)); + parts.add(part); + totSize += part.length; } @@ -606,7 +608,7 @@ public class ZookeeperDiscoveryImpl { return zkClient.getData(multipartPathName(basePath, 0)); } - private void saveMultipleParts(ZookeeperClient zkClient, String basePath, List<byte[]> parts) + private int saveMultipleParts(ZookeeperClient zkClient, String basePath, List<byte[]> parts) throws ZookeeperClientFailedException, InterruptedException { assert parts.size() > 1; @@ -618,6 +620,8 @@ public class ZookeeperDiscoveryImpl { zkClient.createIfNeeded(path, part, PERSISTENT); } + + return parts.size(); } private static String multipartPathName(String basePath, int part) { @@ -628,11 +632,13 @@ public class ZookeeperDiscoveryImpl { * @param joinDataBytes Joining node data. * @throws InterruptedException If interrupted. */ - private void startJoin(byte[] joinDataBytes) throws InterruptedException { + private void startJoin(final byte[] joinDataBytes) throws InterruptedException { if (!busyLock.enterBusy()) return; try { + long startTime = System.currentTimeMillis(); + initZkNodes(); String prefix = UUID.randomUUID().toString(); @@ -675,6 +681,8 @@ public class ZookeeperDiscoveryImpl { log.info("Node started join [nodeId=" + locNode.id() + ", instanceName=" + locNode.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME) + ", joinDataSize=" + joinDataBytes.length + + ", joinDataPartCnt=" + rtState.joinDataPartCnt + + ", initTime=" + (System.currentTimeMillis() - startTime) + ", nodePath=" + rtState.locNodeZkPath + ']'); rtState.internalOrder = ZkIgnitePaths.aliveInternalId(rtState.locNodeZkPath); @@ -1191,6 +1199,8 @@ public class ZookeeperDiscoveryImpl { joinedNode.order(rtState.evtsData.topVer); joinedNode.internalId(internalId); + long evtId = rtState.evtsData.evtIdGen; + DiscoveryDataBag joiningNodeBag = new DiscoveryDataBag(nodeId); joiningNodeBag.joiningNodeData(joiningNodeData.discoveryData()); @@ -1211,13 +1221,37 @@ public class ZookeeperDiscoveryImpl { assert old == null; + long addDataStart = System.currentTimeMillis(); + + byte[] dataForJoinedBytes = marshalZip(dataForJoined); + + int overhead = 5; + + String dataPathForJoined = zkPaths.joinEventDataPathForJoined(evtId); + + int dataForJoinedPartCnt = 1; + + if (rtState.zkClient.needSplitNodeData(dataPathForJoined, dataForJoinedBytes, overhead)) { + dataForJoinedPartCnt = saveMultipleParts(rtState.zkClient, + dataPathForJoined, + rtState.zkClient.splitNodeData(dataPathForJoined, dataForJoinedBytes, overhead)); + } + else { + rtState.zkClient.createIfNeeded(multipartPathName(dataPathForJoined, 0), + dataForJoinedBytes, + PERSISTENT); + } + + long addDataTime = System.currentTimeMillis() - addDataStart; + ZkDiscoveryNodeJoinEventData evtData = new ZkDiscoveryNodeJoinEventData( - rtState.evtsData.evtIdGen, + evtId, rtState.evtsData.topVer, joinedNode.id(), joinedNode.internalId(), prefixId, - joiningNodeData.partCount()); + joiningNodeData.partCount(), + dataForJoinedPartCnt); evtData.joiningNodeData = joiningNodeData; @@ -1225,18 +1259,11 @@ public class ZookeeperDiscoveryImpl { evtData.addRemainingAck(joinedNode); // Topology for joined node does not contain joined node. - byte[] dataForJoinedBytes = marshalZip(dataForJoined); - - long start = System.currentTimeMillis(); - - rtState.zkClient.createIfNeeded(zkPaths.joinEventDataPathForJoined(evtData.eventId()), dataForJoinedBytes, PERSISTENT); - - long time = System.currentTimeMillis() - start; - if (log.isInfoEnabled()) { log.info("Generated NODE_JOINED event [evt=" + evtData + ", dataForJoinedSize=" + dataForJoinedBytes.length + - ", addDataTime=" + time + ']'); + ", dataForJoinedPartCnt=" + dataForJoinedPartCnt + + ", addDataTime=" + addDataTime + ']'); } } @@ -1626,7 +1653,9 @@ public class ZookeeperDiscoveryImpl { String path = zkPaths.joinEventDataPathForJoined(evtData.eventId()); - ZkJoinEventDataForJoined dataForJoined = unmarshalZip(rtState.zkClient.getData(path)); + byte[] dataForJoinedBytes = readMultipleParts(rtState.zkClient, path, evtData.dataForJoinedPartCnt); + + ZkJoinEventDataForJoined dataForJoined = unmarshalZip(dataForJoinedBytes); rtState.gridStartTime = evtsData.gridStartTime; @@ -1678,10 +1707,7 @@ public class ZookeeperDiscoveryImpl { rtState.joined = true; - if (log.isDebugEnabled()) - log.debug("Delete data for joined: " + path); - - rtState.zkClient.deleteIfExistsAsync(path); + deleteDataForJoined(evtData); } /** @@ -1973,12 +1999,7 @@ public class ZookeeperDiscoveryImpl { deleteJoiningNodeData(evtData.nodeId, evtData.joinDataPrefixId, evtData.joinDataPartCnt); - String dataForJoinedPath = zkPaths.joinEventDataPathForJoined(evtData.eventId()); - - if (log.isDebugEnabled()) - log.debug("Delete data for joined node [path=" + dataForJoinedPath + ']'); - - rtState.zkClient.deleteIfExistsAsync(dataForJoinedPath); + deleteDataForJoined(evtData); } private void deleteJoiningNodeData(UUID nodeId, UUID joinDataPrefixId, int partCnt) throws Exception { @@ -1993,6 +2014,15 @@ public class ZookeeperDiscoveryImpl { deleteMultiplePartsAsync(rtState.zkClient, evtDataPath + ":", partCnt); } + private void deleteDataForJoined(ZkDiscoveryNodeJoinEventData evtData) { + String dataForJoinedPath = zkPaths.joinEventDataPathForJoined(evtData.eventId()); + + if (log.isDebugEnabled()) + log.debug("Delete data for joined node [path=" + dataForJoinedPath + ']'); + + deleteMultiplePartsAsync(rtState.zkClient, dataForJoinedPath, evtData.dataForJoinedPartCnt); + } + /** * @param evtData Event data. * @throws Exception If failed. http://git-wip-us.apache.org/repos/asf/ignite/blob/d3a80fb0/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java index 0c43f62..899b8e6 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java @@ -70,6 +70,8 @@ public class ZookeeperClientTest extends GridCommonAbstractTest { List<byte[]> parts = client.splitNodeData(basePath, data, 2); + assertTrue(parts.size() > 1); + ZooKeeper zk = client.zk(); for (int i = 0; i < parts.size(); i++) { @@ -77,7 +79,7 @@ public class ZookeeperClientTest extends GridCommonAbstractTest { assertTrue(part.length > 0); - String path0 = basePath + ":" + 1; + String path0 = basePath + ":" + i; zk.create(path0, part, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } http://git-wip-us.apache.org/repos/asf/ignite/blob/d3a80fb0/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java index 6c32a4e..c2e4aba 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java @@ -63,11 +63,13 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.logger.java.JavaLogger; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.zookeeper.ZKUtil; import org.apache.zookeeper.ZkTestClientCnxnSocketNIO; import org.apache.zookeeper.ZooKeeper; import org.jetbrains.annotations.Nullable; @@ -1214,17 +1216,22 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { */ public void testRandomTopologyChanges() throws Exception { randomTopologyChanges(false, false); + } + + /** + * @throws Exception If failed. + */ + private void printZkNodes() throws Exception { + ZookeeperClient zkClient = new ZookeeperClient(new JavaLogger(), zkCluster.getConnectString(), 10_000, null); + + List<String> children = ZKUtil.listSubTreeBFS(zkClient.zk(), IGNITE_ZK_ROOT); + + info("Zookeeper nodes:"); -// ZookeeperClient zkClient = new ZookeeperClient(new JavaLogger(), zkCluster.getConnectString(), 10_000, null); -// -// List<String> children = ZKUtil.listSubTreeBFS(zkClient.zk(), IGNITE_ZK_ROOT); -// -// info("Children after test:"); -// -// for (String s : children) -// info(s); -// -// zkClient.close(); + for (String s : children) + info(s); + + zkClient.close(); } /** @@ -1283,17 +1290,71 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void testLargeUserAttribute() throws Exception { + public void testLargeUserAttribute1() throws Exception { + initLargeAttribute(); + + startGrid(0); + + printZkNodes(); + + userAttrs = null; + + startGrid(1); + + waitForEventsAcks(ignite(0)); + + printZkNodes(); + + waitForTopology(2); + } + + /** + * @throws Exception If failed. + */ + public void testLargeUserAttribute2() throws Exception { + startGrid(0); + + initLargeAttribute(); + + startGrid(1); + + waitForEventsAcks(ignite(0)); + + printZkNodes(); + } + + /** + * @throws Exception If failed. + */ + public void testLargeUserAttribute3() throws Exception { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < 25; i++) { + if (rnd.nextBoolean()) + initLargeAttribute(); + else + userAttrs = null; + + client = i > 5; + + startGrid(i); + } + + waitForTopology(25); + } + + /** + * + */ + private void initLargeAttribute() { userAttrs = new HashMap<>(); - int[] attr = new int[1024 * 1024]; + int[] attr = new int[1024 * 1024 + ThreadLocalRandom.current().nextInt(1024)]; for (int i = 0; i < attr.length; i++) attr[i] = i; userAttrs.put("testAttr", attr); - - startGrid(0); } /**
