Repository: ignite Updated Branches: refs/heads/ignite-zk d9456dc12 -> ab47f191b
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ab47f191 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ab47f191 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ab47f191 Branch: refs/heads/ignite-zk Commit: ab47f191b004974eabc0e817d94aaa8e2b8ccb42 Parents: d9456dc Author: sboikov <[email protected]> Authored: Fri Dec 1 13:00:53 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri Dec 1 13:00:53 2017 +0300 ---------------------------------------------------------------------- .../zk/internal/ZkDiscoveryEventData.java | 6 +-- .../zk/internal/ZkDiscoveryEventsData.java | 7 +-- .../discovery/zk/internal/ZookeeperClient.java | 18 +++++++ .../zk/internal/ZookeeperDiscoveryImpl.java | 51 ++++++++++++-------- 4 files changed, 52 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ab47f191/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java index f50c504..d8f9a3d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java @@ -64,18 +64,14 @@ abstract class ZkDiscoveryEventData implements Serializable { } /** - * @param alives Optional alives nodes for additional filtering. * @param nodes Current nodes in topology. */ - void initRemainingAcks(Collection<ZookeeperClusterNode> nodes, @Nullable TreeMap<Integer, String> alives) { + void initRemainingAcks(Collection<ZookeeperClusterNode> nodes) { assert remainingAcks == null : this; remainingAcks = U.newHashSet(nodes.size()); for (ZookeeperClusterNode node : nodes) { - if (alives != null && !alives.containsKey(node.internalId())) - continue; - if (!node.isLocal() && node.order() <= topVer) { boolean add = remainingAcks.add(node.internalId()); http://git-wip-us.apache.org/repos/asf/ignite/blob/ab47f191/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java index a116a0d..b29d85e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java @@ -58,16 +58,13 @@ class ZkDiscoveryEventsData implements Serializable { /** * @param nodes Current nodes in topology (these nodes should ack that event processed). * @param evt Event. - * @param alives Optional alives nodes for additional filtering. */ - void addEvent(Collection<ZookeeperClusterNode> nodes, - ZkDiscoveryEventData evt, - @Nullable TreeMap<Integer, String> alives) + void addEvent(Collection<ZookeeperClusterNode> nodes, ZkDiscoveryEventData evt) { Object old = evts.put(evt.eventId(), evt); assert old == null : old; - evt.initRemainingAcks(nodes, alives); + evt.initRemainingAcks(nodes); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/ab47f191/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java index 3a4a45d..2ccc7ea 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java @@ -870,6 +870,9 @@ public class ZookeeperClient implements Watcher { /** {@inheritDoc} */ @Override public void processResult(int rc, String path, Object ctx) { + if (closing) + return; + if (rc == KeeperException.Code.NONODE.intValue()) return; @@ -901,6 +904,9 @@ public class ZookeeperClient implements Watcher { } @Override public void processResult(int rc, String path, Object ctx, String name) { + if (closing) + return; + if (rc == KeeperException.Code.NODEEXISTS.intValue()) return; @@ -934,6 +940,9 @@ public class ZookeeperClient implements Watcher { /** {@inheritDoc} */ @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { + if (closing) + return; + if (needRetry(rc)) { U.warn(log, "Failed to execute async operation, connection lost. Will retry after connection restore [path=" + path + ']'); @@ -962,6 +971,9 @@ public class ZookeeperClient implements Watcher { /** {@inheritDoc} */ @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { + if (closing) + return; + if (needRetry(rc)) { U.warn(log, "Failed to execute async operation, connection lost. Will retry after connection restore [path=" + path + ']'); @@ -990,6 +1002,9 @@ public class ZookeeperClient implements Watcher { /** {@inheritDoc} */ @Override public void processResult(int rc, String path, Object ctx, Stat stat) { + if (closing) + return; + if (needRetry(rc)) { U.warn(log, "Failed to execute async operation, connection lost. Will retry after connection restore [path=" + path + ']'); @@ -1021,6 +1036,9 @@ public class ZookeeperClient implements Watcher { boolean connLoss = false; synchronized (stateMux) { + if (closing) + return; + if (state == ConnectionState.Disconnected && ZookeeperClient.this.connStartTime == connectStartTime) { http://git-wip-us.apache.org/repos/asf/ignite/blob/ab47f191/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 35fff0a..88905b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -648,9 +648,9 @@ public class ZookeeperDiscoveryImpl { assert state.evtsData != null; for (ZkDiscoveryEventData evtData : state.evtsData.evts.values()) - evtData.initRemainingAcks(state.top.nodesByOrder.values(), null); + evtData.initRemainingAcks(state.top.nodesByOrder.values()); - handleProcessedEvents("crd", null); + handleProcessedEvents("crd"); } else { if (log.isInfoEnabled()) @@ -710,11 +710,16 @@ public class ZookeeperDiscoveryImpl { } } + List<ZookeeperClusterNode> failedNodes = null; + for (Map.Entry<Integer, ZookeeperClusterNode> e : state.top.nodesByInternalId.entrySet()) { if (!alives.containsKey(e.getKey())) { ZookeeperClusterNode failedNode = e.getValue(); - handleProcessedEventsOnNodeFail(failedNode, alives); + if (failedNodes == null) + failedNodes = new ArrayList<>(); + + failedNodes.add(failedNode); generateNodeFail(curTop, failedNode); @@ -724,6 +729,9 @@ public class ZookeeperDiscoveryImpl { if (newEvts) saveAndProcessNewEvents(); + + if (failedNodes != null) + handleProcessedEventsOnNodesFail(failedNodes); } /** @@ -766,7 +774,7 @@ public class ZookeeperDiscoveryImpl { state.evtsData.topVer, failedNode.internalId()); - state.evtsData.addEvent(curTop.values(), evtData, null); + state.evtsData.addEvent(curTop.values(), evtData); if (log.isInfoEnabled()) log.info("Generated NODE_FAILED event [evt=" + evtData + ']'); @@ -838,7 +846,7 @@ public class ZookeeperDiscoveryImpl { evtData.joiningNodeData = joiningNodeData; - state.evtsData.addEvent(dataForJoined.topology(), evtData, null); + state.evtsData.addEvent(dataForJoined.topology(), evtData); evtData.addRemainingAck(joinedNode); // Topology for joined node does not contain joined node. @@ -1013,7 +1021,7 @@ public class ZookeeperDiscoveryImpl { evtData.msg = msg; - state.evtsData.addEvent(state.top.nodesByOrder.values(), evtData, null); + state.evtsData.addEvent(state.top.nodesByOrder.values(), evtData); if (log.isDebugEnabled()) log.debug("Generated CUSTOM event [evt=" + evtData + ", msg=" + msg + ']'); @@ -1180,7 +1188,7 @@ public class ZookeeperDiscoveryImpl { } if (state.crd) - handleProcessedEvents("procEvt", null); + handleProcessedEvents("procEvt"); else if (updateNodeInfo) { assert state.locNodeZkPath != null; @@ -1332,10 +1340,9 @@ public class ZookeeperDiscoveryImpl { /** * @param ctx Context for logging. - * @param alives Optional alives nodes for additional filtering. * @throws Exception If failed. */ - private void handleProcessedEvents(String ctx, @Nullable TreeMap<Integer, String> alives) throws Exception { + private void handleProcessedEvents(String ctx) throws Exception { Iterator<ZkDiscoveryEventData> it = state.evtsData.evts.values().iterator(); List<ZkDiscoveryCustomEventData> newEvts = null; @@ -1428,17 +1435,17 @@ public class ZookeeperDiscoveryImpl { Collection<ZookeeperClusterNode> nodes = state.top.nodesByOrder.values(); for (int i = 0; i < newEvts.size(); i++) - state.evtsData.addEvent(nodes, newEvts.get(i), alives); + state.evtsData.addEvent(nodes, newEvts.get(i)); saveAndProcessNewEvents(); } } /** - * @param failedNode Failed node. + * @param failedNodes Failed nodes. * @throws Exception If failed. */ - private void handleProcessedEventsOnNodeFail(ZookeeperClusterNode failedNode, TreeMap<Integer, String> alives) throws Exception { + private void handleProcessedEventsOnNodesFail(List<ZookeeperClusterNode> failedNodes) throws Exception { boolean processed = false; for (Iterator<Map.Entry<Long, ZkDiscoveryEventData>> it = state.evtsData.evts.entrySet().iterator(); it.hasNext();) { @@ -1446,12 +1453,16 @@ public class ZookeeperDiscoveryImpl { ZkDiscoveryEventData evtData = e.getValue(); - if (evtData.onNodeFail(failedNode)) - processed = true; + for (int i = 0; i < failedNodes.size(); i++) { + ZookeeperClusterNode failedNode = failedNodes.get(i); + + if (evtData.onNodeFail(failedNode)) + processed = true; + } } if (processed) - handleProcessedEvents("fail-" + failedNode.id(), alives); + handleProcessedEvents("fail-" + U.nodeIds(failedNodes)); } /** @@ -1459,14 +1470,14 @@ public class ZookeeperDiscoveryImpl { * @throws Exception If failed. */ private void handleProcessedJoinEvent(ZkDiscoveryNodeJoinEventData evtData) throws Exception { - if (log.isInfoEnabled()) - log.info("All nodes processed node join [evtData=" + evtData + ']'); + if (log.isDebugEnabled()) + log.debug("All nodes processed node join [evtData=" + evtData + ']'); String evtDataPath = zkPaths.joinEventDataPath(evtData.eventId()); String dataForJoinedPath = zkPaths.joinEventDataPathForJoined(evtData.eventId()); - if (log.isInfoEnabled()) - log.info("Delete processed event data [path1=" + evtDataPath + ", path2=" + dataForJoinedPath + ']'); + if (log.isDebugEnabled()) + log.debug("Delete processed event data [path1=" + evtDataPath + ", path2=" + dataForJoinedPath + ']'); state.zkClient.deleteIfExistsAsync(evtDataPath); state.zkClient.deleteIfExistsAsync(dataForJoinedPath); @@ -1821,7 +1832,7 @@ public class ZookeeperDiscoveryImpl { } if (processed) - handleProcessedEvents("ack-" + nodeInternalId, null); + handleProcessedEvents("ack-" + nodeInternalId); } } }
