Repository: ignite Updated Branches: refs/heads/ignite-zk c1c644b9c -> 2283df29b
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2283df29 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2283df29 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2283df29 Branch: refs/heads/ignite-zk Commit: 2283df29bdd5c5ff5ec4eb4c9cdcd0f21bc2d489 Parents: c1c644b Author: sboikov <[email protected]> Authored: Thu Dec 21 15:01:35 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu Dec 21 15:13:06 2017 +0300 ---------------------------------------------------------------------- .../datastreamer/DataStreamerImpl.java | 31 +++++++++++++------- .../zk/internal/ZookeeperDiscoveryImpl.java | 17 ++++++++--- ...niteClientReconnectFailoverAbstractTest.java | 7 +++-- 3 files changed, 38 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2283df29/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index 4e3f233..e97ac79 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -45,6 +45,7 @@ import javax.cache.expiry.ExpiryPolicy; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteDataStreamerTimeoutException; import org.apache.ignite.IgniteException; @@ -98,6 +99,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.GPC; @@ -1068,6 +1070,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed return; while (true) { + if (disconnectErr != null) + throw disconnectErr; + Queue<IgniteInternalFuture<?>> q = null; for (Buffer buf : bufMappings.values()) { @@ -1114,8 +1119,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed throw new IgniteDataStreamerTimeoutException("Data streamer exceeded timeout on flush.", e); } catch (IgniteCheckedException e) { - //if (log.isDebugEnabled()) - log.error("Failed to flush buffer: " + e, e); + if (log.isDebugEnabled()) + log.debug("Failed to flush buffer: " + e); err = true; } @@ -1808,15 +1813,19 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed catch (IgniteCheckedException e) { GridFutureAdapter<Object> fut0 = ((GridFutureAdapter<Object>)fut); - try { - if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id())) - fut0.onDone(e); - else - fut0.onDone(new ClusterTopologyCheckedException("Failed to send request (node has left): " - + node.id())); - } - catch (IgniteClientDisconnectedCheckedException e0) { - fut0.onDone(e0); + if (X.hasCause(e, IgniteClientDisconnectedCheckedException.class, IgniteClientDisconnectedException.class)) + fut0.onDone(e); + else { + try { + if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id())) + fut0.onDone(e); + else + fut0.onDone(new ClusterTopologyCheckedException("Failed to send request (node has left): " + + node.id())); + } + catch (IgniteClientDisconnectedCheckedException e0) { + fut0.onDone(e0); + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/2283df29/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 6c9a216..d66ea2c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -339,6 +339,8 @@ public class ZookeeperDiscoveryImpl { * @return Ping result. */ public boolean pingNode(UUID nodeId) { + checkState(); + ZkRuntimeState rtState = this.rtState; ZookeeperClusterNode node = rtState.top.nodesById.get(nodeId); @@ -2298,7 +2300,14 @@ public class ZookeeperDiscoveryImpl { if (log.isDebugEnabled()) log.debug("Update processed events: " + rtState.locNodeInfo.lastProcEvt); - zkClient.setData(rtState.locNodeZkPath, marshalZip(rtState.locNodeInfo), -1); + try { + zkClient.setData(rtState.locNodeZkPath, marshalZip(rtState.locNodeInfo), -1); + } + catch (KeeperException.NoNodeException e) { + // Possible if node is forcible failed. + if (log.isDebugEnabled()) + log.debug("Failed to update processed events, no node: " + rtState.locNodeInfo.lastProcEvt); + } } ZkCommunicationErrorProcessFuture commErrFut = commErrProcFut.get(); @@ -2308,9 +2317,9 @@ public class ZookeeperDiscoveryImpl { } /** - * @param node - * @param evtData - * @throws Exception + * @param node Node. + * @param evtData Node join event data. + * @throws Exception If failed. */ private void readAndInitSecuritySubject(ZookeeperClusterNode node, ZkDiscoveryNodeJoinEventData evtData) throws Exception { if (evtData.secSubjPartCnt > 0) { http://git-wip-us.apache.org/repos/asf/ignite/blob/2283df29/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java index fa8670c..37292ff 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java @@ -210,14 +210,17 @@ public abstract class IgniteClientReconnectFailoverAbstractTest extends IgniteCl } if (err != null) { - log.error("Test error:" + err); + log.error("Test error: " + err); U.dumpThreads(log); CyclicBarrier barrier0 = barrier; - if (barrier0 != null) + if (barrier0 != null) { + barrier = null; + barrier0.reset(); + } stop.set(true);
