zk

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b7cbd4ce
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b7cbd4ce
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b7cbd4ce

Branch: refs/heads/ignite-zk
Commit: b7cbd4cefaa8b60c3626d4a24afbfbcb1f490f39
Parents: fe515ee
Author: sboikov <[email protected]>
Authored: Tue Dec 12 17:52:03 2017 +0300
Committer: sboikov <[email protected]>
Committed: Tue Dec 12 17:52:03 2017 +0300

----------------------------------------------------------------------
 .../ZkCommunicationErrorProcessFuture.java      | 86 +++++++++++++++++---
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 11 ++-
 2 files changed, 79 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b7cbd4ce/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
index 91ecaf7..2ea65e8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
@@ -17,11 +17,14 @@
 
 package org.apache.ignite.spi.discovery.zk.internal;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.spi.IgniteSpiTimeoutObject;
-import org.jboss.netty.util.internal.ConcurrentHashMap;
 
 /**
  *
@@ -31,7 +34,7 @@ class ZkCommunicationErrorProcessFuture extends 
GridFutureAdapter implements Ign
     private final ZookeeperDiscoveryImpl impl;
 
     /** */
-    private final ConcurrentHashMap<UUID, GridFutureAdapter<Boolean>> errNodes 
= new ConcurrentHashMap<>();
+    private final Map<UUID, GridFutureAdapter<Boolean>> errNodes = new 
HashMap<>();
 
     /** */
     private final long endTime;
@@ -41,6 +44,7 @@ class ZkCommunicationErrorProcessFuture extends 
GridFutureAdapter implements Ign
 
     /**
      * @param impl Discovery implementation.
+     * @param timeout Wait timeout before initiating communication errors 
resolve.
      */
     ZkCommunicationErrorProcessFuture(ZookeeperDiscoveryImpl impl, long 
timeout) {
         this.impl = impl;
@@ -50,14 +54,20 @@ class ZkCommunicationErrorProcessFuture extends 
GridFutureAdapter implements Ign
         endTime = System.currentTimeMillis() + timeout;
     }
 
+    /**
+     * @param nodeId Node ID.
+     * @return Future finished when communication error resolve is done.
+     */
     GridFutureAdapter<Boolean> nodeStatusFuture(UUID nodeId) {
-        GridFutureAdapter<Boolean> fut = errNodes.get(nodeId);
+        GridFutureAdapter<Boolean> fut;
 
-        if (fut == null) {
-            GridFutureAdapter<Boolean> old = errNodes.putIfAbsent(nodeId, fut 
= new GridFutureAdapter<>());
+        // TODO ZK: finish race.
 
-            if (old != null)
-                fut = old;
+        synchronized (this) {
+            fut = errNodes.get(nodeId);
+
+            if (fut == null)
+                errNodes.put(nodeId, fut = new GridFutureAdapter<>());
         }
 
         if (impl.node(nodeId) == null)
@@ -66,8 +76,15 @@ class ZkCommunicationErrorProcessFuture extends 
GridFutureAdapter implements Ign
         return fut;
     }
 
+    /**
+     * @param nodeId Node ID.
+     */
     void onNodeFailed(UUID nodeId) {
-        GridFutureAdapter<Boolean> fut = errNodes.get(nodeId);
+        GridFutureAdapter<Boolean> fut;
+
+        synchronized (this) {
+            fut = errNodes.get(nodeId);
+        }
 
         if (fut != null)
             fut.onDone(false);
@@ -75,20 +92,65 @@ class ZkCommunicationErrorProcessFuture extends 
GridFutureAdapter implements Ign
 
     /** {@inheritDoc} */
     @Override public void run() {
-        // TODO ZK
+        if (checkNotDoneOnTimeout()) {
+            try {
+                impl.sendCustomMessage(new 
ZkInternalCommunicationErrorMessage());
+            }
+            catch (Exception e) {
+                onError(e);
+            }
+        }
     }
 
     /** {@inheritDoc} */
     @Override public IgniteUuid id() {
-        return null;
+        return id;
     }
 
+    /** {@inheritDoc} */
     @Override public long endTime() {
-        return 0;
+        return endTime;
     }
 
     /** {@inheritDoc} */
     @Override public void onTimeout() {
-        // TODO ZK
+        if (isDone())
+            return;
+
+        if (checkNotDoneOnTimeout())
+            impl.runInWorkerThread(this);
+    }
+
+    /**
+     * @param e Error.
+     */
+    private void onError(Exception e) {
+        List<GridFutureAdapter<Boolean>> futs;
+
+        synchronized (this) {
+            futs = new ArrayList<>(errNodes.values());
+        }
+
+        for (GridFutureAdapter<Boolean> fut : futs)
+            fut.onDone(e);
+
+        onDone(e);
+    }
+
+    /**
+     * @return {@code True} if future already finished.
+     */
+    private boolean checkNotDoneOnTimeout() {
+        // TODO ZK check state.
+        synchronized (this) {
+            for (GridFutureAdapter<Boolean> fut : errNodes.values()) {
+                if (!fut.isDone())
+                    return false;
+            }
+        }
+
+        onDone(null);
+
+        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7cbd4ce/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 783595f..d21c18b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -225,9 +225,6 @@ public class ZookeeperDiscoveryImpl {
      * @param err Connect error.
      */
     public void onCommunicationConnectionError(ClusterNode node0, Exception 
err) {
-        if (true)
-            return;
-
         ZookeeperClusterNode node = node(node0.id());
 
         if (node == null)
@@ -236,12 +233,14 @@ public class ZookeeperDiscoveryImpl {
         ZkCommunicationErrorProcessFuture fut = commErrProcFut.get();
 
         if (fut == null || fut.isDone()) {
-            ZkCommunicationErrorProcessFuture newFut = new 
ZkCommunicationErrorProcessFuture(this, node.sessionTimeout());
+            ZkCommunicationErrorProcessFuture newFut = new 
ZkCommunicationErrorProcessFuture(
+                this,
+                node.sessionTimeout() + 1000);
 
             if (commErrProcFut.compareAndSet(fut, newFut)) {
                 fut = newFut;
 
-                sendCustomMessage(new ZkInternalCommunicationErrorMessage());
+                spi.getSpiContext().addTimeoutObject(fut);
             }
             else
                 fut = commErrProcFut.get();
@@ -2255,7 +2254,7 @@ public class ZookeeperDiscoveryImpl {
     /**
      * @param c Closure to run.
      */
-    private void runInWorkerThread(Runnable c) {
+    void runInWorkerThread(Runnable c) {
         IgniteThreadPoolExecutor pool;
 
         synchronized (stateMux) {

Reply via email to