Repository: ignite
Updated Branches:
  refs/heads/master 7f6cde195 -> 1039c8a79


IGNITE-10514 Cache validation on the primary node may result in AssertionError 
- Fixes #5558.

Signed-off-by: Ivan Rakov <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1039c8a7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1039c8a7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1039c8a7

Branch: refs/heads/master
Commit: 1039c8a7921858c7d577816ef1994de969d18e74
Parents: 7f6cde1
Author: Slava Koptilin <[email protected]>
Authored: Thu Dec 13 17:45:58 2018 +0300
Committer: Ivan Rakov <[email protected]>
Committed: Thu Dec 13 17:45:58 2018 +0300

----------------------------------------------------------------------
 .../dht/atomic/GridDhtAtomicCache.java          | 52 ++++++++++++++++++--
 .../colocated/GridDhtColocatedLockFuture.java   | 13 ++++-
 .../distributed/near/GridNearLockFuture.java    | 12 ++++-
 .../near/GridNearTxAbstractEnlistFuture.java    | 13 ++++-
 .../near/TxTopologyVersionFuture.java           | 13 ++++-
 5 files changed, 96 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1039c8a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 6118fbb..51dae50 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -73,6 +73,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridPartition
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
@@ -1722,9 +1723,9 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
      * @param completionCb Completion callback.
      */
     private void updateAllAsyncInternal0(
-        ClusterNode node,
-        GridNearAtomicAbstractUpdateRequest req,
-        UpdateReplyClosure completionCb
+        final ClusterNode node,
+        final GridNearAtomicAbstractUpdateRequest req,
+        final UpdateReplyClosure completionCb
     ) {
         GridNearAtomicUpdateResponse res = new 
GridNearAtomicUpdateResponse(ctx.cacheId(),
             node.id(),
@@ -1785,6 +1786,51 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                                 if (validateCache) {
                                     GridDhtTopologyFuture topFut = 
top.topologyVersionFuture();
 
+                                    // Cache validation should use topology 
version from the update request
+                                    // in case the topology version was 
locked on the near node.
+                                    if (req.topologyLocked()) {
+                                        // affinityReadyFuture() can return 
GridFinishedFuture under some circumstances
+                                        // and therefore it cannot be used for 
validation.
+                                        
IgniteInternalFuture<AffinityTopologyVersion> affFut =
+                                            
ctx.shared().exchange().affinityReadyFuture(req.topologyVersion());
+
+                                        if (affFut.isDone()) {
+                                            
List<GridDhtPartitionsExchangeFuture> futs =
+                                                
ctx.shared().exchange().exchangeFutures();
+
+                                            boolean found = false;
+
+                                            for (int i = 0; i < futs.size(); 
++i) {
+                                                
GridDhtPartitionsExchangeFuture fut = futs.get(i);
+
+                                                // We have to check 
fut.exchangeDone() here -
+                                                // otherwise an attempt to get 
topVer will throw an error.
+                                                // We won't skip the needed future 
because the affinity ready future is done.
+                                                if (fut.exchangeDone() &&
+                                                    
fut.topologyVersion().equals(req.topologyVersion())) {
+                                                    topFut = fut;
+
+                                                    found = true;
+
+                                                    break;
+                                                }
+                                            }
+
+                                            assert found: "The requested 
topology future cannot be found [topVer="
+                                                + req.topologyVersion() + ']';
+                                        }
+                                        else {
+                                            affFut.listen(f -> 
updateAllAsyncInternal0(node, req, completionCb));
+
+                                            return;
+                                        }
+
+                                        assert 
req.topologyVersion().equals(topFut.topologyVersion()) :
+                                            "The requested topology version 
cannot be found [" +
+                                                "reqTopFut=" + 
req.topologyVersion()
+                                                + ", topFut=" + topFut + ']';
+                                    }
+
                                     assert topFut.isDone() : topFut;
 
                                     Throwable err = topFut.validateCache(ctx, 
req.recovery(), false, null, null);

http://git-wip-us.apache.org/repos/asf/ignite/blob/1039c8a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 6b20eb2..2c81036 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -777,7 +777,18 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
         if (topVer != null) {
             for (GridDhtTopologyFuture fut : 
cctx.shared().exchange().exchangeFutures()) {
                 if (fut.exchangeDone() && 
fut.topologyVersion().equals(topVer)) {
-                    Throwable err = fut.validateCache(cctx, recovery, read, 
null, keys);
+                    Throwable err = null;
+
+                    // Before cache validation, make sure that this topology 
future is already completed.
+                    try {
+                        fut.get();
+                    }
+                    catch (IgniteCheckedException e) {
+                        err = fut.error();
+                    }
+
+                    if (err == null)
+                        err = fut.validateCache(cctx, recovery, read, null, 
keys);
 
                     if (err != null) {
                         onDone(err);

http://git-wip-us.apache.org/repos/asf/ignite/blob/1039c8a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index ceddf72..0863f6e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -845,7 +845,17 @@ public final class GridNearLockFuture extends 
GridCacheCompoundIdentityFuture<Bo
         if (topVer != null) {
             for (GridDhtTopologyFuture fut : 
cctx.shared().exchange().exchangeFutures()) {
                 if (fut.exchangeDone() && 
fut.topologyVersion().equals(topVer)){
-                    Throwable err = fut.validateCache(cctx, recovery, read, 
null, keys);
+                    Throwable err = null;
+
+                    // Before cache validation, make sure that this topology 
future is already completed.
+                    try {
+                        fut.get();
+                    }
+                    catch (IgniteCheckedException e) {
+                        err = fut.error();
+                    }
+
+                    err = (err == null)? fut.validateCache(cctx, recovery, 
read, null, keys): err;
 
                     if (err != null) {
                         onDone(err);

http://git-wip-us.apache.org/repos/asf/ignite/blob/1039c8a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java
index bd09424..e93834b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java
@@ -223,7 +223,18 @@ public abstract class GridNearTxAbstractEnlistFuture<T> 
extends GridCacheCompoun
         if (topVer != null) {
             for (GridDhtTopologyFuture fut : 
cctx.shared().exchange().exchangeFutures()) {
                 if (fut.exchangeDone() && 
fut.topologyVersion().equals(topVer)) {
-                    Throwable err = fut.validateCache(cctx, false, false, 
null, null);
+                    Throwable err = null;
+
+                    // Before cache validation, make sure that this topology 
future is already completed.
+                    try {
+                        fut.get();
+                    }
+                    catch (IgniteCheckedException e) {
+                        err = fut.error();
+                    }
+
+                    if (err == null)
+                        err = fut.validateCache(cctx, false, false, null, 
null);
 
                     if (err != null) {
                         onDone(err);

http://git-wip-us.apache.org/repos/asf/ignite/blob/1039c8a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/TxTopologyVersionFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/TxTopologyVersionFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/TxTopologyVersionFuture.java
index b5e3883..c13bf0e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/TxTopologyVersionFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/TxTopologyVersionFuture.java
@@ -70,7 +70,18 @@ public class TxTopologyVersionFuture extends 
GridFutureAdapter<AffinityTopologyV
         if (topVer != null) {
             for (GridDhtTopologyFuture fut : 
cctx.shared().exchange().exchangeFutures()) {
                 if (fut.exchangeDone() && 
fut.topologyVersion().equals(topVer)) {
-                    Throwable err = fut.validateCache(cctx, false, false, 
null, null);
+                    Throwable err = null;
+
+                    // Before cache validation, make sure that this topology 
future is already completed.
+                    try {
+                        fut.get();
+                    }
+                    catch (IgniteCheckedException e) {
+                        err = fut.error();
+                    }
+
+                    if (err == null)
+                        err = fut.validateCache(cctx, false, false, null, 
null);
 
                     if (err != null) {
                         onDone(err);

Reply via email to