zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/26ffa0db Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/26ffa0db Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/26ffa0db Branch: refs/heads/ignite-zk Commit: 26ffa0dbbfe2276f6070bb22844cb6102738ee23 Parents: 0b78f31 Author: sboikov <[email protected]> Authored: Fri Dec 29 12:27:01 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri Dec 29 12:27:01 2017 +0300 ---------------------------------------------------------------------- .../discovery/zk/internal/ZkAliveNodeData.java | 3 + .../discovery/zk/internal/ZkIgnitePaths.java | 2 +- .../discovery/zk/internal/ZkRuntimeState.java | 19 +- .../zk/internal/ZookeeperDiscoveryImpl.java | 154 +- .../ZookeeperDiscoverySpiBasicTest.java | 4247 ----------------- .../zk/internal/ZookeeperDiscoverySpiTest.java | 4311 ++++++++++++++++++ .../testframework/junits/GridAbstractTest.java | 6 +- 7 files changed, 4459 insertions(+), 4283 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/26ffa0db/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java index 45f453f..9574325 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java @@ -30,6 +30,9 @@ public class ZkAliveNodeData implements Serializable { /** */ long lastProcEvt = -1; + /** */ + transient boolean needUpdate; + /** {@inheritDoc} */ @Override public String toString() { return S.toString(ZkAliveNodeData.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/26ffa0db/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java index 4ba6de2..44b247c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java @@ -27,7 +27,7 @@ class ZkIgnitePaths { static final String PATH_SEPARATOR = "/"; /** */ - static final byte CLIENT_NODE_FLAG_MASK = 0x01; + private static final byte CLIENT_NODE_FLAG_MASK = 0x01; /** */ private static final int UUID_LEN = 36; http://git-wip-us.apache.org/repos/asf/ignite/blob/26ffa0db/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java index e61e2b2..774c2a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java @@ -49,12 +49,6 @@ class ZkRuntimeState { int joinDataPartCnt; /** */ - ZkTimeoutObject joinErrTimeoutObj; - - /** */ - ZkTimeoutObject joinTimeoutObj; - - /** */ long gridStartTime; /** */ @@ -70,7 +64,7 @@ class ZkRuntimeState { String locNodeZkPath; /** */ - ZkAliveNodeData locNodeInfo = new ZkAliveNodeData(); + final ZkAliveNodeData locNodeInfo = new ZkAliveNodeData(); /** */ int procEvtCnt; @@ -81,6 +75,17 @@ class ZkRuntimeState { /** */ List<ClusterNode> commErrProcNodes; + /** Timeout callback registering watcher for join error + * (set this watcher after timeout as a minor optimization). + */ + ZkTimeoutObject joinErrTo; + + /** Timeout callback set to wait for join timeout. */ + ZkTimeoutObject joinTo; + + /** Timeout callback to update processed events counter. */ + ZkTimeoutObject procEvtsUpdateTo; + /** * @param prevJoined {@code True} if joined topology before reconnect attempt. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/26ffa0db/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index ad7da00..8f717c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -106,6 +106,9 @@ public class ZookeeperDiscoveryImpl { static final String IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD = "IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD"; /** */ + static final String IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_TIMEOUT = "IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_TIMEOUT"; + + /** */ static final String IGNITE_ZOOKEEPER_DISCOVERY_SPI_MAX_EVTS = "IGNITE_ZOOKEEPER_DISCOVERY_SPI_MAX_EVTS"; /** */ @@ -991,10 +994,10 @@ public class ZookeeperDiscoveryImpl { */ CheckJoinErrorWatcher joinErrorWatcher = new CheckJoinErrorWatcher(5000, joinDataPath, rtState); - rtState.joinErrTimeoutObj = joinErrorWatcher.timeoutObj; + rtState.joinErrTo = joinErrorWatcher.timeoutObj; if (locNode.isClient() && spi.getJoinTimeout() > 0) { - ZkTimeoutObject joinTimeoutObj = prevState != null ? prevState.joinTimeoutObj : null; + ZkTimeoutObject joinTimeoutObj = prevState != null ? prevState.joinTo : null; if (joinTimeoutObj == null) { joinTimeoutObj = new JoinTimeoutObject(spi.getJoinTimeout()); @@ -1002,7 +1005,7 @@ public class ZookeeperDiscoveryImpl { spi.getSpiContext().addTimeoutObject(joinTimeoutObj); } - rtState.joinTimeoutObj = joinTimeoutObj; + rtState.joinTo = joinTimeoutObj; } if (!locNode.isClient()) @@ -1010,7 +1013,7 @@ public class ZookeeperDiscoveryImpl { zkClient.getDataAsync(zkPaths.evtsPath, rtState.watcher, rtState.watcher); - spi.getSpiContext().addTimeoutObject(rtState.joinErrTimeoutObj); + spi.getSpiContext().addTimeoutObject(rtState.joinErrTo); } catch (IgniteCheckedException | ZookeeperClientFailedException e) { throw new IgniteSpiException("Failed to initialize Zookeeper nodes", e); @@ -1031,7 +1034,7 @@ public class ZookeeperDiscoveryImpl { try { SecurityContext subj = nodeAuth.authenticateNode(locNode, locCred); - // Note: exception message test is checked in tests. + // Note: exception message is checked in tests. if (subj == null) throw new IgniteSpiException("Authentication failed for local node."); @@ -1109,6 +1112,33 @@ public class ZookeeperDiscoveryImpl { /** * */ + private class UpdateProcessedEventsTimeoutObject extends ZkTimeoutObject { + /** */ + private final ZkRuntimeState rtState; + + /** + * @param rtState Runtime state. + * @param timeout Timeout. + */ + UpdateProcessedEventsTimeoutObject(ZkRuntimeState rtState, long timeout) { + super(timeout); + + this.rtState = rtState; + } + + /** {@inheritDoc} */ + @Override public void onTimeout() { + runInWorkerThread(new ZkRunnable(rtState, ZookeeperDiscoveryImpl.this) { + @Override protected void run0() throws Exception { + updateProcessedEventsOnTimeout(rtState, UpdateProcessedEventsTimeoutObject.this); + } + }); + } + } + + /** + * + */ private class JoinTimeoutObject extends ZkTimeoutObject { /** * @param timeout Timeout. @@ -1138,7 +1168,7 @@ public class ZookeeperDiscoveryImpl { "are no alive server nodes (consider increasing 'joinTimeout' configuration property) [" + "joinTimeout=" + spi.getJoinTimeout() + ']'); - // Note: exception message test is checked in tests. + // Note: exception message is checked in tests. onSegmented(new IgniteSpiException("Failed to connect to cluster within configured timeout")); } }); @@ -1801,6 +1831,7 @@ public class ZookeeperDiscoveryImpl { U.error(log, "Failed to include node in cluster, node with the same ID already exists [joiningNode=" + node + ", existingNode=" + node0 + ']'); + // Note: exception message is checked in tests. return new ZkNodeValidateResult("Node with the same ID already exists: " + node0); } @@ -2055,7 +2086,7 @@ public class ZookeeperDiscoveryImpl { assert prevEvts == null || prevEvts.maxInternalOrder < locInternalId; - spi.getSpiContext().removeTimeoutObject(rtState.joinErrTimeoutObj); + spi.getSpiContext().removeTimeoutObject(rtState.joinErrTo); cleanupPreviousClusterData(); @@ -2423,6 +2454,7 @@ public class ZookeeperDiscoveryImpl { ZookeeperClient zkClient = rtState.zkClient; + boolean evtProcessed = false; boolean updateNodeInfo = false; for (ZkDiscoveryEventData evtData : evts.tailMap(rtState.locNodeInfo.lastProcEvt, false).values()) { @@ -2441,6 +2473,8 @@ public class ZookeeperDiscoveryImpl { assert locNode.id().equals(joinedId); processLocalJoin(evtsData, evtData0); + + evtProcessed = true; } } else { @@ -2533,6 +2567,8 @@ public class ZookeeperDiscoveryImpl { default: assert false : "Invalid event: " + evtData; } + + evtProcessed = true; } if (rtState.joined) { @@ -2547,26 +2583,90 @@ public class ZookeeperDiscoveryImpl { if (rtState.crd) handleProcessedEvents("procEvt"); - else if (updateNodeInfo) { - assert rtState.locNodeZkPath != null; + else + onEventProcessed(rtState, updateNodeInfo, evtProcessed); - if (log.isDebugEnabled()) - log.debug("Update processed events: " + rtState.locNodeInfo.lastProcEvt); + ZkCommunicationErrorProcessFuture commErrFut = commErrProcFut.get(); - try { - zkClient.setData(rtState.locNodeZkPath, marshalZip(rtState.locNodeInfo), -1); + if (commErrFut != null) + commErrFut.onTopologyChange(rtState.top); // This can add new event, notify out of event process loop. + } + + /** + * @param rtState Runtime state. + * @param updateNodeInfo {@code True} if need update processed events without delay. + * @param evtProcessed {@code True} if new event was processed. + * @throws Exception If failed. + */ + private void onEventProcessed(ZkRuntimeState rtState, + boolean updateNodeInfo, + boolean evtProcessed) throws Exception + { + synchronized (stateMux) { + if (updateNodeInfo) { + assert rtState.locNodeZkPath != null; + + if (log.isDebugEnabled()) + log.debug("Update processed events: " + rtState.locNodeInfo.lastProcEvt); + + updateProcessedEvents(rtState); + + if (rtState.procEvtsUpdateTo != null) { + spi.getSpiContext().removeTimeoutObject(rtState.procEvtsUpdateTo); + + rtState.procEvtsUpdateTo = null; + } + } + else if (evtProcessed) { + rtState.locNodeInfo.needUpdate = true; + + if (rtState.procEvtsUpdateTo == null) { + long updateTimeout = IgniteSystemProperties.getLong(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_TIMEOUT, + 60_000); + + if (updateTimeout > 0) { + rtState.procEvtsUpdateTo = new UpdateProcessedEventsTimeoutObject(rtState, updateTimeout); + + spi.getSpiContext().addTimeoutObject(rtState.procEvtsUpdateTo); + } + } } - catch (KeeperException.NoNodeException e) { - // Possible if node is forcible failed. + } + } + + /** + * @param rtState Runtime state. + * @param procEvtsUpdateTo Timeout object. + * @throws Exception If failed. + */ + private void updateProcessedEventsOnTimeout(ZkRuntimeState rtState, ZkTimeoutObject procEvtsUpdateTo) + throws Exception + { + synchronized (stateMux) { + if (rtState.procEvtsUpdateTo == procEvtsUpdateTo && rtState.locNodeInfo.needUpdate) { if (log.isDebugEnabled()) - log.debug("Failed to update processed events, no node: " + rtState.locNodeInfo.lastProcEvt); + log.debug("Update processed events on timeout: " + rtState.locNodeInfo.lastProcEvt); + + updateProcessedEvents(rtState); } } + } - ZkCommunicationErrorProcessFuture commErrFut = commErrProcFut.get(); + /** + * @param rtState Runtime state. + * @throws Exception If failed. + */ + private void updateProcessedEvents(ZkRuntimeState rtState) throws Exception { + try { + rtState.zkClient.setData(rtState.locNodeZkPath, marshalZip(rtState.locNodeInfo), -1); - if (commErrFut != null) - commErrFut.onTopologyChange(rtState.top); // This can add new event, notify out of event process loop. + rtState.locNodeInfo.needUpdate = false; + } + catch (KeeperException.NoNodeException e) { + // Possible if node is forcible failed. + if (log.isDebugEnabled()) + log.debug("Failed to update processed events, no node: " + rtState.locNodeInfo.lastProcEvt); + } } /** @@ -2597,14 +2697,14 @@ public class ZookeeperDiscoveryImpl { if (connState == ConnectionState.STOPPED) return; - if (rtState.joinTimeoutObj != null) { - spi.getSpiContext().removeTimeoutObject(rtState.joinTimeoutObj); + if (rtState.joinTo != null) { + spi.getSpiContext().removeTimeoutObject(rtState.joinTo); - rtState.joinTimeoutObj.cancelled = true; - rtState.joinTimeoutObj = null; + rtState.joinTo.cancelled = true; + rtState.joinTo = null; } - spi.getSpiContext().removeTimeoutObject(rtState.joinErrTimeoutObj); + spi.getSpiContext().removeTimeoutObject(rtState.joinErrTo); if (log.isInfoEnabled()) log.info("Local join event data: " + evtData + ']'); @@ -3607,7 +3707,11 @@ public class ZookeeperDiscoveryImpl { return zip(marsh.marshal(obj)); } - static byte[] zip(byte[] bytes) { + /** + * @param bytes Bytes to compress. + * @return Zip-compressed bytes. + */ + private static byte[] zip(byte[] bytes) { Deflater deflater = new Deflater(); deflater.setInput(bytes);
