zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bbd5a889 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bbd5a889 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bbd5a889 Branch: refs/heads/ignite-zk Commit: bbd5a889fc980e36c8c500ef221a551e451ad854 Parents: 961167a Author: sboikov <[email protected]> Authored: Tue Dec 5 14:57:04 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue Dec 5 17:57:35 2017 +0300 ---------------------------------------------------------------------- .../internal/ZkDiscoveryNodeJoinEventData.java | 16 +- .../discovery/zk/internal/ZkIgnitePaths.java | 37 +-- .../zk/internal/ZkInternalJoinErrorMessage.java | 3 + .../zk/internal/ZkJoiningNodeData.java | 15 +- .../discovery/zk/internal/ZkRuntimeState.java | 3 + .../discovery/zk/internal/ZookeeperClient.java | 45 +++ .../zk/internal/ZookeeperDiscoveryImpl.java | 289 ++++++++++++++----- .../zk/internal/ZookeeperClientTest.java | 58 ++-- 8 files changed, 328 insertions(+), 138 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd5a889/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java index df4c137..fbf1fc8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java @@ -34,6 +34,12 @@ class ZkDiscoveryNodeJoinEventData extends ZkDiscoveryEventData { final UUID nodeId; /** */ + final int joinDataPartCnt; + + /** */ + final UUID joinDataPrefixId; + + /** */ transient ZkJoiningNodeData joiningNodeData; /** @@ -42,11 +48,19 @@ class ZkDiscoveryNodeJoinEventData extends ZkDiscoveryEventData { * @param nodeId Joined node ID. * @param joinedInternalId Joined node internal ID. */ - ZkDiscoveryNodeJoinEventData(long evtId, long topVer, UUID nodeId, int joinedInternalId) { + ZkDiscoveryNodeJoinEventData(long evtId, + long topVer, + UUID nodeId, + int joinedInternalId, + UUID joinDataPrefixId, + int joinDataPartCnt) + { super(evtId, EventType.EVT_NODE_JOINED, topVer); this.nodeId = nodeId; this.joinedInternalId = joinedInternalId; + this.joinDataPrefixId = joinDataPrefixId; + this.joinDataPartCnt = joinDataPartCnt; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd5a889/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java index 2478979..e52127a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java @@ -151,14 +151,8 @@ class ZkIgnitePaths { return clusterDir + "/" + path; } - String joiningNodeDataPath(UUID nodeId, String aliveNodePath) { - int joinSeq = ZkIgnitePaths.aliveJoinDataSequence(aliveNodePath); - - return joinDataDir + '/' + - ZkIgnitePaths.aliveNodePrefixId(aliveNodePath) + ":" + - nodeId.toString() + - "|" + - String.format("%010d", joinSeq); + String joiningNodeDataPath(UUID nodeId, UUID prefixId) { + return joinDataDir + '/' + prefixId + ":" + nodeId.toString(); } /** @@ -175,8 +169,8 @@ class ZkIgnitePaths { * @param path Alive node zk path. * @return Node ID. */ - static String aliveNodePrefixId(String path) { - return path.substring(0, ZkIgnitePaths.UUID_LEN); + static UUID aliveNodePrefixId(String path) { + return UUID.fromString(path.substring(0, ZkIgnitePaths.UUID_LEN)); } /** @@ -184,7 +178,7 @@ class ZkIgnitePaths { * @return Node ID. */ static UUID aliveNodeId(String path) { - // <uuid prefix>:<node id>|<join data seq>|<alive seq> + // <uuid prefix>:<node id>|<alive seq> int startIdx = ZkIgnitePaths.UUID_LEN + 1; String idStr = path.substring(startIdx, startIdx + ZkIgnitePaths.UUID_LEN); @@ -193,17 +187,6 @@ class ZkIgnitePaths { } /** - * @param path Alive node zk path. - * @return Joined node sequence. - */ - private static int aliveJoinDataSequence(String path) { - int idx2 = path.lastIndexOf('|'); - int idx1 = path.lastIndexOf('|', idx2 - 1); - - return Integer.parseInt(path.substring(idx1 + 1, idx2)); - } - - /** * @param path Event zk path. * @return Event sequence number. */ @@ -230,18 +213,14 @@ class ZkIgnitePaths { * @param evtId Event ID. * @return Event zk path. */ - String joinEventDataPath(long evtId) { - return evtsPath + "/" + evtId; + String joinEventDataPathForJoined(long evtId) { + return evtsPath + "/joined-" + evtId; } /** * @param evtId Event ID. - * @return Event zk path. + * @return Path for custom event ack. */ - String joinEventDataPathForJoined(long evtId) { - return evtsPath + "/joined-" + evtId; - } - String ackEventDataPath(long evtId) { return customEventDataPath(true, String.valueOf(evtId)); } http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd5a889/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java index e724673..6040c20 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java @@ -25,6 +25,9 @@ class ZkInternalJoinErrorMessage implements ZkInternalMessage { private static final long serialVersionUID = 0L; /** */ + transient boolean notifyNode = true; + + /** */ final int nodeInternalId; /** */ http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd5a889/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java index 6733ab6..284cbff 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java @@ -30,12 +30,19 @@ class ZkJoiningNodeData implements Serializable { private static final long serialVersionUID = 0L; /** */ + private int partCnt; + + /** */ @GridToStringInclude - private final ZookeeperClusterNode node; + private ZookeeperClusterNode node; /** */ @GridToStringInclude - private final Map<Integer, Serializable> discoData; + private Map<Integer, Serializable> discoData; + + ZkJoiningNodeData(int partCnt) { + this.partCnt = partCnt; + } /** * @param node Node. @@ -49,6 +56,10 @@ class ZkJoiningNodeData implements Serializable { this.discoData = discoData; } + int partCount() { + return partCnt; + } + /** * @return Node. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd5a889/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java index 660dc42..4653109 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java @@ -33,6 +33,9 @@ class ZkRuntimeState { int internalOrder; /** */ + int joinDataPartCnt; + + /** */ IgniteSpiTimeoutObject joinTimeoutObj; /** */ http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd5a889/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java index bc024f1..a806548 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java @@ -276,6 +276,51 @@ public class ZookeeperClient implements Watcher { } + /** */ + private static final int MAX_REQ_SIZE = 1048528; + + /** + * @param path Path. + * @param data Data. + * @return {@code True} + */ + boolean needSplitNodeData(String path, byte[] data, int overhead) { + return requestOverhead(path) + data.length + overhead > MAX_REQ_SIZE; + } + + List<byte[]> splitNodeData(String path, byte[] data, int overhead) { + int partSize = MAX_REQ_SIZE - requestOverhead(path) - overhead; + + int partCnt = data.length / partSize; + + if (data.length % partSize != 0) + partCnt++; + + assert partCnt > 1 : "Do not need split"; + + List<byte[]> parts = new ArrayList<>(partCnt); + + int remaining = data.length; + + for (int i = 0; i < partCnt; i++) { + int partSize0 = Math.min(remaining, partSize); + + byte[] part = new byte[partSize0]; + + System.arraycopy(data, i * partCnt, part, 0, part.length); + + remaining -= partSize0; + } + + assert remaining == 0 : remaining; + + return parts; + } + + private int requestOverhead(String path) { + return path.length(); + } + /** * @param path Path. * @param data Data. http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd5a889/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index e246a35..366c162 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -564,6 +564,66 @@ public class ZookeeperDiscoveryImpl { } } + private void deleteMultiplePartsAsync(ZookeeperClient zkClient, String basePath, int partCnt) { + for (int i = 0; i < partCnt; i++) { + String path = multipartPathName(basePath, i); + + zkClient.deleteIfExistsAsync(path); + } + + } + + private byte[] readMultipleParts(ZookeeperClient zkClient, String basePath, int partCnt) + throws Exception { + assert partCnt >= 1; + + if (partCnt > 1) { + List<byte[]> parts = new ArrayList<>(partCnt); + + int totSize = 0; + + for (int i = 0; i < partCnt; i++) { + byte[] part = zkClient.getData(multipartPathName(basePath, i)); + + totSize += part.length; + } + + byte[] res = new byte[totSize]; + + int pos = 0; + + for (int i = 0; i < partCnt; i++) { + byte[] part = parts.get(i); + + System.arraycopy(part, 0, res, pos, part.length); + + pos += part.length; + } + + return res; + } + else + return zkClient.getData(multipartPathName(basePath, 0)); + } + + private void saveMultipleParts(ZookeeperClient zkClient, String basePath, List<byte[]> parts) + throws ZookeeperClientFailedException, InterruptedException + { + assert parts.size() > 1; + + for (int i = 0; i < parts.size(); i++) { + byte[] part = parts.get(i); + + String path = multipartPathName(basePath, i); + + zkClient.createIfNeeded(path, part, PERSISTENT); + } + } + + private static String multipartPathName(String basePath, int part) { + return basePath + String.format("%04d", part); + } + /** * @param joinDataBytes Joining node data. * @throws InterruptedException If interrupted. @@ -577,23 +637,38 @@ public class ZookeeperDiscoveryImpl { String prefix = UUID.randomUUID().toString(); - // TODO ZK: handle max size. - final ZkRuntimeState rtState = this.rtState; - String joinDataPath = rtState.zkClient.createSequential(prefix, - zkPaths.joinDataDir, - prefix + ":" + locNode.id() + "|", - joinDataBytes, - EPHEMERAL_SEQUENTIAL); + ZookeeperClient zkClient = rtState.zkClient; + + final int OVERHEAD = 5; + + // TODO ZK: need clean up join data if failed before was able to create alive node. + String joinDataPath = zkPaths.joinDataDir + "/" + prefix + ":" + locNode.id(); - // TODO ZK: no need to use sequential - int seqNum = Integer.parseInt(joinDataPath.substring(joinDataPath.lastIndexOf('|') + 1)); + if (zkClient.needSplitNodeData(joinDataPath, joinDataBytes, OVERHEAD)) { + List<byte[]> parts = zkClient.splitNodeData(joinDataPath, joinDataBytes, OVERHEAD); + + rtState.joinDataPartCnt = parts.size(); + + saveMultipleParts(zkClient, joinDataPath + ":", parts); + + joinDataPath = zkClient.createIfNeeded( + joinDataPath, + marshalZip(new ZkJoiningNodeData(parts.size())), + PERSISTENT); + } + else { + joinDataPath = zkClient.createIfNeeded( + joinDataPath, + joinDataBytes, + PERSISTENT); + } - rtState.locNodeZkPath = rtState.zkClient.createSequential( + rtState.locNodeZkPath = zkClient.createSequential( prefix, zkPaths.aliveNodesDir, - prefix + ":" + locNode.id() + "|" + seqNum + "|", + prefix + ":" + locNode.id() + "|", null, EPHEMERAL_SEQUENTIAL); @@ -605,21 +680,21 @@ public class ZookeeperDiscoveryImpl { rtState.internalOrder = ZkIgnitePaths.aliveInternalId(rtState.locNodeZkPath); /* - If node can not join due to some validation error this error is reported in join data, - As a minor optimization do not start watch this immediately, but only if do not receive - join event after timeout. + If node can not join due to validation error this error is reported in join data, + As a minor optimization do not start watch join data immediately, but only if do not receive + join event after some timeout. */ rtState.joinTimeoutObj = new CheckJoinStateTimeoutObject( - joinDataPath, + multipartPathName(joinDataPath, 0), rtState); spi.getSpiContext().addTimeoutObject(rtState.joinTimeoutObj); - rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new CheckCoordinatorCallback()); + zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new CheckCoordinatorCallback()); - rtState.zkClient.getDataAsync(zkPaths.evtsPath, watcher, dataCallback); + zkClient.getDataAsync(zkPaths.evtsPath, watcher, dataCallback); } - catch (ZookeeperClientFailedException e) { + catch (IgniteCheckedException | ZookeeperClientFailedException e) { throw new IgniteSpiException("Failed to initialize Zookeeper nodes", e); } finally { @@ -889,80 +964,135 @@ public class ZookeeperDiscoveryImpl { handleProcessedEventsOnNodesFail(failedNodes); } - /** - * @param curTop Current nodes. - * @param internalId Joined node internal ID. - * @param aliveNodePath Joined node path. - * @throws Exception If failed. - */ - private boolean processJoinOnCoordinator(TreeMap<Long, ZookeeperClusterNode> curTop, - int internalId, - String aliveNodePath) throws Exception { - UUID nodeId = ZkIgnitePaths.aliveNodeId(aliveNodePath); + private ZkJoiningNodeData unmarshalJoinData(UUID nodeId, UUID prefixId) throws Exception { + String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, prefixId); - String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, aliveNodePath); - byte[] joinData; + byte[] joinData = rtState.zkClient.getData(joinDataPath); - try { - joinData = rtState.zkClient.getData(joinDataPath); - } - catch (KeeperException.NoNodeException e) { - U.warn(log, "Failed to read joining node data, node left before join process finished: " + nodeId); + Object dataObj = unmarshalZip(joinData); - return false; + if (!(dataObj instanceof ZkJoiningNodeData)) + throw new Exception("Invalid joined node data: " + dataObj); + + ZkJoiningNodeData joiningNodeData = (ZkJoiningNodeData)dataObj; + + if (joiningNodeData.partCount() > 1) { + joinData = readMultipleParts(rtState.zkClient, joinDataPath + ":", joiningNodeData.partCount()); + + joiningNodeData = unmarshalZip(joinData); } - String err = null; + return joiningNodeData; + } + + /** + * @param nodeId + * @param aliveNodePath + * @return + * @throws Exception If failed. + */ + private Object unmarshalJoinDataOnCoordinator(UUID nodeId, UUID prefixId, String aliveNodePath) throws Exception { + String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, prefixId); + + byte[] joinData = rtState.zkClient.getData(joinDataPath); - Object dataObj = null; + Object dataObj; try { dataObj = unmarshalZip(joinData); - if (dataObj instanceof ZkInternalJoinErrorMessage) { - if (log.isInfoEnabled()) - log.info("Ignore join data, node was failed by previous coordinator: " + aliveNodePath); - - zkClient().deleteIfExists(zkPaths.aliveNodesDir + "/" + aliveNodePath, -1); - - return false; - } + if (dataObj instanceof ZkInternalJoinErrorMessage) + return dataObj; } catch (Exception e) { U.error(log, "Failed to unmarshal joining node data [nodePath=" + aliveNodePath + "']", e); - err = "Failed to unmarshal join data: " + e; + return new ZkInternalJoinErrorMessage(ZkIgnitePaths.aliveInternalId(aliveNodePath), + "Failed to unmarshal join data: " + e); } assert dataObj instanceof ZkJoiningNodeData : dataObj; ZkJoiningNodeData joiningNodeData = (ZkJoiningNodeData)dataObj; - if (err == null) - err = validateJoiningNode(joiningNodeData.node()); + if (joiningNodeData.partCount() > 1) { + joinData = readMultipleParts(rtState.zkClient, joinDataPath + ":", joiningNodeData.partCount()); + + try { + joiningNodeData = unmarshalZip(joinData); + } + catch (Exception e) { + U.error(log, "Failed to unmarshal joining node data [nodePath=" + aliveNodePath + "']", e); + + return new ZkInternalJoinErrorMessage(ZkIgnitePaths.aliveInternalId(aliveNodePath), + "Failed to unmarshal join data: " + e); + } + } + + assert joiningNodeData.node() != null : joiningNodeData; + + return joiningNodeData; + } + + /** + * @param curTop Current nodes. + * @param internalId Joined node internal ID. + * @param aliveNodePath Joined node path. + * @throws Exception If failed. + */ + private boolean processJoinOnCoordinator( + TreeMap<Long, ZookeeperClusterNode> curTop, + int internalId, + String aliveNodePath) + throws Exception + { + UUID nodeId = ZkIgnitePaths.aliveNodeId(aliveNodePath); + UUID prefixId = ZkIgnitePaths.aliveNodePrefixId(aliveNodePath); + + Object data = unmarshalJoinDataOnCoordinator(nodeId, prefixId, aliveNodePath); - if (err == null) { + ZkInternalJoinErrorMessage joinErr = null; + ZkJoiningNodeData joiningNodeData = null; + + if (data instanceof ZkJoiningNodeData) { + joiningNodeData = (ZkJoiningNodeData)data; + + String err = validateJoiningNode(joiningNodeData.node()); + + if (err != null) + joinErr = new ZkInternalJoinErrorMessage(ZkIgnitePaths.aliveInternalId(aliveNodePath), err); + } + else { + assert data instanceof ZkInternalJoinErrorMessage : data; + + joinErr = (ZkInternalJoinErrorMessage)data; + } + + if (joinErr == null) { ZookeeperClusterNode joinedNode = joiningNodeData.node(); assert nodeId.equals(joinedNode.id()) : joiningNodeData.node(); - generateNodeJoin(curTop, joinData, joiningNodeData, internalId); + generateNodeJoin(curTop, joiningNodeData, internalId, prefixId); watchAliveNodeData(aliveNodePath); return true; } else { - ZkInternalJoinErrorMessage msg = new ZkInternalJoinErrorMessage(internalId, err); + if (joinErr.notifyNode) { + String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, prefixId); - try { - zkClient().setData(joinDataPath, marshalZip(msg), -1); - } - catch (KeeperException.NoNodeException e) { - // Ignore, node already failed. + zkClient().setData(joinDataPath, marshalZip(joinErr), -1); + + zkClient().deleteIfExists(zkPaths.aliveNodesDir + "/" + aliveNodePath, -1); } + else { + if (log.isInfoEnabled()) + log.info("Ignore join data, node was failed by previous coordinator: " + aliveNodePath); - zkClient().deleteIfExists(zkPaths.aliveNodesDir + "/" + aliveNodePath, -1); + zkClient().deleteIfExists(zkPaths.aliveNodesDir + "/" + aliveNodePath, -1); + } return false; } @@ -1046,9 +1176,9 @@ public class ZookeeperDiscoveryImpl { */ private void generateNodeJoin( TreeMap<Long, ZookeeperClusterNode> curTop, - byte[] joinData, ZkJoiningNodeData joiningNodeData, - int internalId) + int internalId, + UUID prefixId) throws Exception { ZookeeperClusterNode joinedNode = joiningNodeData.node(); @@ -1085,7 +1215,9 @@ public class ZookeeperDiscoveryImpl { rtState.evtsData.evtIdGen, rtState.evtsData.topVer, joinedNode.id(), - joinedNode.internalId()); + joinedNode.internalId(), + prefixId, + joiningNodeData.partCount()); evtData.joiningNodeData = joiningNodeData; @@ -1097,14 +1229,12 @@ public class ZookeeperDiscoveryImpl { long start = System.currentTimeMillis(); - rtState.zkClient.createIfNeeded(zkPaths.joinEventDataPath(evtData.eventId()), joinData, PERSISTENT); rtState.zkClient.createIfNeeded(zkPaths.joinEventDataPathForJoined(evtData.eventId()), dataForJoinedBytes, PERSISTENT); long time = System.currentTimeMillis() - start; if (log.isInfoEnabled()) { log.info("Generated NODE_JOINED event [evt=" + evtData + - ", joinedDataSize=" + joinData.length + ", dataForJoinedSize=" + dataForJoinedBytes.length + ", addDataTime=" + time + ']'); } @@ -1131,14 +1261,11 @@ public class ZookeeperDiscoveryImpl { rtState.top.addNode(locNode); - String path = rtState.locNodeZkPath.substring(rtState.locNodeZkPath.lastIndexOf('/') + 1); - - String joinDataPath = zkPaths.joiningNodeDataPath(locNode.id(), path); - - if (log.isDebugEnabled()) - log.debug("Delete join data: " + joinDataPath); + String locAlivePath = rtState.locNodeZkPath.substring(rtState.locNodeZkPath.lastIndexOf('/') + 1); - rtState.zkClient.deleteIfExistsAsync(joinDataPath); + deleteJoiningNodeData(locNode.id(), + ZkIgnitePaths.aliveNodePrefixId(locAlivePath), + rtState.joinDataPartCnt); final List<ClusterNode> topSnapshot = Collections.singletonList((ClusterNode)locNode); @@ -1398,9 +1525,7 @@ public class ZookeeperDiscoveryImpl { joiningData = evtData0.joiningNodeData; } else { - String path = zkPaths.joinEventDataPath(evtData.eventId()); - - joiningData = unmarshalZip(rtState.zkClient.getData(path)); + joiningData = unmarshalJoinData(evtData0.nodeId, evtData0.joinDataPrefixId); DiscoveryDataBag dataBag = new DiscoveryDataBag(evtData0.nodeId); @@ -1846,16 +1971,28 @@ public class ZookeeperDiscoveryImpl { if (log.isDebugEnabled()) log.debug("All nodes processed node join [evtData=" + evtData + ']'); - String evtDataPath = zkPaths.joinEventDataPath(evtData.eventId()); + deleteJoiningNodeData(evtData.nodeId, evtData.joinDataPrefixId, evtData.joinDataPartCnt); + String dataForJoinedPath = zkPaths.joinEventDataPathForJoined(evtData.eventId()); if (log.isDebugEnabled()) - log.debug("Delete processed event data [path1=" + evtDataPath + ", path2=" + dataForJoinedPath + ']'); + log.debug("Delete data for joined node [path=" + dataForJoinedPath + ']'); - rtState.zkClient.deleteIfExistsAsync(evtDataPath); rtState.zkClient.deleteIfExistsAsync(dataForJoinedPath); } + private void deleteJoiningNodeData(UUID nodeId, UUID joinDataPrefixId, int partCnt) throws Exception { + String evtDataPath = zkPaths.joiningNodeDataPath(nodeId, joinDataPrefixId); + + if (log.isDebugEnabled()) + log.debug("Delete joining node data [path=" + evtDataPath + ']'); + + rtState.zkClient.deleteIfExistsAsync(evtDataPath); + + if (partCnt > 1) + deleteMultiplePartsAsync(rtState.zkClient, evtDataPath + ":", partCnt); + } + /** * @param evtData Event data. * @throws Exception If failed. http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd5a889/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java index ec495cf..0c43f62 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java @@ -54,36 +54,34 @@ public class ZookeeperClientTest extends GridCommonAbstractTest { super.afterTest(); } -// /** -// * @throws Exception If failed. -// */ -// public void testSaveLargeValue() throws Exception { -// startZK(1); -// -// final ZookeeperClient client = new ZookeeperClient(log, zkCluster.getConnectString(), 3000, null); -// -// ZooKeeper zk = client.zk(); -// -// int s = 1048526 + 1; -// // 1048517 11 1048528 -// // 1048519 9 1048528 -// // 1048520 8 1048528 -// -// String path = "/aaaaaaa"; -// -// while (true) { -// try { -// zk.create(path, new byte[s], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); -// -// info("Created: " + s + " " + path.length() + " " + (s + path.length())); -// -// break; -// } -// catch (KeeperException.ConnectionLossException e) { -// s -= 1; -// } -// } -// } + /** + * @throws Exception If failed. + */ + public void testSaveLargeValue() throws Exception { + startZK(1); + + final ZookeeperClient client = new ZookeeperClient(log, zkCluster.getConnectString(), 3000, null); + + byte[] data = new byte[1024 * 1024]; + + String basePath = "/ignite"; + + assertTrue(client.needSplitNodeData(basePath, data, 2)); + + List<byte[]> parts = client.splitNodeData(basePath, data, 2); + + ZooKeeper zk = client.zk(); + + for (int i = 0; i < parts.size(); i++) { + byte[] part = parts.get(i); + + assertTrue(part.length > 0); + + String path0 = basePath + ":" + 1; + + zk.create(path0, part, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + } /** * @throws Exception If failed.
