zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b7cbd4ce Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b7cbd4ce Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b7cbd4ce Branch: refs/heads/ignite-zk Commit: b7cbd4cefaa8b60c3626d4a24afbfbcb1f490f39 Parents: fe515ee Author: sboikov <[email protected]> Authored: Tue Dec 12 17:52:03 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue Dec 12 17:52:03 2017 +0300 ---------------------------------------------------------------------- .../ZkCommunicationErrorProcessFuture.java | 86 +++++++++++++++++--- .../zk/internal/ZookeeperDiscoveryImpl.java | 11 ++- 2 files changed, 79 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b7cbd4ce/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java index 91ecaf7..2ea65e8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java @@ -17,11 +17,14 @@ package org.apache.ignite.spi.discovery.zk.internal; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.UUID; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.spi.IgniteSpiTimeoutObject; -import org.jboss.netty.util.internal.ConcurrentHashMap; /** * @@ -31,7 +34,7 @@ class ZkCommunicationErrorProcessFuture extends GridFutureAdapter implements Ign private final ZookeeperDiscoveryImpl impl; /** */ - private final ConcurrentHashMap<UUID, GridFutureAdapter<Boolean>> errNodes = new ConcurrentHashMap<>(); + private final Map<UUID, GridFutureAdapter<Boolean>> errNodes = new HashMap<>(); /** */ private final long endTime; @@ -41,6 +44,7 @@ class ZkCommunicationErrorProcessFuture extends GridFutureAdapter implements Ign /** * @param impl Discovery implementation. + * @param timeout Wait timeout before initiating communication errors resolve. */ ZkCommunicationErrorProcessFuture(ZookeeperDiscoveryImpl impl, long timeout) { this.impl = impl; @@ -50,14 +54,20 @@ class ZkCommunicationErrorProcessFuture extends GridFutureAdapter implements Ign endTime = System.currentTimeMillis() + timeout; } + /** + * @param nodeId Node ID. + * @return Future finished when communication error resolve is done. + */ GridFutureAdapter<Boolean> nodeStatusFuture(UUID nodeId) { - GridFutureAdapter<Boolean> fut = errNodes.get(nodeId); + GridFutureAdapter<Boolean> fut; - if (fut == null) { - GridFutureAdapter<Boolean> old = errNodes.putIfAbsent(nodeId, fut = new GridFutureAdapter<>()); + // TODO ZK: finish race. - if (old != null) - fut = old; + synchronized (this) { + fut = errNodes.get(nodeId); + + if (fut == null) + errNodes.put(nodeId, fut = new GridFutureAdapter<>()); } if (impl.node(nodeId) == null) @@ -66,8 +76,15 @@ class ZkCommunicationErrorProcessFuture extends GridFutureAdapter implements Ign return fut; } + /** + * @param nodeId Node ID. + */ void onNodeFailed(UUID nodeId) { - GridFutureAdapter<Boolean> fut = errNodes.get(nodeId); + GridFutureAdapter<Boolean> fut; + + synchronized (this) { + fut = errNodes.get(nodeId); + } if (fut != null) fut.onDone(false); @@ -75,20 +92,65 @@ class ZkCommunicationErrorProcessFuture extends GridFutureAdapter implements Ign /** {@inheritDoc} */ @Override public void run() { - // TODO ZK + if (checkNotDoneOnTimeout()) { + try { + impl.sendCustomMessage(new ZkInternalCommunicationErrorMessage()); + } + catch (Exception e) { + onError(e); + } + } } /** {@inheritDoc} */ @Override public IgniteUuid id() { - return null; + return id; } + /** {@inheritDoc} */ @Override public long endTime() { - return 0; + return endTime; } /** {@inheritDoc} */ @Override public void onTimeout() { - // TODO ZK + if (isDone()) + return; + + if (checkNotDoneOnTimeout()) + impl.runInWorkerThread(this); + } + + /** + * @param e Error. + */ + private void onError(Exception e) { + List<GridFutureAdapter<Boolean>> futs; + + synchronized (this) { + futs = new ArrayList<>(errNodes.values()); + } + + for (GridFutureAdapter<Boolean> fut : futs) + fut.onDone(e); + + onDone(e); + } + + /** + * @return {@code True} if future already finished. + */ + private boolean checkNotDoneOnTimeout() { + // TODO ZK check state. + synchronized (this) { + for (GridFutureAdapter<Boolean> fut : errNodes.values()) { + if (!fut.isDone()) + return false; + } + } + + onDone(null); + + return true; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b7cbd4ce/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 783595f..d21c18b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -225,9 +225,6 @@ public class ZookeeperDiscoveryImpl { * @param err Connect error. */ public void onCommunicationConnectionError(ClusterNode node0, Exception err) { - if (true) - return; - ZookeeperClusterNode node = node(node0.id()); if (node == null) @@ -236,12 +233,14 @@ public class ZookeeperDiscoveryImpl { ZkCommunicationErrorProcessFuture fut = commErrProcFut.get(); if (fut == null || fut.isDone()) { - ZkCommunicationErrorProcessFuture newFut = new ZkCommunicationErrorProcessFuture(this, node.sessionTimeout()); + ZkCommunicationErrorProcessFuture newFut = new ZkCommunicationErrorProcessFuture( + this, + node.sessionTimeout() + 1000); if (commErrProcFut.compareAndSet(fut, newFut)) { fut = newFut; - sendCustomMessage(new ZkInternalCommunicationErrorMessage()); + spi.getSpiContext().addTimeoutObject(fut); } else fut = commErrProcFut.get(); @@ -2255,7 +2254,7 @@ public class ZookeeperDiscoveryImpl { /** * @param c Closure to run. */ - private void runInWorkerThread(Runnable c) { + void runInWorkerThread(Runnable c) { IgniteThreadPoolExecutor pool; synchronized (stateMux) {
