This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new f64fe2c IGNITE-12739 Optimistic serializable transactions may fail
infinitely when read-through is enabled. (#7575)
f64fe2c is described below
commit f64fe2c5ba1d52038920ee56f42738d0e8a5eb0b
Author: Vladsz83 <[email protected]>
AuthorDate: Mon Apr 13 10:24:09 2020 +0300
IGNITE-12739 Optimistic serializable transactions may fail infinitely when
read-through is enabled. (#7575)
---
.../processors/cache/GridCacheContext.java | 8 +-
.../distributed/dht/GridPartitionedGetFuture.java | 2 +-
.../dht/GridPartitionedSingleGetFuture.java | 3 +-
.../cache/distributed/near/GridNearGetFuture.java | 3 +-
.../near/GridNearTransactionalCache.java | 4 +-
.../cache/distributed/near/GridNearTxLocal.java | 5 +-
.../transactions/TxOptimisticReadThroughTest.java | 216 +++++++++++++++++++++
.../testsuites/IgniteCacheMvccTestSuite6.java | 4 +
.../ignite/testsuites/IgniteCacheTestSuite6.java | 3 +
9 files changed, 239 insertions(+), 9 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 924cd11..faf48c1 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -2245,16 +2245,18 @@ public class GridCacheContext<K, V> implements
Externalizable {
* @param affNodes All affinity nodes.
* @param canRemap Flag indicating that 'get' should be done on a locked
topology version.
* @param partId Partition ID.
+ * @param forcePrimary Force primary flag.
* @return Affinity node to get key from or {@code null} if there is no
suitable alive node.
*/
@Nullable public ClusterNode selectAffinityNodeBalanced(
List<ClusterNode> affNodes,
Set<ClusterNode> invalidNodes,
int partId,
- boolean canRemap
+ boolean canRemap,
+ boolean forcePrimary
) {
if (!readLoadBalancingEnabled) {
- if (!canRemap) {
+ if (!canRemap && !forcePrimary) {
// Find next available node if we can not wait next topology
version.
for (ClusterNode node : affNodes) {
if (ctx.discovery().alive(node) &&
!invalidNodes.contains(node))
@@ -2270,7 +2272,7 @@ public class GridCacheContext<K, V> implements
Externalizable {
}
}
- if (!readFromBackup){
+ if (!readFromBackup || forcePrimary){
ClusterNode first = affNodes.get(0);
return !invalidNodes.contains(first) ? first : null;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index d1706e1..da8a738 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -355,7 +355,7 @@ public class GridPartitionedGetFuture<K, V> extends
CacheDistributedGetFutureAda
if (tryLocalGet(key, part, topVer, affNodes, locVals))
return false;
- node = cctx.selectAffinityNodeBalanced(affNodes, invalidNodeSet,
part, canRemap);
+ node = cctx.selectAffinityNodeBalanced(affNodes, invalidNodeSet,
part, canRemap, forcePrimary);
}
// Failed if none remote node found.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 08abf5e..b9e9364 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -377,7 +377,8 @@ public class GridPartitionedSingleGetFuture extends
GridCacheFutureAdapter<Objec
if (tryLocalGet(key, part, topVer, affNodes))
return null;
- ClusterNode affNode = cctx.selectAffinityNodeBalanced(affNodes,
getInvalidNodes(), part, canRemap);
+ ClusterNode affNode = cctx.selectAffinityNodeBalanced(affNodes,
getInvalidNodes(), part, canRemap,
+ forcePrimary);
// Failed if none balanced node found.
if (affNode == null) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index d4513b7..5b22ab1 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -403,7 +403,8 @@ public final class GridNearGetFuture<K, V> extends
CacheDistributedGetFutureAdap
Set<ClusterNode> invalidNodesSet = getInvalidNodes(part,
topVer);
- ClusterNode affNode =
cctx.selectAffinityNodeBalanced(affNodes, invalidNodesSet, part, canRemap);
+ ClusterNode affNode =
cctx.selectAffinityNodeBalanced(affNodes, invalidNodesSet, part, canRemap,
+ forcePrimary);
if (affNode == null) {
onDone(serverNotFoundError(part, topVer));
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index a9eec8e..c50edc7 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -177,6 +177,7 @@ public class GridNearTransactionalCache<K, V> extends
GridNearCacheAdapter<K, V>
* @param tx Transaction.
* @param keys Keys to load.
* @param readThrough Read through flag.
+ * @param forcePrimary Force primary flag.
* @param deserializeBinary Deserialize binary flag.
* @param expiryPlc Expiry policy.
* @param skipVals Skip values flag.
@@ -187,6 +188,7 @@ public class GridNearTransactionalCache<K, V> extends
GridNearCacheAdapter<K, V>
AffinityTopologyVersion topVer,
@Nullable Collection<KeyCacheObject> keys,
boolean readThrough,
+ boolean forcePrimary,
boolean deserializeBinary,
boolean recovery,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
@@ -197,7 +199,7 @@ public class GridNearTransactionalCache<K, V> extends
GridNearCacheAdapter<K, V>
GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx,
keys,
readThrough,
- /*force primary*/needVer || !ctx.config().isReadFromBackup(),
+ forcePrimary,
tx,
CU.subjectId(tx, ctx.shared()),
tx.resolveTaskName(),
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 41cd597..e404b65 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -3101,6 +3101,7 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
topVer,
keys,
readThrough,
+ needVer || !cacheCtx.config().isReadFromBackup() ||
(optimistic() && serializable() && readThrough),
/*deserializeBinary*/false,
recovery,
expiryPlc0,
@@ -3165,7 +3166,7 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
return cacheCtx.colocated().loadAsync(
key,
readThrough,
- /*force primary*/needVer ||
!cacheCtx.config().isReadFromBackup(),
+ needVer || !cacheCtx.config().isReadFromBackup() ||
(optimistic() && serializable() && readThrough),
topVer,
CU.subjectId(this, cctx),
resolveTaskName(),
@@ -3198,7 +3199,7 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
return cacheCtx.colocated().loadAsync(
keys,
readThrough,
- /*force primary*/needVer ||
!cacheCtx.config().isReadFromBackup(),
+ needVer || !cacheCtx.config().isReadFromBackup() ||
(optimistic() && serializable() && readThrough),
topVer,
CU.subjectId(this, cctx),
resolveTaskName(),
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticReadThroughTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticReadThroughTest.java
new file mode 100644
index 0000000..f97a5ab
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticReadThroughTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ * Tests optimistic tx with read-through cache.
+ */
+public class TxOptimisticReadThroughTest extends GridCommonAbstractTest {
+ /** Test nodes count. */
+ protected static final int NODE_CNT = 2;
+
+ /** Shared read/write-through store. */
+ private static final Map<Object, Object> storeMap = new
ConcurrentHashMap<>();
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ // Use different addresses so that not only the first node is picked when searching
for a value within a transaction.
+ cfg.setUserAttributes(Collections.singletonMap(
+ IgniteNodeAttributes.ATTR_MACS_OVERRIDE,
UUID.randomUUID().toString()));
+
+ return cfg;
+ }
+
+ /**
+ * Checks optimistic serializable transaction asks primary node for
actual value version when read-through is
+ * enabled for replicated transactional cache.
+ */
+ @Test
+ public void testReplicated() throws Exception {
+ CacheConfiguration<Object, Object> cacheCfg = cacheConfiguration();
+
+ cacheCfg.setCacheMode(REPLICATED);
+
+ checkOptimisticSerializableTransaction(cacheCfg, false);
+ }
+
+ /**
+ * Checks optimistic serializable transaction asks primary node for
actual value version when read-through is
+ * enabled for partitioned transactional cache.
+ */
+ @Test
+ public void testPartitioned() throws Exception {
+ CacheConfiguration<Object, Object> cacheCfg = cacheConfiguration();
+
+ cacheCfg.setCacheMode(PARTITIONED);
+
+ checkOptimisticSerializableTransaction(cacheCfg, false);
+ }
+
+ /**
+ * Checks optimistic serializable transaction asks primary node for
actual value version when read-through is
+ * enabled for near partitioned transactional cache.
+ */
+ @Test
+ public void testNearPartitioned() throws Exception {
+ CacheConfiguration<Object, Object> cacheCfg = cacheConfiguration();
+
+ cacheCfg.setCacheMode(PARTITIONED);
+
+ checkOptimisticSerializableTransaction(cacheCfg, true);
+ }
+
+ /**
+ * Checks optimistic serializable transaction asks primary node for
actual value version when read-through is
+ * enabled for near replicated transactional cache.
+ */
+ @Test
+ public void testNearReplicated() throws Exception {
+ CacheConfiguration<Object, Object> cacheCfg = cacheConfiguration();
+
+ cacheCfg.setCacheMode(REPLICATED);
+
+ checkOptimisticSerializableTransaction(cacheCfg, true);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids(true);
+ }
+
+ /**
+ * Checks optimistic serializable transaction asks primary node for
actual value version when read-through is
+ * enabled for a transactional cache.
+ *
+ * @param cacheCfg Transactional cache configuration for the testing cache.
+ * @param near {@code True} to check transaction with near cache from a
client node. {@code False} for server node.
+ */
+ private void
checkOptimisticSerializableTransaction(CacheConfiguration<Object, Object>
cacheCfg, boolean near)
+ throws Exception {
+ startGrids(NODE_CNT);
+
+ final IgniteCache<Object, Object> cache0 =
grid(0).getOrCreateCache(cacheCfg);
+
+ final IgniteCache<Object, Object> txCache;
+ final IgniteEx txGrid;
+
+ if (near) {
+ txGrid = startClientGrid();
+
+ txCache = txGrid.createNearCache(cacheCfg.getName(), new
NearCacheConfiguration<>());
+ }
+ else {
+ txGrid = grid(1);
+
+ txCache = grid(1).cache(cacheCfg.getName());
+ }
+
+ List<Integer> primaryKeys = primaryKeys(cache0, 3);
+
+ primaryKeys.forEach(k -> cache0.put(k, k));
+
+ primaryKeys.forEach(k -> cache0.localClear(k));
+
+ primaryKeys.forEach(k -> assertEquals(k, cache0.get(k)));
+
+ try (Transaction tx = txGrid.transactions().txStart(OPTIMISTIC,
SERIALIZABLE)) {
+ // Force requesting value from primary node by one key.
+ txCache.get(primaryKeys.get(0));
+
+ // Force requesting value from primary node by keys batch.
+ txCache.getAll(new HashSet<>(primaryKeys.subList(1,
primaryKeys.size())));
+
+ primaryKeys.forEach(k -> txCache.put(k, k + 1));
+
+ tx.commit();
+ }
+
+ for (int i = 0; i < NODE_CNT; ++i) {
+ IgniteCache<Object, Object> cache =
grid(i).cache(cacheCfg.getName());
+
+ primaryKeys.forEach(k -> assertEquals(k + 1, cache.get(k)));
+ }
+ }
+
+ /** @return Default configuration of a transactional cache. */
+ private static CacheConfiguration<Object, Object> cacheConfiguration(){
+ return new CacheConfiguration<>("tx")
+
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+ .setCacheStoreFactory(new TestStoreFactory())
+ .setAtomicityMode(TRANSACTIONAL)
+ .setBackups(1)
+ .setReadThrough(true)
+ .setWriteThrough(true);
+ }
+
+ /** Shared read/write-through store factory. */
+ private static class TestStoreFactory implements
Factory<CacheStore<Object, Object>> {
+ /** {@inheritDoc} */
+ @Override public CacheStore<Object, Object> create() {
+ return new CacheStoreAdapter<Object, Object>() {
+ /** {@inheritDoc} */
+ @Override public Object load(Object key) throws
CacheLoaderException {
+ return storeMap.get(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Cache.Entry<?, ?> entry) throws
CacheWriterException {
+ storeMap.put(entry.getKey(), entry.getValue());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void delete(Object key) throws
CacheWriterException {
+ storeMap.remove(key);
+ }
+ };
+ }
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
index bd46b72..949ff41 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
@@ -45,6 +45,7 @@ import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.lat
import
org.apache.ignite.internal.processors.cache.transactions.TxLocalDhtMixedCacheModesTest;
import
org.apache.ignite.internal.processors.cache.transactions.TxOptimisticOnPartitionExchangeTest;
import
org.apache.ignite.internal.processors.cache.transactions.TxOptimisticPrepareOnUnstableTopologyTest;
+import
org.apache.ignite.internal.processors.cache.transactions.TxOptimisticReadThroughTest;
import
org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutOnePhaseCommitTest;
import
org.apache.ignite.internal.processors.cache.transactions.TxStateChangeEventTest;
import org.apache.ignite.testframework.junits.DynamicSuite;
@@ -98,6 +99,9 @@ public class IgniteCacheMvccTestSuite6 {
ignoredTests.add(PartitionedTransactionalPessimisticCacheGetsDistributionTest.class);
// See PartitionedMvccTxPessimisticCacheGetsDistributionTest.
ignoredTests.add(ReplicatedTransactionalPessimisticCacheGetsDistributionTest.class);
//See ReplicatedMvccTxPessimisticCacheGetsDistributionTest
+ // Read-through is not allowed with MVCC and transactional cache.
+ ignoredTests.add(TxOptimisticReadThroughTest.class);
+
List<Class<?>> suite = new
ArrayList<>((IgniteCacheTestSuite6.suite(ignoredTests)));
// Add mvcc versions for skipped tests.
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index 450d766..4898008 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -51,6 +51,7 @@ import
org.apache.ignite.internal.processors.cache.transactions.TxOnCachesStartT
import
org.apache.ignite.internal.processors.cache.transactions.TxOnCachesStopTest;
import
org.apache.ignite.internal.processors.cache.transactions.TxOptimisticOnPartitionExchangeTest;
import
org.apache.ignite.internal.processors.cache.transactions.TxOptimisticPrepareOnUnstableTopologyTest;
+import
org.apache.ignite.internal.processors.cache.transactions.TxOptimisticReadThroughTest;
import
org.apache.ignite.internal.processors.cache.transactions.TxRollbackAsyncNearCacheTest;
import
org.apache.ignite.internal.processors.cache.transactions.TxRollbackAsyncTest;
import
org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnIncorrectParamsTest;
@@ -128,6 +129,8 @@ public class IgniteCacheTestSuite6 {
GridTestUtils.addTestIfNeeded(suite,
TxOptimisticOnPartitionExchangeTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite,
TxOptimisticReadThroughTest.class, ignoredTests);
+
GridTestUtils.addTestIfNeeded(suite,
IgniteExchangeLatchManagerCoordinatorFailTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
IgniteExchangeLatchManagerDiscoHistoryTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, ExchangeLatchManagerTest.class,
ignoredTests);