Repository: ignite Updated Branches: refs/heads/master 7f6cde195 -> 1039c8a79
IGNITE-10514 Cache validation on the primary node may result in AssertionError - Fixes #5558. Signed-off-by: Ivan Rakov <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1039c8a7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1039c8a7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1039c8a7 Branch: refs/heads/master Commit: 1039c8a7921858c7d577816ef1994de969d18e74 Parents: 7f6cde1 Author: Slava Koptilin <[email protected]> Authored: Thu Dec 13 17:45:58 2018 +0300 Committer: Ivan Rakov <[email protected]> Committed: Thu Dec 13 17:45:58 2018 +0300 ---------------------------------------------------------------------- .../dht/atomic/GridDhtAtomicCache.java | 52 ++++++++++++++++++-- .../colocated/GridDhtColocatedLockFuture.java | 13 ++++- .../distributed/near/GridNearLockFuture.java | 12 ++++- .../near/GridNearTxAbstractEnlistFuture.java | 13 ++++- .../near/TxTopologyVersionFuture.java | 13 ++++- 5 files changed, 96 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1039c8a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 6118fbb..51dae50 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -73,6 +73,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartition import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache; @@ -1722,9 +1723,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param completionCb Completion callback. */ private void updateAllAsyncInternal0( - ClusterNode node, - GridNearAtomicAbstractUpdateRequest req, - UpdateReplyClosure completionCb + final ClusterNode node, + final GridNearAtomicAbstractUpdateRequest req, + final UpdateReplyClosure completionCb ) { GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), node.id(), @@ -1785,6 +1786,51 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (validateCache) { GridDhtTopologyFuture topFut = top.topologyVersionFuture(); + // Cache validation should use topology version from the update request + // in case of the topology version was locked on near node. + if (req.topologyLocked()) { + // affinityReadyFuture() can return GridFinishedFuture under some circumstances + // and therefore it cannot be used for validation. + IgniteInternalFuture<AffinityTopologyVersion> affFut = + ctx.shared().exchange().affinityReadyFuture(req.topologyVersion()); + + if (affFut.isDone()) { + List<GridDhtPartitionsExchangeFuture> futs = + ctx.shared().exchange().exchangeFutures(); + + boolean found = false; + + for (int i = 0; i < futs.size(); ++i) { + GridDhtPartitionsExchangeFuture fut = futs.get(i); + + // We have to check fut.exchangeDone() here - + // otherwise attempt to get topVer will throw error. + // We won't skip needed future as per affinity ready future is done. + if (fut.exchangeDone() && + fut.topologyVersion().equals(req.topologyVersion())) { + topFut = fut; + + found = true; + + break; + } + } + + assert found: "The requested topology future cannot be found [topVer=" + + req.topologyVersion() + ']'; + } + else { + affFut.listen(f -> updateAllAsyncInternal0(node, req, completionCb)); + + return; + } + + assert req.topologyVersion().equals(topFut.topologyVersion()) : + "The requested topology version cannot be found [" + + "reqTopFut=" + req.topologyVersion() + + ", topFut=" + topFut + ']'; + } + assert topFut.isDone() : topFut; Throwable err = topFut.validateCache(ctx, req.recovery(), false, null, null); http://git-wip-us.apache.org/repos/asf/ignite/blob/1039c8a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 6b20eb2..2c81036 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -777,7 +777,18 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF if (topVer != null) { for (GridDhtTopologyFuture fut : cctx.shared().exchange().exchangeFutures()) { if (fut.exchangeDone() && fut.topologyVersion().equals(topVer)) { - Throwable err = fut.validateCache(cctx, recovery, read, null, keys); + Throwable err = null; + + // Before cache validation, make sure that this topology future is already completed. + try { + fut.get(); + } + catch (IgniteCheckedException e) { + err = fut.error(); + } + + if (err == null) + err = fut.validateCache(cctx, recovery, read, null, keys); if (err != null) { onDone(err); http://git-wip-us.apache.org/repos/asf/ignite/blob/1039c8a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index ceddf72..0863f6e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -845,7 +845,17 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo if (topVer != null) { for (GridDhtTopologyFuture fut : cctx.shared().exchange().exchangeFutures()) { if (fut.exchangeDone() && fut.topologyVersion().equals(topVer)){ - Throwable err = fut.validateCache(cctx, recovery, read, null, keys); + Throwable err = null; + + // Before cache validation, make sure that this topology future is already completed. + try { + fut.get(); + } + catch (IgniteCheckedException e) { + err = fut.error(); + } + + err = (err == null)? fut.validateCache(cctx, recovery, read, null, keys): err; if (err != null) { onDone(err); http://git-wip-us.apache.org/repos/asf/ignite/blob/1039c8a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java index bd09424..e93834b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java @@ -223,7 +223,18 @@ public abstract class GridNearTxAbstractEnlistFuture<T> extends GridCacheCompoun if (topVer != null) { for (GridDhtTopologyFuture fut : cctx.shared().exchange().exchangeFutures()) { if (fut.exchangeDone() && fut.topologyVersion().equals(topVer)) { - Throwable err = fut.validateCache(cctx, false, false, null, null); + Throwable err = null; + + // Before cache validation, make sure that this topology future is already completed. + try { + fut.get(); + } + catch (IgniteCheckedException e) { + err = fut.error(); + } + + if (err == null) + err = fut.validateCache(cctx, false, false, null, null); if (err != null) { onDone(err); http://git-wip-us.apache.org/repos/asf/ignite/blob/1039c8a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/TxTopologyVersionFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/TxTopologyVersionFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/TxTopologyVersionFuture.java index b5e3883..c13bf0e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/TxTopologyVersionFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/TxTopologyVersionFuture.java @@ -70,7 +70,18 @@ public class TxTopologyVersionFuture extends GridFutureAdapter<AffinityTopologyV if (topVer != null) { for (GridDhtTopologyFuture fut : cctx.shared().exchange().exchangeFutures()) { if (fut.exchangeDone() && fut.topologyVersion().equals(topVer)) { - Throwable err = fut.validateCache(cctx, false, false, null, null); + Throwable err = null; + + // Before cache validation, make sure that this topology future is already completed. + try { + fut.get(); + } + catch (IgniteCheckedException e) { + err = fut.error(); + } + + if (err == null) + err = fut.validateCache(cctx, false, false, null, null); if (err != null) { onDone(err);
