Repository: ignite Updated Branches: refs/heads/master 223708a89 -> b52edcaae
ignite-3116 Cancel force keys futures on node stop (cherry picked from commit 0428018) Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/db8a9b2b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/db8a9b2b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/db8a9b2b Branch: refs/heads/master Commit: db8a9b2b68ba505376c3be4abe789c84fc8e47cd Parents: ea909bc Author: sboikov <[email protected]> Authored: Mon May 16 09:08:14 2016 +0300 Committer: sboikov <[email protected]> Committed: Mon May 16 09:44:28 2016 +0300 ---------------------------------------------------------------------- .../cache/distributed/dht/GridDhtGetFuture.java | 77 ++++++++++---------- .../distributed/dht/GridDhtGetSingleFuture.java | 4 + .../dht/GridDhtTransactionalCacheAdapter.java | 57 ++++++++++++++- .../dht/atomic/GridDhtAtomicCache.java | 52 ++++++++++++- .../dht/preloader/GridDhtForceKeysFuture.java | 6 +- .../dht/preloader/GridDhtPreloader.java | 28 ++++++- .../communication/tcp/TcpCommunicationSpi.java | 32 ++++---- 7 files changed, 198 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/db8a9b2b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index fbfca82..e12e1ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -29,6 +29,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -44,6 +45,7 @@ import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.lang.GridClosureException; import org.apache.ignite.internal.util.typedef.C2; +import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -167,9 +169,44 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col * Initializes future. */ void init() { - map(keys); + GridDhtFuture<Object> fut = cctx.dht().dhtPreloader().request(keys.keySet(), topVer); + + if (fut != null) { + if (!F.isEmpty(fut.invalidPartitions())) { + if (retries == null) + retries = new HashSet<>(); + + retries.addAll(fut.invalidPartitions()); + } + + fut.listen(new CI1<IgniteInternalFuture<Object>>() { + @Override public void apply(IgniteInternalFuture<Object> fut) { + try { + fut.get(); + } + catch (NodeStoppingException e) { + return; + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to request keys from preloader [keys=" + keys + ", err=" + e + ']'); + + onDone(e); + + return; + } + + map0(keys); + + markInitialized(); + } + }); + } + else { + map0(keys); - markInitialized(); + markInitialized(); + } } /** {@inheritDoc} */ @@ -205,42 +242,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col } /** - * @param keys Keys. - */ - private void map(final Map<KeyCacheObject, Boolean> keys) { - GridDhtFuture<Object> fut = cctx.dht().dhtPreloader().request(keys.keySet(), topVer); - - if (fut != null) { - if (!F.isEmpty(fut.invalidPartitions())) { - if (retries == null) - retries = new HashSet<>(); - - retries.addAll(fut.invalidPartitions()); - } - - add(new GridEmbeddedFuture<>( - new IgniteBiClosure<Object, Exception, Collection<GridCacheEntryInfo>>() { - @Override public Collection<GridCacheEntryInfo> apply(Object o, Exception e) { - if (e != null) { // Check error first. - if (log.isDebugEnabled()) - log.debug("Failed to request keys from preloader [keys=" + keys + ", err=" + e + ']'); - - onDone(e); - } - else - map0(keys); - - // Finish this one. - return Collections.emptyList(); - } - }, - fut)); - } - else - map0(keys); - } - - /** * @param keys Keys to map. */ private void map0(Map<KeyCacheObject, Boolean> keys) { http://git-wip-us.apache.org/repos/asf/ignite/blob/db8a9b2b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java index 2de92b1..9394937 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java @@ -27,6 +27,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -220,6 +221,9 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa log.debug("Failed to request keys from preloader " + "[keys=" + key + ", err=" + e + ']'); + if (e instanceof NodeStoppingException) + return; + onDone(e); } else http://git-wip-us.apache.org/repos/asf/ignite/blob/db8a9b2b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 7bace73..0a99621 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -29,6 +29,7 @@ import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; @@ -378,11 +379,38 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach IgniteInternalFuture<Object> keyFut = F.isEmpty(req.keys()) ? null : ctx.dht().dhtPreloader().request(req.keys(), req.topologyVersion()); - if (keyFut == null || keyFut.isDone()) + if (keyFut == null || keyFut.isDone()) { + if (keyFut != null) { + try { + keyFut.get(); + } + catch (NodeStoppingException e) { + return; + } + catch (IgniteCheckedException e) { + onForceKeysError(nodeId, req, e); + + return; + } + } + processDhtLockRequest0(nodeId, req); + } else { keyFut.listen(new CI1<IgniteInternalFuture<Object>>() { - @Override public void apply(IgniteInternalFuture<Object> t) { + @Override public void apply(IgniteInternalFuture<Object> fut) { + try { + fut.get(); + } + catch (NodeStoppingException e) { + return; + } + catch (IgniteCheckedException e) { + onForceKeysError(nodeId, req, e); + + return; + } + processDhtLockRequest0(nodeId, req); } }); @@ -392,6 +420,31 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach /** * @param nodeId Node ID. * @param req Request. + * @param e Error. + */ + private void onForceKeysError(UUID nodeId, GridDhtLockRequest req, IgniteCheckedException e) { + GridDhtLockResponse res = new GridDhtLockResponse(ctx.cacheId(), + req.version(), + req.futureId(), + req.miniId(), + e, + ctx.deploymentEnabled()); + + try { + ctx.io().send(nodeId, res, ctx.ioPolicy()); + } + catch (ClusterTopologyCheckedException e0) { + if (log.isDebugEnabled()) + log.debug("Failed to send lock reply to remote node because it left grid: " + nodeId); + } + catch (IgniteCheckedException e0) { + U.error(log, "Failed to send lock reply to node: " + nodeId, e); + } + } + + /** + * @param nodeId Node ID. + * @param req Request. */ protected final void processDhtLockRequest0(UUID nodeId, GridDhtLockRequest req) { assert nodeId != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/db8a9b2b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 1f01a76..6a30f7f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -39,6 +39,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; @@ -1466,11 +1467,37 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ) { IgniteInternalFuture<Object> forceFut = preldr.request(req.keys(), req.topologyVersion()); - if (forceFut == null || forceFut.isDone()) + if (forceFut == null || forceFut.isDone()) { + try { + if (forceFut != null) + forceFut.get(); + } + catch (NodeStoppingException e) { + return; + } + catch (IgniteCheckedException e) { + onForceKeysError(nodeId, req, completionCb, e); + + return; + } + updateAllAsyncInternal0(nodeId, req, completionCb); + } else { forceFut.listen(new CI1<IgniteInternalFuture<Object>>() { - @Override public void apply(IgniteInternalFuture<Object> t) { + @Override public void apply(IgniteInternalFuture<Object> fut) { + try { + fut.get(); + } + catch (NodeStoppingException e) { + return; + } + catch (IgniteCheckedException e) { + onForceKeysError(nodeId, req, completionCb, e); + + return; + } + updateAllAsyncInternal0(nodeId, req, completionCb); } }); @@ -1478,6 +1505,27 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** + * @param nodeId Node ID. + * @param req Update request. + * @param completionCb Completion callback. + * @param e Error. + */ + private void onForceKeysError(final UUID nodeId, + final GridNearAtomicUpdateRequest req, + final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb, + IgniteCheckedException e + ) { + GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), + nodeId, + req.futureVersion(), + ctx.deploymentEnabled()); + + res.addFailedKeys(req.keys(), e); + + completionCb.apply(req, res); + } + + /** * Executes local update after preloader fetched values. * * @param nodeId Node ID. http://git-wip-us.apache.org/repos/asf/ignite/blob/db8a9b2b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java index 7970a44..4da1f38 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java @@ -246,7 +246,11 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec int curTopVer = topCntr.get(); - preloader.addFuture(this); + if (!preloader.addFuture(this)) { + assert isDone() : this; + + return false; + } trackable = true; http://git-wip-us.apache.org/repos/asf/ignite/blob/db8a9b2b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 0de3197..09aec81 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -32,6 +32,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -115,6 +116,9 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** */ private final AtomicInteger partsEvictOwning = new AtomicInteger(); + /** */ + private volatile boolean stopping; + /** Discovery listener. */ private final GridLocalEventListener discoLsnr = new GridLocalEventListener() { @Override public void onEvent(Event evt) { @@ -210,6 +214,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { if (log.isDebugEnabled()) log.debug("DHT rebalancer onKernalStop callback."); + stopping = true; + cctx.events().removeListener(discoLsnr); // Acquire write busy lock. @@ -221,8 +227,19 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { if (demander != null) demander.stop(); + IgniteCheckedException err = stopError(); + + for (GridDhtForceKeysFuture fut : forceKeyFuts.values()) + fut.onDone(err); + top = null; } + /** + * @return Node stop exception. + */ + private IgniteCheckedException stopError() { + return new NodeStoppingException("Operation has been cancelled (cache or node is stopping)."); + } /** {@inheritDoc} */ @Override public void onInitialExchangeComplete(@Nullable Throwable err) { @@ -711,9 +728,18 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { * Adds future to future map. * * @param fut Future to add. + * @return {@code False} if node cache is stopping and future was completed with error. */ - void addFuture(GridDhtForceKeysFuture<?, ?> fut) { + boolean addFuture(GridDhtForceKeysFuture<?, ?> fut) { forceKeyFuts.put(fut.futureId(), fut); + + if (stopping) { + fut.onDone(stopError()); + + return false; + } + + return true; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/db8a9b2b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 904047e..875131d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -1381,25 +1381,29 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** {@inheritDoc} */ @Override public void dumpStats() { - StringBuilder sb = new StringBuilder("Communication SPI recovery descriptors: ").append(U.nl()); + IgniteLogger log = this.log; - for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> entry : recoveryDescs.entrySet()) { - GridNioRecoveryDescriptor desc = entry.getValue(); + if (log != null) { + StringBuilder sb = new StringBuilder("Communication SPI recovery descriptors: ").append(U.nl()); - sb.append(" [key=").append(entry.getKey()) - .append(", msgsSent=").append(desc.sent()) - .append(", msgsAckedByRmt=").append(desc.acked()) - .append(", msgsRcvd=").append(desc.received()) - .append(", descIdHash=").append(System.identityHashCode(desc)) - .append(']').append(U.nl()); - } + for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> entry : recoveryDescs.entrySet()) { + GridNioRecoveryDescriptor desc = entry.getValue(); - U.warn(log, sb.toString()); + sb.append(" [key=").append(entry.getKey()) + .append(", msgsSent=").append(desc.sent()) + .append(", msgsAckedByRmt=").append(desc.acked()) + .append(", msgsRcvd=").append(desc.received()) + .append(", descIdHash=").append(System.identityHashCode(desc)) + .append(']').append(U.nl()); + } - GridNioServer<Message> nioSrvr1 = nioSrvr; + U.warn(log, sb.toString()); + } - if (nioSrvr1 != null) - nioSrvr1.dumpStats(); + GridNioServer<Message> nioSrvr = this.nioSrvr; + + if (nioSrvr != null) + nioSrvr.dumpStats(); } /** {@inheritDoc} */
