Repository: ignite Updated Branches: refs/heads/master bdb223996 -> d8e46de28
ignite-6454 Disable interrupts for cache.get in pessimistic tx like for all others cache operations Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d8e46de2 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d8e46de2 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d8e46de2 Branch: refs/heads/master Commit: d8e46de2899b35fa498310a31705e6a9760a84f3 Parents: bdb2239 Author: sboikov <sboi...@apache.org> Authored: Tue Oct 30 09:33:13 2018 +0300 Committer: sboikov <sboi...@apache.org> Committed: Tue Oct 30 09:33:13 2018 +0300 ---------------------------------------------------------------------- .../cache/distributed/near/GridNearTxLocal.java | 12 +- .../internal/util/future/GridFutureAdapter.java | 14 +- ...eAbstractDataStructuresFailoverSelfTest.java | 4 +- ...rtitionedDataStructuresFailoverSelfTest.java | 14 -- ...eplicatedDataStructuresFailoverSelfTest.java | 28 --- .../CacheOperationsInterruptTest.java | 170 +++++++++++++++++++ .../testsuites/IgniteCacheTestSuite9.java | 3 + 7 files changed, 193 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d8e46de2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index f56d99b..d8e4353 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -2422,16 +2422,16 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou try { IgniteInternalFuture<Map<K, V>> fut1 = plc2.apply(fut.get(), null); - return fut1.isDone() ? + return nonInterruptable(fut1.isDone() ? new GridFinishedFuture<>(finClos.apply(fut1.get(), null)) : - new GridEmbeddedFuture<>(finClos, fut1); + new GridEmbeddedFuture<>(finClos, fut1)); } catch (GridClosureException e) { return new GridFinishedFuture<>(e.unwrap()); } catch (IgniteCheckedException e) { try { - return plc2.apply(false, e); + return nonInterruptable(plc2.apply(false, e)); } catch (Exception e1) { return new GridFinishedFuture<>(e1); @@ -2439,10 +2439,10 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou } } else { - return new GridEmbeddedFuture<>( + return nonInterruptable(new GridEmbeddedFuture<>( fut, plc2, - finClos); + finClos)); } } else { @@ -4739,7 +4739,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou * @param fut Future. * @return Future ignoring interrupts on {@code get()}. */ - private <T> IgniteInternalFuture<T> nonInterruptable(IgniteInternalFuture<T> fut) { + private static <T> IgniteInternalFuture<T> nonInterruptable(IgniteInternalFuture<T> fut) { // Safety. if (fut instanceof GridFutureAdapter) ((GridFutureAdapter)fut).ignoreInterrupts(); http://git-wip-us.apache.org/repos/asf/ignite/blob/d8e46de2/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java index 8302504..a191f30 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java @@ -357,13 +357,23 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> { /** {@inheritDoc} */ @Override public <T> IgniteInternalFuture<T> chain(final IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb) { - return new ChainFuture<>(this, doneCb, null); + ChainFuture fut = new ChainFuture<>(this, doneCb, null); + + if (ignoreInterrupts) + fut.ignoreInterrupts(); + + return fut; } /** {@inheritDoc} */ @Override public <T> IgniteInternalFuture<T> chain(final IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb, Executor exec) { - return new ChainFuture<>(this, doneCb, exec); + ChainFuture fut = new ChainFuture<>(this, doneCb, exec); + + if (ignoreInterrupts) + fut.ignoreInterrupts(); + + return fut; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/d8e46de2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java index 797e90f..6e3a7bb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java @@ -1320,7 +1320,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig protected final AtomicBoolean failed = new AtomicBoolean(false); /** */ - private final int topChangeThreads; + protected final int topChangeThreads; /** Flag to enable circular topology change. */ private boolean circular; @@ -1445,7 +1445,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig throw F.wrap(e); } } - }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); + }, topChangeThreads, "topology-change-thread"); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d8e46de2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedDataStructuresFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedDataStructuresFailoverSelfTest.java index eecfefe..ecb2df9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedDataStructuresFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedDataStructuresFailoverSelfTest.java @@ -38,18 +38,4 @@ public class GridCachePartitionedDataStructuresFailoverSelfTest @Override protected CacheAtomicityMode collectionCacheAtomicityMode() { return TRANSACTIONAL; } - - /** - * - */ - @Override public void testReentrantLockConstantTopologyChangeNonFailoverSafe() { - fail("https://issues.apache.org/jira/browse/IGNITE-6454"); - } - - /** - * - */ - @Override public void testFairReentrantLockConstantTopologyChangeNonFailoverSafe() { - fail("https://issues.apache.org/jira/browse/IGNITE-6454"); - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d8e46de2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java index 27fbdcf..b093d12 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java @@ -38,32 +38,4 @@ public class GridCacheReplicatedDataStructuresFailoverSelfTest @Override protected CacheAtomicityMode collectionCacheAtomicityMode() { return TRANSACTIONAL; } - - /** - * - */ - @Override public void testFairReentrantLockConstantMultipleTopologyChangeNonFailoverSafe() { - fail("https://issues.apache.org/jira/browse/IGNITE-6454"); - } - - /** - * - */ - @Override public void testReentrantLockConstantMultipleTopologyChangeNonFailoverSafe() { - fail("https://issues.apache.org/jira/browse/IGNITE-6454"); - } - - /** - * - */ - @Override public void testReentrantLockConstantTopologyChangeNonFailoverSafe() { - fail("https://issues.apache.org/jira/browse/IGNITE-6454"); - } - - /** - * - */ - @Override public void testFairReentrantLockConstantTopologyChangeNonFailoverSafe() { - fail("https://issues.apache.org/jira/browse/IGNITE-6454"); - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d8e46de2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheOperationsInterruptTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheOperationsInterruptTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheOperationsInterruptTest.java new file mode 100644 index 0000000..971dbd1 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheOperationsInterruptTest.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; + +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * + */ +public class CacheOperationsInterruptTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); + + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setCacheMode(REPLICATED); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + + /** + * @throws Exception If failed. + */ + public void testInterruptPessimisticTx() throws Exception { + final int NODES = 3; + + startGrids(NODES); + + awaitPartitionMapExchange(); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + Ignite node = ignite(0); + + IgniteCache<Integer, Integer> cache = node.cache(DEFAULT_CACHE_NAME); + + final int KEYS = 100; + + final boolean changeTop = true; + + for (int i = 0; i < 10; i++) { + info("Iteration: " + i); + + final AtomicBoolean stop = new AtomicBoolean(); + + try { + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + Ignite node = ignite(0); + + IgniteCache<Integer, Integer> cache = node.cache(DEFAULT_CACHE_NAME); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (!stop.get()) { + try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + for (int i = 0; i < KEYS; i++) { + if (rnd.nextBoolean()) + cache.get(i); + } + } + } + } + }, 3, "tx-thread"); + + IgniteInternalFuture<?> changeTopFut = null; + + if (changeTop) { + changeTopFut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + while (!stop.get()) { + startGrid(NODES); + + stopGrid(NODES); + } + + return null; + } + }); + } + + U.sleep(rnd.nextInt(500)); + + fut.cancel(); + + U.sleep(rnd.nextInt(500)); + + stop.set(true); + + try { + fut.get(); + } + catch (Exception e) { + info("Ignore error: " + e); + } + + if (changeTopFut != null) + changeTopFut.get(); + + info("Try get"); + + try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + for (int k = 0; k < KEYS; k++) + cache.get(k); + } + + info("Try get done"); + + startGrid(NODES); + stopGrid(NODES); + } + finally { + stop.set(true); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d8e46de2/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java index 7dba461..173c555 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java @@ -22,6 +22,7 @@ import org.apache.ignite.internal.processors.cache.CachePutIfAbsentTest; import org.apache.ignite.internal.processors.cache.IgniteCacheGetCustomCollectionsSelfTest; import org.apache.ignite.internal.processors.cache.IgniteCacheLoadRebalanceEvictionSelfTest; import org.apache.ignite.internal.processors.cache.distributed.CacheAtomicPrimarySyncBackPressureTest; +import org.apache.ignite.internal.processors.cache.distributed.CacheOperationsInterruptTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCachePrimarySyncTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteTxCachePrimarySyncTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteTxCacheWriteSynchronizationModesMultithreadedTest; @@ -52,6 +53,8 @@ public class IgniteCacheTestSuite9 extends TestSuite { suite.addTestSuite(TxDataConsistencyOnCommitFailureTest.class); + suite.addTestSuite(CacheOperationsInterruptTest.class); + return suite; } }