ignite-1452 Cancel cache operations on node stop
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/585761f2 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/585761f2 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/585761f2 Branch: refs/heads/ignite-1093-2 Commit: 585761f28e8b70487eaf2198d6ea39f7232b088d Parents: b8c0b30 Author: sboikov <[email protected]> Authored: Thu Sep 17 16:26:02 2015 +0300 Committer: sboikov <[email protected]> Committed: Thu Sep 17 16:26:02 2015 +0300 ---------------------------------------------------------------------- .../apache/ignite/internal/IgniteKernal.java | 7 --- .../processors/cache/GridCacheContext.java | 6 +-- .../cache/GridCacheEvictionManager.java | 6 +-- .../cache/GridCacheEvictionResponse.java | 2 +- .../processors/cache/GridCacheIoManager.java | 47 +++++++++++++------- .../processors/cache/GridCacheMessage.java | 7 +++ .../processors/cache/GridCacheMvccManager.java | 34 +++++++++++--- .../GridCachePartitionExchangeManager.java | 41 +++++++++++++---- .../processors/cache/GridCacheProcessor.java | 28 ++++++++---- .../GridDistributedLockResponse.java | 6 +-- .../GridDistributedTxPrepareResponse.java | 6 +-- .../distributed/dht/GridDhtTopologyFuture.java | 6 ++- .../dht/GridDhtTransactionalCacheAdapter.java | 2 +- .../dht/atomic/GridDhtAtomicUpdateResponse.java | 12 +++-- .../dht/atomic/GridNearAtomicUpdateFuture.java | 16 ++++--- .../dht/atomic/GridNearAtomicUpdateRequest.java | 2 + .../atomic/GridNearAtomicUpdateResponse.java | 11 ++--- .../colocated/GridDhtColocatedLockFuture.java | 44 ++++++++++++++---- .../dht/preloader/GridDhtForceKeysFuture.java | 2 +- .../dht/preloader/GridDhtForceKeysResponse.java | 6 +-- .../GridDhtPartitionsExchangeFuture.java | 19 ++++++-- .../distributed/near/GridNearGetResponse.java | 6 +-- .../distributed/near/GridNearLockFuture.java | 26 ++++++++--- .../near/GridNearOptimisticTxPrepareFuture.java | 20 +++++++-- .../near/GridNearTxFinishResponse.java | 6 +-- .../cache/query/GridCacheQueryResponse.java | 6 +-- .../continuous/CacheContinuousQueryHandler.java | 12 +++-- .../transactions/IgniteTxLocalAdapter.java | 4 +- .../ignite/internal/util/GridSpinBusyLock.java | 10 +++++ .../IgniteCacheEntryProcessorNodeJoinTest.java | 24 +++++++--- .../loadtests/hashmap/GridCacheTestContext.java | 4 +- .../IgniteCacheQueryNodeRestartSelfTest2.java | 2 - 32 files changed, 292 insertions(+), 138 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index daf7d23..82db059 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -1806,8 +1806,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { notifyLifecycleBeansEx(LifecycleEventType.BEFORE_NODE_STOP); } - GridCacheProcessor cacheProcessor = ctx.cache(); - List<GridComponent> comps = ctx.components(); ctx.marshallerContext().onKernalStop(); @@ -1856,11 +1854,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { // Note that interrupted flag is cleared. interrupted = true; } - finally { - // Cleanup even on successful acquire. - if (cacheProcessor != null) - cacheProcessor.cancelUserOperations(); - } } if (interrupted) http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 86ba3e6..5385dec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -283,12 +283,12 @@ public class GridCacheContext<K, V> implements Externalizable { GridCacheEvictionManager evictMgr, GridCacheQueryManager<K, V> qryMgr, CacheContinuousQueryManager contQryMgr, - GridCacheAffinityManager affMgr, CacheDataStructuresManager dataStructuresMgr, GridCacheTtlManager ttlMgr, GridCacheDrManager drMgr, CacheConflictResolutionManager<K, V> rslvrMgr, - CachePluginManager pluginMgr + CachePluginManager pluginMgr, + GridCacheAffinityManager affMgr ) { assert ctx != null; assert sharedCtx != null; @@ -323,12 +323,12 @@ public class GridCacheContext<K, V> implements Externalizable { this.evictMgr = add(evictMgr); this.qryMgr = add(qryMgr); this.contQryMgr = add(contQryMgr); - this.affMgr = add(affMgr); this.dataStructuresMgr = add(dataStructuresMgr); this.ttlMgr = add(ttlMgr); this.drMgr = add(drMgr); this.rslvrMgr = add(rslvrMgr); this.pluginMgr = add(pluginMgr); + this.affMgr = add(affMgr); log = ctx.log(getClass()); http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java index 3e0e2f9..1c34c76 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java @@ -1943,7 +1943,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter { lock.readLock().unlock(); } - if (res.error()) + if (res.evictError()) // Complete future, since there was a class loading error on at least one node. complete(false); else @@ -1985,14 +1985,14 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter { boolean err = F.forAny(resMap.values(), new P1<GridCacheEvictionResponse>() { @Override public boolean apply(GridCacheEvictionResponse res) { - return res.error(); + return res.evictError(); } }); if (err) { Collection<UUID> ids = F.view(resMap.keySet(), new P1<UUID>() { @Override public boolean apply(UUID e) { - return resMap.get(e).error(); + return resMap.get(e).evictError(); } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java index 4d40c8d..aa3911b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java @@ -116,7 +116,7 @@ public class GridCacheEvictionResponse extends GridCacheMessage { /** * @return {@code True} if request processing has finished with error. */ - boolean error() { + boolean evictError() { return err; } http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index b55c84d..421ec82 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -182,8 +182,15 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { c = clsHandlers.get(new ListenerKey(cacheMsg.cacheId(), cacheMsg.getClass())); if (c == null) { - U.warn(log, "Received message without registered handler (will ignore) [msg=" + cacheMsg + - ", nodeId=" + nodeId + ']'); + if (cctx.kernalContext().isStopping()) { + if (log.isDebugEnabled()) + log.debug("Received message without registered handler (will ignore) [msg=" + cacheMsg + + ", nodeId=" + nodeId + ']'); + } + else { + U.warn(log, "Received message without registered handler (will ignore) [msg=" + cacheMsg + + ", nodeId=" + nodeId + ']'); + } return; } @@ -596,9 +603,13 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { * * @param msg Message to send. * @param destNodeId Destination node ID. + * @return {@code True} if should send message. * @throws IgniteCheckedException If failed. */ - private void onSend(GridCacheMessage msg, @Nullable UUID destNodeId) throws IgniteCheckedException { + private boolean onSend(GridCacheMessage msg, @Nullable UUID destNodeId) throws IgniteCheckedException { + if (msg.error() != null && cctx.kernalContext().isStopping()) + return false; + if (msg.messageId() < 0) // Generate and set message ID. msg.messageId(idGen.incrementAndGet()); @@ -609,6 +620,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { if (depEnabled && msg instanceof GridCacheDeployable) cctx.deploy().prepare((GridCacheDeployable)msg); } + + return true; } /** @@ -624,7 +637,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { public void send(ClusterNode node, GridCacheMessage msg, byte plc) throws IgniteCheckedException { assert !node.isLocal(); - onSend(msg, node.id()); + if (!onSend(msg, node.id())) + return; if (log.isDebugEnabled()) log.debug("Sending cache message [msg=" + msg + ", node=" + U.toShortString(node) + ']'); @@ -663,12 +677,10 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { * @param msg Message to send. * @param plc IO policy. * @param fallback Callback for failed nodes. - * @return {@code True} if nodes are empty or message was sent, {@code false} if - * all nodes have left topology while sending this message. * @throws IgniteCheckedException If send failed. */ @SuppressWarnings({"BusyWait", "unchecked"}) - public boolean safeSend(Collection<? extends ClusterNode> nodes, GridCacheMessage msg, byte plc, + public void safeSend(Collection<? extends ClusterNode> nodes, GridCacheMessage msg, byte plc, @Nullable IgnitePredicate<ClusterNode> fallback) throws IgniteCheckedException { assert nodes != null; assert msg != null; @@ -677,10 +689,11 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { if (log.isDebugEnabled()) log.debug("Message will not be sent as collection of nodes is empty: " + msg); - return true; + return; } - onSend(msg, null); + if (!onSend(msg, null)) + return; if (log.isDebugEnabled()) log.debug("Sending cache message [msg=" + msg + ", nodes=" + U.toShortString(nodes) + ']'); @@ -709,7 +722,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { if (fallback != null && !fallback.apply(n)) // If fallback signalled to stop. - return false; + return; added = true; } @@ -721,7 +734,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { log.debug("Message will not be sent because all nodes left topology [msg=" + msg + ", nodes=" + U.toShortString(nodes) + ']'); - return false; + return; } } @@ -737,7 +750,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { if (fallback != null && !fallback.apply(n)) // If fallback signalled to stop. - return false; + return; added = true; } @@ -757,7 +770,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { log.debug("Message will not be sent because all nodes left topology [msg=" + msg + ", nodes=" + U.toShortString(nodes) + ']'); - return false; + return; } if (log.isDebugEnabled()) @@ -768,8 +781,6 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { if (log.isDebugEnabled()) log.debug("Sent cache message [msg=" + msg + ", nodes=" + U.toShortString(nodes) + ']'); - - return true; } /** @@ -800,7 +811,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { */ public void sendOrderedMessage(ClusterNode node, Object topic, GridCacheMessage msg, byte plc, long timeout) throws IgniteCheckedException { - onSend(msg, node.id()); + if (!onSend(msg, node.id())) + return; int cnt = 0; @@ -854,7 +866,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { assert node != null; assert msg != null; - onSend(msg, null); + if (!onSend(msg, null)) + return; try { cctx.gridIO().send(node, TOPIC_CACHE, msg, plc); http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java index 4e737a0..55688e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java @@ -77,6 +77,13 @@ public abstract class GridCacheMessage implements Message { protected int cacheId; /** + * @return Error, if any. + */ + @Nullable public Throwable error() { + return null; + } + + /** * Gets next ID for indexed message ID. * * @return Message ID. http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index 555bbda..e2d0302 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -120,6 +120,9 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { @SuppressWarnings( {"FieldAccessedSynchronizedAndUnsynchronized"}) private IgniteLogger exchLog; + /** */ + private volatile boolean stopping; + /** Lock callback. */ @GridToStringExclude private final GridCacheMvccCallback cb = new GridCacheMvccCallback() { @@ -325,8 +328,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { /** * Cancels all client futures. */ - public void cancelClientFutures() { - cancelClientFutures(new IgniteCheckedException("Operation has been cancelled (node is stopping).")); + public void onStop() { + stopping = true; + + cancelClientFutures(stopError()); } /** {@inheritDoc} */ @@ -362,6 +367,13 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { } /** + * @return Node stop exception. + */ + private IgniteCheckedException stopError() { + return new IgniteCheckedException("Operation has been cancelled (node is stopping)."); + } + + /** * @param from From version. * @return To version. */ @@ -385,8 +397,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { assert old == null : "Old future is not null [futVer=" + futVer + ", fut=" + fut + ", old=" + old + ']'; - if (cctx.kernalContext().clientDisconnected()) - ((GridFutureAdapter)fut).onDone(disconnectedError(null)); + onFutureAdded(fut); } /** @@ -507,17 +518,26 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { fut.onNodeLeft(n.id()); } - if (cctx.kernalContext().clientDisconnected()) - ((GridFutureAdapter)fut).onDone(disconnectedError(null)); - // Just in case if future was completed before it was added. if (fut.isDone()) removeFuture(fut); + else + onFutureAdded(fut); return true; } /** + * @param fut Future. + */ + private void onFutureAdded(IgniteInternalFuture<?> fut) { + if (stopping) + ((GridFutureAdapter)fut).onDone(stopError()); + else if (cctx.kernalContext().clientDisconnected()) + ((GridFutureAdapter)fut).onDone(disconnectedError(null)); + } + + /** * @param fut Future to remove. * @return {@code True} if removed. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 20340d1..34c571c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -147,6 +147,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana */ private ExchangeFutureSet exchFuts = new ExchangeFutureSet(); + /** */ + private volatile IgniteCheckedException stopErr; + /** Discovery listener. */ private final GridLocalEventListener discoLsnr = new GridLocalEventListener() { @Override public void onEvent(Event evt) { @@ -381,7 +384,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana cctx.io().removeHandler(0, GridDhtPartitionsFullMessage.class); cctx.io().removeHandler(0, GridDhtPartitionsSingleRequest.class); - IgniteCheckedException err = cctx.kernalContext().clientDisconnected() ? + stopErr = cctx.kernalContext().clientDisconnected() ? new IgniteClientDisconnectedCheckedException(cctx.kernalContext().cluster().clientReconnectFuture(), "Client node disconnected: " + cctx.gridName()) : new IgniteInterruptedCheckedException("Node is stopping: " + cctx.gridName()); @@ -391,11 +394,17 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (exchFuts0 != null) { for (GridDhtPartitionsExchangeFuture f : exchFuts.values()) - f.onDone(err); + f.onDone(stopErr); } for (AffinityReadyFuture f : readyFuts.values()) - f.onDone(err); + f.onDone(stopErr); + + for (GridDhtPartitionsExchangeFuture f : pendingExchangeFuts) + f.onDone(stopErr); + + if (locExchFut != null) + locExchFut.onDone(stopErr); U.cancel(exchWorker); @@ -519,6 +528,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana fut.onDone(topVer); } + else if (stopErr != null) + fut.onDone(stopErr); return fut; } @@ -791,6 +802,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (discoEvt != null) fut.onEvent(exchId, discoEvt); + if (stopErr != null) + fut.onDone(stopErr); + return fut; } @@ -799,12 +813,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @param err Error. */ public void onExchangeDone(GridDhtPartitionsExchangeFuture exchFut, @Nullable Throwable err) { - if (err == null) { - AffinityTopologyVersion topVer = exchFut.topologyVersion(); + AffinityTopologyVersion topVer = exchFut.topologyVersion(); - if (log.isDebugEnabled()) - log.debug("Exchange done [topVer=" + topVer + ", fut=" + exchFut + ']'); + if (log.isDebugEnabled()) + log.debug("Exchange done [topVer=" + topVer + ", fut=" + exchFut + ", err=" + err + ']'); + if (err == null) { while (true) { AffinityTopologyVersion readyVer = readyTopVer.get(); @@ -825,8 +839,17 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } } } - else if (log.isDebugEnabled()) - log.debug("Exchange done with error [fut=" + exchFut + ", err=" + err + ']'); + else { + for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) { + if (entry.getKey().compareTo(topVer) <= 0) { + if (log.isDebugEnabled()) + log.debug("Completing created topology ready future with error " + + "[ver=" + topVer + ", fut=" + entry.getValue() + ']'); + + entry.getValue().onDone(err); + } + } + } ExchangeFutureSet exchFuts0 = exchFuts; http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 4ae0baa..c92de7d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -960,6 +960,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { } } + cancelFutures(); + List<? extends GridCacheSharedManager<?, ?>> sharedMgrs = sharedCtx.managers(); for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = sharedMgrs.listIterator(sharedMgrs.size()); @@ -1323,12 +1325,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { evictMgr, qryMgr, contQryMgr, - affMgr, dataStructuresMgr, ttlMgr, drMgr, rslvrMgr, - pluginMgr + pluginMgr, + affMgr ); cacheCtx.cacheObjectContext(cacheObjCtx); @@ -1452,12 +1454,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { evictMgr, qryMgr, contQryMgr, - affMgr, dataStructuresMgr, ttlMgr, drMgr, rslvrMgr, - pluginMgr + pluginMgr, + affMgr ); cacheCtx.cacheObjectContext(cacheObjCtx); @@ -2325,9 +2327,14 @@ public class GridCacheProcessor extends GridProcessorAdapter { try { ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(sndReqs)); - if (ctx.clientDisconnected()) + if (ctx.isStopping()) { + err = new IgniteCheckedException("Failed to execute dynamic cache change request, " + + "node is stopping."); + } + else if (ctx.clientDisconnected()) { err = new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(), "Failed to execute dynamic cache change request, client node disconnected."); + } } catch (IgniteCheckedException e) { err = e; @@ -3036,9 +3043,14 @@ public class GridCacheProcessor extends GridProcessorAdapter { try { ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(Collections.singleton(req))); - if (ctx.clientDisconnected()) + if (ctx.isStopping()) { + err = new IgniteCheckedException("Failed to execute dynamic cache change request, " + + "node is stopping."); + } + else if (ctx.clientDisconnected()) { err = new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(), "Failed to execute dynamic cache change request, client node disconnected."); + } } catch (IgniteCheckedException e) { err = e; @@ -3104,8 +3116,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** * Cancel all user operations. */ - public void cancelUserOperations() { - sharedCtx.mvcc().cancelClientFutures(); + private void cancelFutures() { + sharedCtx.mvcc().onStop(); Exception err = new IgniteCheckedException("Operation has been cancelled (node is stopping)."); http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java index cdb878d..8a95b14 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java @@ -137,10 +137,8 @@ public class GridDistributedLockResponse extends GridDistributedBaseMessage { return futId; } - /** - * @return Error. - */ - public Throwable error() { + /** {@inheritDoc} */ + @Override public Throwable error() { return err; } http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java index 4264830..e798458 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java @@ -67,10 +67,8 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage this.err = err; } - /** - * @return Error. - */ - public Throwable error() { + /** {@inheritDoc} */ + @Override public Throwable error() { return err; } http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java index c11a3d7..6ade26f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.jetbrains.annotations.Nullable; /** * Future that implements a barrier after which dht topology is safe to use. Topology is considered to be @@ -38,9 +39,10 @@ public interface GridDhtTopologyFuture extends IgniteInternalFuture<AffinityTopo public AffinityTopologyVersion topologyVersion(); /** - * Returns is cache topology valid. + * Returns error is cache topology is not valid. + * * @param cctx Cache context. * @return valid ot not. */ - public boolean isCacheTopologyValid(GridCacheContext cctx); + @Nullable public Throwable validateCache(GridCacheContext cctx); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index b9514a9..1a869e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -1217,7 +1217,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach Throwable err = res.error(); // Log error before sending reply. - if (err != null && !(err instanceof GridCacheLockTimeoutException)) + if (err != null && !(err instanceof GridCacheLockTimeoutException) && !ctx.kernalContext().isStopping()) U.error(log, "Failed to acquire lock for request: " + req, err); try { http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java index 33651bc..04d36e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java @@ -97,16 +97,15 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri /** * Sets update error. - * @param err + * + * @param err Error. */ public void onError(IgniteCheckedException err){ this.err = err; } - /** - * @return Gets update error. - */ - public IgniteCheckedException error() { + /** {@inheritDoc} */ + @Override public IgniteCheckedException error() { return err; } @@ -154,8 +153,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri nearEvicted.add(key); } - /** {@inheritDoc} - * @param ctx*/ + /** {@inheritDoc} */ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index d93f68f..fb2c5ad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -385,9 +385,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture(); if (fut.isDone()) { - if (!fut.isCacheTopologyValid(cctx)) { - onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " + - cctx.name())); + Throwable err = fut.validateCache(cctx); + + if (err != null) { + onDone(err); return; } @@ -811,6 +812,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } Exception err = null; + GridNearAtomicUpdateRequest singleReq0 = null; Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = null; int size = keys.size(); @@ -837,13 +839,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> if (size == 1 && !fastMap) { assert remapKeys == null || remapKeys.size() == 1; - singleReq = mapSingleUpdate(); + singleReq0 = singleReq = mapSingleUpdate(); } else { pendingMappings = mapUpdate(topNodes); if (pendingMappings.size() == 1) - singleReq = F.firstValue(pendingMappings); + singleReq0 = singleReq = F.firstValue(pendingMappings); else { if (syncMode == PRIMARY_SYNC) { mappings = U.newHashMap(pendingMappings.size()); @@ -874,8 +876,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } // Optimize mapping for single key. - if (singleReq != null) - mapSingle(singleReq.nodeId(), singleReq); + if (singleReq0 != null) + mapSingle(singleReq0.nodeId(), singleReq0); else { assert pendingMappings != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java index 5f5fbb5..ccb67d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java @@ -198,6 +198,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri boolean skipStore, boolean clientReq ) { + assert futVer != null; + this.cacheId = cacheId; this.nodeId = nodeId; this.futVer = futVer; http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java index 8bc145c..376f4ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java @@ -116,6 +116,8 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr * @param futVer Future version. */ public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer) { + assert futVer != null; + this.cacheId = cacheId; this.nodeId = nodeId; this.futVer = futVer; @@ -149,16 +151,15 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr /** * Sets update error. - * @param err + * + * @param err Error. */ public void error(IgniteCheckedException err){ this.err = err; } - /** - * @return Update error, if any. - */ - public IgniteCheckedException error() { + /** {@inheritDoc} */ + @Override public IgniteCheckedException error() { return err; } http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 596ec77..1a08265 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -57,6 +57,7 @@ import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.C2; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; @@ -524,7 +525,22 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture /** {@inheritDoc} */ @Override public String toString() { - return S.toString(GridDhtColocatedLockFuture.class, this, "inTx", inTx(), "super", super.toString()); + Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() { + @Override public String apply(IgniteInternalFuture<?> f) { + if (isMini(f)) { + MiniFuture m = (MiniFuture)f; + + return "[node=" + m.node().id() + ", loc=" + m.node().isLocal() + ", done=" + f.isDone() + "]"; + } + else + return "[loc=true, done=" + f.isDone() + "]"; + } + }); + + return S.toString(GridDhtColocatedLockFuture.class, this, + "innerFuts", futs, + "inTx", inTx(), + "super", super.toString()); } /** @@ -565,9 +581,10 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture if (topVer != null) { for (GridDhtTopologyFuture fut : cctx.shared().exchange().exchangeFutures()){ if (fut.topologyVersion().equals(topVer)){ - if (!fut.isCacheTopologyValid(cctx)) { - onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " + - cctx.name())); + Throwable err = fut.validateCache(cctx); + + if (err != null) { + onDone(err); return; } @@ -612,9 +629,10 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture GridDhtTopologyFuture fut = cctx.topologyVersionFuture(); if (fut.isDone()) { - if (!fut.isCacheTopologyValid(cctx)) { - onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " + - cctx.name())); + Throwable err = fut.validateCache(cctx); + + if (err != null) { + onDone(err); return; } @@ -643,10 +661,15 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture } else { fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { try { + fut.get(); + mapOnTopology(remap, c); } + catch (IgniteCheckedException e) { + onDone(e); + } finally { cctx.shared().txContextReset(); } @@ -1327,8 +1350,13 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture affFut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> fut) { try { + fut.get(); + remap(); } + catch (IgniteCheckedException e) { + onDone(e); + } finally { cctx.shared().txContextReset(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java index 36a2da1..eaed424 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java @@ -283,7 +283,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec // Fail the whole thing. if (e instanceof ClusterTopologyCheckedException) fut.onResult((ClusterTopologyCheckedException)e); - else + else if (!cctx.kernalContext().isStopping()) fut.onResult(e); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java index d31f096..93e39ce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java @@ -98,10 +98,8 @@ public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCa this.err = err; } - /** - * @return Error, if any. - */ - public IgniteCheckedException error() { + /** {@inheritDoc} */ + @Override public IgniteCheckedException error() { return err; } http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 865bbdc..a1b03c1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -1081,9 +1081,22 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** {@inheritDoc} */ - @Override public boolean isCacheTopologyValid(GridCacheContext cctx) { - return cctx.config().getTopologyValidator() != null && cacheValidRes.containsKey(cctx.cacheId()) ? - cacheValidRes.get(cctx.cacheId()) : true; + @Override public Throwable validateCache(GridCacheContext cctx) { + Throwable err = error(); + + if (err != null) + return err; + + if (cctx.config().getTopologyValidator() != null) { + Boolean res = cacheValidRes.get(cctx.cacheId()); + + if (res != null && !res) { + return new IgniteCheckedException("Failed to perform cache operation " + + "(cache topology is not valid): " + cctx.name()); + } + } + + return null; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java index 3276377..d4493a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java @@ -163,10 +163,8 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe return topVer != null ? topVer : super.topologyVersion(); } - /** - * @return Error. - */ - public IgniteCheckedException error() { + /** {@inheritDoc} */ + @Override public IgniteCheckedException error() { return err; } http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index f3e5ca3..dcc8da6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -703,9 +703,10 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean if (topVer != null) { for (GridDhtTopologyFuture fut : cctx.shared().exchange().exchangeFutures()){ if (fut.topologyVersion().equals(topVer)){ - if (!fut.isCacheTopologyValid(cctx)) { - onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " + - cctx.name())); + Throwable err = fut.validateCache(cctx); + + if (err != null) { + onDone(err); return; } @@ -749,9 +750,10 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean GridDhtTopologyFuture fut = cctx.topologyVersionFuture(); if (fut.isDone()) { - if (!fut.isCacheTopologyValid(cctx)) { - onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " + - cctx.name())); + Throwable err = fut.validateCache(cctx); + + if (err != null) { + onDone(err); return; } @@ -777,10 +779,15 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean } else { fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { try { + fut.get(); + mapOnTopology(remap); } + catch (IgniteCheckedException e) { + onDone(e); + } finally { cctx.shared().txContextReset(); } @@ -1435,8 +1442,13 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean affFut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> fut) { try { + fut.get(); + remap(); } + catch (IgniteCheckedException e) { + onDone(e); + } finally { cctx.shared().txContextReset(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index 2048fdf..25028c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -319,7 +319,9 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd assert ctx != null : cacheId; - if (!topFut.isCacheTopologyValid(ctx)) { + Throwable err = topFut.validateCache(ctx); + + if (err != null) { if (invalidCaches != null) invalidCaches.append(", "); else @@ -343,12 +345,17 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd } else { topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { + @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) { cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { @Override public void run() { try { + fut.get(); + prepareOnTopology(remap, c); } + catch (IgniteCheckedException e) { + onDone(e); + } finally { cctx.txContextReset(); } @@ -841,7 +848,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd if (affFut != null && !affFut.isDone()) { affFut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> fut) { - remap(); + try { + fut.get(); + + remap(); + } + catch (IgniteCheckedException e) { + onDone(e); + } } }); } http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java index cec7d73..c860baa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java @@ -75,10 +75,8 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse { this.err = err; } - /** - * @return Error. - */ - @Nullable public Throwable error() { + /** {@inheritDoc} */ + @Nullable @Override public Throwable error() { return err; } http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java index 3e4cdeb..78e2ac7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java @@ -193,10 +193,8 @@ public class GridCacheQueryResponse extends GridCacheMessage implements GridCach return reqId; } - /** - * @return Error. - */ - public Throwable error() { + /** {@inheritDoc} */ + @Override public Throwable error() { return err; } http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index df6b4b7..c99e07f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -47,6 +47,7 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; @@ -97,6 +98,9 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { /** Whether to skip primary check for REPLICATED cache. */ private transient boolean skipPrimaryCheck; + /** */ + private transient int cacheId; + /** * Required by {@link Externalizable}. */ @@ -145,6 +149,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { this.ignoreExpired = ignoreExpired; this.taskHash = taskHash; this.skipPrimaryCheck = skipPrimaryCheck; + + cacheId = CU.cacheId(cacheName); } /** {@inheritDoc} */ @@ -457,6 +463,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { sync = in.readBoolean(); ignoreExpired = in.readBoolean(); taskHash = in.readInt(); + + cacheId = CU.cacheId(cacheName); } /** @@ -466,9 +474,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { private GridCacheContext<K, V> cacheContext(GridKernalContext ctx) { assert ctx != null; - GridCacheAdapter<K, V> cache = ctx.cache().internalCache(cacheName); - - return cache == null ? null : cache.context(); + return ctx.cache().<K, V>context().cacheContext(cacheId); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 00b91dd..6ca1f72 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1105,6 +1105,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter /** * Commits transaction to transaction manager. Used for one-phase commit transactions only. + * + * @param commit If {@code true} commits transaction, otherwise rollbacks. */ public void tmFinish(boolean commit) { assert onePhaseCommit(); @@ -1118,7 +1120,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter state(commit ? COMMITTED : ROLLED_BACK); - boolean needsCompletedVersions = needsCompletedVersions(); + boolean needsCompletedVersions = commit && needsCompletedVersions(); assert !needsCompletedVersions || completedBase != null; assert !needsCompletedVersions || committedVers != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinBusyLock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinBusyLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinBusyLock.java index 2aae6ef..6bfd4fb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinBusyLock.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinBusyLock.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.util; +import java.util.concurrent.TimeUnit; import org.apache.ignite.internal.util.tostring.GridToStringExclude; /** @@ -76,6 +77,15 @@ public class GridSpinBusyLock { } /** + * @param millis Timeout. + * @return {@code True} if lock was acquired. + * @throws InterruptedException If interrupted. + */ + public boolean tryBlock(long millis) throws InterruptedException { + return lock.tryWriteLock(millis, TimeUnit.MILLISECONDS); + } + + /** * Makes possible for activities entering busy state again. */ public void unblock() { http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java index 6b4d473..151167a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java @@ -25,6 +25,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import javax.cache.processor.EntryProcessor; +import javax.cache.processor.EntryProcessorResult; import javax.cache.processor.MutableEntry; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheAtomicityMode; @@ -184,20 +185,29 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes String val = "value-" + k; - cache.invoke(key, new Processor(val)); + procs.put(key, new Processor(val)); } - cache.invokeAll(procs); + Map<String, EntryProcessorResult<Integer>> resMap = cache.invokeAll(procs); + + for (String key : procs.keySet()) { + EntryProcessorResult<Integer> res = resMap.get(key); + + assertNotNull(res); + assertEquals(k + 1, (Object) res.get()); + } } else { + IgniteCache<String, Set<String>> cache = ignite(0).cache(null); + for (int i = 0; i < NUM_SETS; i++) { String key = "set-" + i; String val = "value-" + k; - IgniteCache<String, Set<String>> cache = ignite(0).cache(null); + Integer valsCnt = cache.invoke(key, new Processor(val)); - cache.invoke(key, new Processor(val)); + assertEquals(k + 1, (Object)valsCnt); } } } @@ -275,7 +285,7 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes } /** */ - private static class Processor implements EntryProcessor<String, Set<String>, Void>, Serializable { + private static class Processor implements EntryProcessor<String, Set<String>, Integer>, Serializable { /** */ private String val; @@ -287,7 +297,7 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes } /** {@inheritDoc} */ - @Override public Void process(MutableEntry<String, Set<String>> e, Object... args) { + @Override public Integer process(MutableEntry<String, Set<String>> e, Object... args) { Set<String> vals = e.getValue(); if (vals == null) @@ -297,7 +307,7 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes e.setValue(vals); - return null; + return vals.size(); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java index 7aae48c..88605b4 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java @@ -79,12 +79,12 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> { new GridCacheEvictionManager(), new GridCacheLocalQueryManager<K, V>(), new CacheContinuousQueryManager(), - new GridCacheAffinityManager(), new CacheDataStructuresManager(), new GridCacheTtlManager(), new GridOsCacheDrManager(), new CacheOsConflictResolutionManager<K, V>(), - new CachePluginManager(ctx, new CacheConfiguration()) + new CachePluginManager(ctx, new CacheConfiguration()), + new GridCacheAffinityManager() ); store().initialize(null, new IdentityHashMap<CacheStore, ThreadLocal>()); http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java index 1276405..e00611b 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java @@ -185,8 +185,6 @@ public class IgniteCacheQueryNodeRestartSelfTest2 extends GridCommonAbstractTest * @throws Exception If failed. */ public void testRestarts() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-1452"); - int duration = 90 * 1000; int qryThreadNum = 4; int restartThreadsNum = 2; // 4 + 2 = 6 nodes
