Repository: ignite
Updated Branches:
  refs/heads/ignite-zk 646be0a51 -> f6aae4d96


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f6aae4d9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f6aae4d9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f6aae4d9

Branch: refs/heads/ignite-zk
Commit: f6aae4d96ea418291d264744399800baa9109f68
Parents: 646be0a
Author: sboikov <[email protected]>
Authored: Thu Dec 21 11:03:39 2017 +0300
Committer: sboikov <[email protected]>
Committed: Thu Dec 21 12:09:01 2017 +0300

----------------------------------------------------------------------
 .../distributed/dht/GridDhtCacheAdapter.java    |  4 +
 .../datastreamer/DataStreamerImpl.java          |  3 +
 .../zk/internal/ZkAbstractCallabck.java         | 19 ++++-
 .../ZkCommunicationErrorProcessFuture.java      |  4 +
 .../discovery/zk/internal/ZkRuntimeState.java   | 10 ++-
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 86 +++++++++++++-------
 ...niteClientReconnectFailoverAbstractTest.java |  2 +-
 7 files changed, 92 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f6aae4d9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 28f9c76..ee0cb01 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -966,6 +966,10 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
                 try {
                     ctx.io().send(nodeId, res, ctx.ioPolicy());
                 }
+                catch (ClusterTopologyCheckedException e) {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to send get response to node, node 
failed: " + nodeId);
+                }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to send get response to node (is node 
still alive?) [nodeId=" + nodeId +
                         ",req=" + req + ", res=" + res + ']', e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/f6aae4d9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 12eb2dc..da07e91 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1146,6 +1146,9 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
 
             if (doneCnt == activeFuts0.size())
                 return;
+
+            if (disconnectErr != null)
+                throw disconnectErr;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f6aae4d9/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java
index d2efb9f..b80a9dd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java
@@ -47,7 +47,24 @@ abstract class ZkAbstractCallabck {
      * @return {@code True} if is able to start processing.
      */
     final boolean onProcessStart() {
-        return !rtState.closing && busyLock.enterBusy();
+        boolean start = rtState.errForClose == null && busyLock.enterBusy();
+
+        if (!start) {
+            assert rtState.errForClose != null;
+
+            onStartFailed();
+
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     *
+     */
+    void onStartFailed() {
+        // No-op.
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/f6aae4d9/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
index e64c801..accda6e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
@@ -175,6 +175,10 @@ class ZkCommunicationErrorProcessFuture extends 
GridFutureAdapter<Void> implemen
                             impl.localNode().order(),
                             impl.marshalZip(state));
                     }
+
+                    @Override void onStartFailed() {
+                        onError(rtState.errForClose);
+                    }
                 });
 
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f6aae4d9/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
index dc7b1bb..9c000b3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
@@ -34,7 +34,7 @@ class ZkRuntimeState {
     ZkAliveNodeDataWatcher aliveNodeDataWatcher;
 
     /** */
-    volatile boolean closing;
+    volatile Exception errForClose;
 
     /** */
     final boolean prevJoined;
@@ -95,10 +95,12 @@ class ZkRuntimeState {
     }
 
     /**
-     *
+     * @param err Error.
      */
-    void onCloseStart() {
-        closing = true;
+    void onCloseStart(Exception err) {
+        assert err != null;
+
+        errForClose = err;
 
         ZookeeperClient zkClient = this.zkClient;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f6aae4d9/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index f79e3f5..f8fe421 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -398,14 +398,15 @@ public class ZookeeperDiscoveryImpl {
         assert clientReconnectEnabled;
 
         synchronized (stateMux) {
-            if (connState == ConnectionState.STARTED)
+            if (connState == ConnectionState.STARTED) {
                 connState = ConnectionState.DISCONNECTED;
+
+                rtState.onCloseStart(disconnectError());
+            }
             else
                 return;
         }
 
-        rtState.onCloseStart();
-
         busyLock.block();
 
         busyLock.unblock();
@@ -454,12 +455,14 @@ public class ZookeeperDiscoveryImpl {
      * @param e Error.
      */
     private void onSegmented(Exception e) {
+        rtState.errForClose = e;
+
         if (rtState.joined) {
             synchronized (stateMux) {
                 connState = ConnectionState.STOPPED;
             }
 
-            rtState.zkClient.zk().sync(zkPaths.clusterDir, new 
SegmentedWatcher(), null);
+            notifySegmented();
         }
         else
             joinFut.onDone(e);
@@ -468,15 +471,6 @@ public class ZookeeperDiscoveryImpl {
     /**
      *
      */
-    class SegmentedWatcher implements AsyncCallback.VoidCallback {
-        @Override public void processResult(int rc, String path, Object ctx) {
-            notifySegmented();
-        }
-    }
-
-    /**
-     *
-     */
     private void notifySegmented() {
         assert rtState.evtsData != null;
 
@@ -514,6 +508,13 @@ public class ZookeeperDiscoveryImpl {
     }
 
     /**
+     * @return Exception.
+     */
+    static IgniteClientDisconnectedCheckedException disconnectError() {
+        return new IgniteClientDisconnectedCheckedException(null, "Client node 
disconnected.");
+    }
+
+    /**
      * @param nodeId Node ID.
      * @return {@code True} if node joined or joining topology.
      */
@@ -1096,7 +1097,7 @@ public class ZookeeperDiscoveryImpl {
 
         /** {@inheritDoc} */
         @Override public void process0(WatchedEvent evt) {
-            if (rtState.closing || rtState.joined)
+            if (rtState.errForClose != null || rtState.joined)
                 return;
 
             if (evt.getType() == Event.EventType.NodeDataChanged)
@@ -1258,7 +1259,7 @@ public class ZookeeperDiscoveryImpl {
                 catch (Exception e) {
                     U.warn(log, "Local node authentication failed: " + e, e);
 
-                    rtState.onCloseStart();
+                    rtState.onCloseStart(e);
 
                     joinFut.onDone(e);
 
@@ -1809,8 +1810,10 @@ public class ZookeeperDiscoveryImpl {
 
         final List<ClusterNode> topSnapshot = 
Collections.singletonList((ClusterNode)locNode);
 
-        if (connState == ConnectionState.DISCONNECTED)
-            connState = ConnectionState.STARTED;
+        synchronized (stateMux) {
+            if (connState == ConnectionState.DISCONNECTED)
+                connState = ConnectionState.STARTED;
+        }
 
         lsnr.onDiscovery(EventType.EVT_NODE_JOINED,
             1L,
@@ -2294,8 +2297,10 @@ public class ZookeeperDiscoveryImpl {
 
         final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot();
 
-        if (connState == ConnectionState.DISCONNECTED)
-            connState = ConnectionState.STARTED;
+        synchronized (stateMux) {
+            if (connState == ConnectionState.DISCONNECTED)
+                connState = ConnectionState.STARTED;
+        }
 
         lsnr.onDiscovery(evtData.eventType(),
             evtData.topologyVersion(),
@@ -2723,7 +2728,7 @@ public class ZookeeperDiscoveryImpl {
 
         client.deleteIfExistsAsync(zkPaths.aliveNodesDir);
 
-        rtState.onCloseStart();
+        rtState.onCloseStart(new IgniteCheckedException("Simulate node failure 
error."));
 
         rtState.zkClient.close();
     }
@@ -2815,8 +2820,6 @@ public class ZookeeperDiscoveryImpl {
     private Exception localNodeFail(String msg, boolean clientReconnect) {
         U.warn(log, msg);
 
-        rtState.onCloseStart();
-
         if (clientReconnect && clientReconnectEnabled) {
             assert locNode.isClient() : locNode;
 
@@ -2827,6 +2830,8 @@ public class ZookeeperDiscoveryImpl {
                     reconnect = true;
 
                     connState = ConnectionState.DISCONNECTED;
+
+                    rtState.onCloseStart(disconnectError());
                 }
             }
 
@@ -2841,8 +2846,11 @@ public class ZookeeperDiscoveryImpl {
                 runInWorkerThread(new ReconnectClosure(newId));
             }
         }
-        else
+        else {
+            rtState.errForClose = new IgniteCheckedException(msg);
+
             notifySegmented();
+        }
 
         // Stop any further processing.
         return new ZookeeperClientFailedException(msg);
@@ -3102,11 +3110,13 @@ public class ZookeeperDiscoveryImpl {
         if (!stop.compareAndSet(false, true))
             return;
 
+        IgniteCheckedException err = new IgniteCheckedException("Node 
stopped.");
+
         synchronized (stateMux) {
             connState = ConnectionState.STOPPED;
-        }
 
-        rtState.onCloseStart();
+            rtState.onCloseStart(err);
+        }
 
         busyLock.block();
 
@@ -3119,7 +3129,7 @@ public class ZookeeperDiscoveryImpl {
         if (zkClient != null)
             zkClient.close();
 
-        finishFutures(new IgniteCheckedException("Node stopped."));
+        finishFutures(err);
 
         IgniteUtils.shutdownNow(ZookeeperDiscoveryImpl.class, utilityPool, 
log);
     }
@@ -3217,9 +3227,7 @@ public class ZookeeperDiscoveryImpl {
 
         /** {@inheritDoc} */
         @Override public void run() {
-            finishFutures(new IgniteClientDisconnectedCheckedException(null, 
"Client node disconnected."));
-
-            rtState.closing = true;
+            finishFutures(disconnectError());
 
             busyLock.block();
 
@@ -3240,8 +3248,11 @@ public class ZookeeperDiscoveryImpl {
         @Override public void run() {
             if (clientReconnectEnabled) {
                 synchronized (stateMux) {
-                    if (connState == ConnectionState.STARTED)
+                    if (connState == ConnectionState.STARTED) {
                         connState = ConnectionState.DISCONNECTED;
+
+                        rtState.onCloseStart(disconnectError());
+                    }
                     else
                         return;
                 }
@@ -3533,6 +3544,10 @@ public class ZookeeperDiscoveryImpl {
                             }
                         }
                     }
+
+                    @Override void onStartFailed() {
+                        onDone(rtState.errForClose);
+                    }
                 });
             }
         }
@@ -3552,6 +3567,17 @@ public class ZookeeperDiscoveryImpl {
          * @return {@code False} if future was completed.
          */
         boolean checkNodeAndState() {
+            if (isDone())
+                return false;
+
+            Exception err = rtState.errForClose;
+
+            if (err != null) {
+                onDone(err);
+
+                return false;
+            }
+
             ConnectionState connState = ZookeeperDiscoveryImpl.this.connState;
 
             if (connState == ConnectionState.DISCONNECTED) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/f6aae4d9/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
index 676a641..fa8670c 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
@@ -210,7 +210,7 @@ public abstract class 
IgniteClientReconnectFailoverAbstractTest extends IgniteCl
             }
 
             if (err != null) {
-                log.error(err);
+                log.error("Test error:" + err);
 
                 U.dumpThreads(log);
 

Reply via email to