This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new f64fe2c  IGNITE-12739 Optimistic serializable transactions may fail 
infinitely when read-through is enabled. (#7575)
f64fe2c is described below

commit f64fe2c5ba1d52038920ee56f42738d0e8a5eb0b
Author: Vladsz83 <[email protected]>
AuthorDate: Mon Apr 13 10:24:09 2020 +0300

    IGNITE-12739 Optimistic serializable transactions may fail infinitely when 
read-through is enabled. (#7575)
---
 .../processors/cache/GridCacheContext.java         |   8 +-
 .../distributed/dht/GridPartitionedGetFuture.java  |   2 +-
 .../dht/GridPartitionedSingleGetFuture.java        |   3 +-
 .../cache/distributed/near/GridNearGetFuture.java  |   3 +-
 .../near/GridNearTransactionalCache.java           |   4 +-
 .../cache/distributed/near/GridNearTxLocal.java    |   5 +-
 .../transactions/TxOptimisticReadThroughTest.java  | 216 +++++++++++++++++++++
 .../testsuites/IgniteCacheMvccTestSuite6.java      |   4 +
 .../ignite/testsuites/IgniteCacheTestSuite6.java   |   3 +
 9 files changed, 239 insertions(+), 9 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 924cd11..faf48c1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -2245,16 +2245,18 @@ public class GridCacheContext<K, V> implements 
Externalizable {
      * @param affNodes All affinity nodes.
      * @param canRemap Flag indicating that 'get' should be done on a locked 
topology version.
      * @param partId Partition ID.
+     * @param forcePrimary Force primary flag.
      * @return Affinity node to get key from or {@code null} if there is no 
suitable alive node.
      */
     @Nullable public ClusterNode selectAffinityNodeBalanced(
         List<ClusterNode> affNodes,
         Set<ClusterNode> invalidNodes,
         int partId,
-        boolean canRemap
+        boolean canRemap,
+        boolean forcePrimary
     ) {
         if (!readLoadBalancingEnabled) {
-            if (!canRemap) {
+            if (!canRemap && !forcePrimary) {
                 // Find next available node if we can not wait next topology 
version.
                 for (ClusterNode node : affNodes) {
                     if (ctx.discovery().alive(node) && 
!invalidNodes.contains(node))
@@ -2270,7 +2272,7 @@ public class GridCacheContext<K, V> implements 
Externalizable {
             }
         }
 
-        if (!readFromBackup){
+        if (!readFromBackup || forcePrimary){
             ClusterNode first = affNodes.get(0);
 
             return !invalidNodes.contains(first) ? first : null;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index d1706e1..da8a738 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -355,7 +355,7 @@ public class GridPartitionedGetFuture<K, V> extends 
CacheDistributedGetFutureAda
             if (tryLocalGet(key, part, topVer, affNodes, locVals))
                 return false;
 
-            node = cctx.selectAffinityNodeBalanced(affNodes, invalidNodeSet, 
part, canRemap);
+            node = cctx.selectAffinityNodeBalanced(affNodes, invalidNodeSet, 
part, canRemap, forcePrimary);
         }
 
         // Failed if none remote node found.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 08abf5e..b9e9364 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -377,7 +377,8 @@ public class GridPartitionedSingleGetFuture extends 
GridCacheFutureAdapter<Objec
         if (tryLocalGet(key, part, topVer, affNodes))
             return null;
 
-        ClusterNode affNode = cctx.selectAffinityNodeBalanced(affNodes, 
getInvalidNodes(), part, canRemap);
+        ClusterNode affNode = cctx.selectAffinityNodeBalanced(affNodes, 
getInvalidNodes(), part, canRemap,
+            forcePrimary);
 
         // Failed if none balanced node found.
         if (affNode == null) {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index d4513b7..5b22ab1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -403,7 +403,8 @@ public final class GridNearGetFuture<K, V> extends 
CacheDistributedGetFutureAdap
 
                     Set<ClusterNode> invalidNodesSet = getInvalidNodes(part, 
topVer);
 
-                    ClusterNode affNode = 
cctx.selectAffinityNodeBalanced(affNodes, invalidNodesSet, part, canRemap);
+                    ClusterNode affNode = 
cctx.selectAffinityNodeBalanced(affNodes, invalidNodesSet, part, canRemap,
+                        forcePrimary);
 
                     if (affNode == null) {
                         onDone(serverNotFoundError(part, topVer));
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index a9eec8e..c50edc7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -177,6 +177,7 @@ public class GridNearTransactionalCache<K, V> extends 
GridNearCacheAdapter<K, V>
      * @param tx Transaction.
      * @param keys Keys to load.
      * @param readThrough Read through flag.
+     * @param forcePrimary Force primary flag.
      * @param deserializeBinary Deserialize binary flag.
      * @param expiryPlc Expiry policy.
      * @param skipVals Skip values flag.
@@ -187,6 +188,7 @@ public class GridNearTransactionalCache<K, V> extends 
GridNearCacheAdapter<K, V>
         AffinityTopologyVersion topVer,
         @Nullable Collection<KeyCacheObject> keys,
         boolean readThrough,
+        boolean forcePrimary,
         boolean deserializeBinary,
         boolean recovery,
         @Nullable IgniteCacheExpiryPolicy expiryPlc,
@@ -197,7 +199,7 @@ public class GridNearTransactionalCache<K, V> extends 
GridNearCacheAdapter<K, V>
         GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx,
             keys,
             readThrough,
-            /*force primary*/needVer || !ctx.config().isReadFromBackup(),
+            forcePrimary,
             tx,
             CU.subjectId(tx, ctx.shared()),
             tx.resolveTaskName(),
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 41cd597..e404b65 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -3101,6 +3101,7 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter implements GridTimeou
                 topVer,
                 keys,
                 readThrough,
+                needVer || !cacheCtx.config().isReadFromBackup() || 
(optimistic() && serializable() && readThrough),
                 /*deserializeBinary*/false,
                 recovery,
                 expiryPlc0,
@@ -3165,7 +3166,7 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter implements GridTimeou
                 return cacheCtx.colocated().loadAsync(
                     key,
                     readThrough,
-                    /*force primary*/needVer || 
!cacheCtx.config().isReadFromBackup(),
+                    needVer || !cacheCtx.config().isReadFromBackup() || 
(optimistic() && serializable() && readThrough),
                     topVer,
                     CU.subjectId(this, cctx),
                     resolveTaskName(),
@@ -3198,7 +3199,7 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter implements GridTimeou
                 return cacheCtx.colocated().loadAsync(
                     keys,
                     readThrough,
-                    /*force primary*/needVer || 
!cacheCtx.config().isReadFromBackup(),
+                    needVer || !cacheCtx.config().isReadFromBackup() || 
(optimistic() && serializable() && readThrough),
                     topVer,
                     CU.subjectId(this, cctx),
                     resolveTaskName(),
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticReadThroughTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticReadThroughTest.java
new file mode 100644
index 0000000..f97a5ab
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticReadThroughTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ * Tests optimistic transactions with a read-through cache.
+ */
+public class TxOptimisticReadThroughTest extends GridCommonAbstractTest {
+    /** Test nodes count. */
+    protected static final int NODE_CNT = 2;
+
+    /** Shared read/write-through store. */
+    private static final Map<Object, Object> storeMap = new 
ConcurrentHashMap<>();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        // Different addresses, so that not only the first node is picked up when searching 
for a value within a transaction.
+        cfg.setUserAttributes(Collections.singletonMap(
+            IgniteNodeAttributes.ATTR_MACS_OVERRIDE, 
UUID.randomUUID().toString()));
+
+        return cfg;
+    }
+
+    /**
+     *  Checks optimistic serializable transaction asks primary node for 
actual value version when read-through is
+     *  enabled for replicated transactional cache.
+     */
+    @Test
+    public void testReplicated() throws Exception {
+        CacheConfiguration<Object, Object> cacheCfg = cacheConfiguration();
+
+        cacheCfg.setCacheMode(REPLICATED);
+
+        checkOptimisticSerializableTransaction(cacheCfg, false);
+    }
+
+    /**
+     *  Checks optimistic serializable transaction asks primary node for 
actual value version when read-through is
+     *  enabled for partitioned transactional cache.
+     */
+    @Test
+    public void testPartitioned() throws Exception {
+        CacheConfiguration<Object, Object> cacheCfg = cacheConfiguration();
+
+        cacheCfg.setCacheMode(PARTITIONED);
+
+        checkOptimisticSerializableTransaction(cacheCfg, false);
+    }
+
+    /**
+     *  Checks optimistic serializable transaction asks primary node for 
actual value version when read-through is
+     *  enabled for near partitioned transactional cache.
+     */
+    @Test
+    public void testNearPartitioned() throws Exception {
+        CacheConfiguration<Object, Object> cacheCfg = cacheConfiguration();
+
+        cacheCfg.setCacheMode(PARTITIONED);
+
+        checkOptimisticSerializableTransaction(cacheCfg, true);
+    }
+
+    /**
+     *  Checks optimistic serializable transaction asks primary node for 
actual value version when read-through is
+     *  enabled for near replicated transactional cache.
+     */
+    @Test
+    public void testNearReplicated() throws Exception {
+        CacheConfiguration<Object, Object> cacheCfg = cacheConfiguration();
+
+        cacheCfg.setCacheMode(REPLICATED);
+
+        checkOptimisticSerializableTransaction(cacheCfg, true);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids(true);
+    }
+
+    /**
+     *  Checks optimistic serializable transaction asks primary node for 
actual value version when read-through is
+     *  enabled for a transactional cache.
+     *
+     * @param cacheCfg Transactional cache configuration for the testing cache.
+     * @param near {@code True} to check transaction with near cache from a 
client node. {@code False} for server node.
+     */
+    private void 
checkOptimisticSerializableTransaction(CacheConfiguration<Object, Object> 
cacheCfg, boolean near)
+        throws Exception {
+        startGrids(NODE_CNT);
+
+        final IgniteCache<Object, Object> cache0 = 
grid(0).getOrCreateCache(cacheCfg);
+
+        final IgniteCache<Object, Object> txCache;
+        final IgniteEx txGrid;
+
+        if (near) {
+            txGrid = startClientGrid();
+
+            txCache = txGrid.createNearCache(cacheCfg.getName(), new 
NearCacheConfiguration<>());
+        }
+        else {
+            txGrid = grid(1);
+
+            txCache = grid(1).cache(cacheCfg.getName());
+        }
+
+        List<Integer> primaryKeys = primaryKeys(cache0, 3);
+
+        primaryKeys.forEach(k -> cache0.put(k, k));
+
+        primaryKeys.forEach(k -> cache0.localClear(k));
+
+        primaryKeys.forEach(k -> assertEquals(k, cache0.get(k)));
+
+        try (Transaction tx = txGrid.transactions().txStart(OPTIMISTIC, 
SERIALIZABLE)) {
+            // Force requesting value from primary node by one key.
+            txCache.get(primaryKeys.get(0));
+
+            // Force requesting value from primary node by keys batch.
+            txCache.getAll(new HashSet<>(primaryKeys.subList(1, 
primaryKeys.size())));
+
+            primaryKeys.forEach(k -> txCache.put(k, k + 1));
+
+            tx.commit();
+        }
+
+        for (int i = 0; i < NODE_CNT; ++i) {
+            IgniteCache<Object, Object> cache = 
grid(i).cache(cacheCfg.getName());
+
+            primaryKeys.forEach(k -> assertEquals(k + 1, cache.get(k)));
+        }
+    }
+
+    /** @return Default configuration of a transactional cache. */
+    private static CacheConfiguration<Object, Object> cacheConfiguration(){
+        return new CacheConfiguration<>("tx")
+            
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+            .setCacheStoreFactory(new TestStoreFactory())
+            .setAtomicityMode(TRANSACTIONAL)
+            .setBackups(1)
+            .setReadThrough(true)
+            .setWriteThrough(true);
+    }
+
+    /** Shared read/write-through store factory. */
+    private static class TestStoreFactory implements 
Factory<CacheStore<Object, Object>> {
+        /** {@inheritDoc} */
+        @Override public CacheStore<Object, Object> create() {
+            return new CacheStoreAdapter<Object, Object>() {
+                /** {@inheritDoc} */
+                @Override public Object load(Object key) throws 
CacheLoaderException {
+                    return storeMap.get(key);
+                }
+
+                /** {@inheritDoc} */
+                @Override public void write(Cache.Entry<?, ?> entry) throws 
CacheWriterException {
+                    storeMap.put(entry.getKey(), entry.getValue());
+                }
+
+                /** {@inheritDoc} */
+                @Override public void delete(Object key) throws 
CacheWriterException {
+                    storeMap.remove(key);
+                }
+            };
+        }
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
index bd46b72..949ff41 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
@@ -45,6 +45,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.lat
 import 
org.apache.ignite.internal.processors.cache.transactions.TxLocalDhtMixedCacheModesTest;
 import 
org.apache.ignite.internal.processors.cache.transactions.TxOptimisticOnPartitionExchangeTest;
 import 
org.apache.ignite.internal.processors.cache.transactions.TxOptimisticPrepareOnUnstableTopologyTest;
+import 
org.apache.ignite.internal.processors.cache.transactions.TxOptimisticReadThroughTest;
 import 
org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutOnePhaseCommitTest;
 import 
org.apache.ignite.internal.processors.cache.transactions.TxStateChangeEventTest;
 import org.apache.ignite.testframework.junits.DynamicSuite;
@@ -98,6 +99,9 @@ public class IgniteCacheMvccTestSuite6 {
         
ignoredTests.add(PartitionedTransactionalPessimisticCacheGetsDistributionTest.class);
 // See PartitionedMvccTxPessimisticCacheGetsDistributionTest.
         
ignoredTests.add(ReplicatedTransactionalPessimisticCacheGetsDistributionTest.class);
 //See ReplicatedMvccTxPessimisticCacheGetsDistributionTest
 
+        // Read-through is not allowed with MVCC and transactional cache.
+        ignoredTests.add(TxOptimisticReadThroughTest.class);
+
         List<Class<?>> suite = new 
ArrayList<>((IgniteCacheTestSuite6.suite(ignoredTests)));
 
         // Add mvcc versions for skipped tests.
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index 450d766..4898008 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -51,6 +51,7 @@ import 
org.apache.ignite.internal.processors.cache.transactions.TxOnCachesStartT
 import 
org.apache.ignite.internal.processors.cache.transactions.TxOnCachesStopTest;
 import 
org.apache.ignite.internal.processors.cache.transactions.TxOptimisticOnPartitionExchangeTest;
 import 
org.apache.ignite.internal.processors.cache.transactions.TxOptimisticPrepareOnUnstableTopologyTest;
+import 
org.apache.ignite.internal.processors.cache.transactions.TxOptimisticReadThroughTest;
 import 
org.apache.ignite.internal.processors.cache.transactions.TxRollbackAsyncNearCacheTest;
 import 
org.apache.ignite.internal.processors.cache.transactions.TxRollbackAsyncTest;
 import 
org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnIncorrectParamsTest;
@@ -128,6 +129,8 @@ public class IgniteCacheTestSuite6 {
 
         GridTestUtils.addTestIfNeeded(suite, 
TxOptimisticOnPartitionExchangeTest.class, ignoredTests);
 
+        GridTestUtils.addTestIfNeeded(suite, 
TxOptimisticReadThroughTest.class, ignoredTests);
+
         GridTestUtils.addTestIfNeeded(suite, 
IgniteExchangeLatchManagerCoordinatorFailTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
IgniteExchangeLatchManagerDiscoHistoryTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, ExchangeLatchManagerTest.class, 
ignoredTests);

Reply via email to