ignite-4429 Deadlock in IgniteAtomicSequence when used inside transaction (cherry picked from commit 9905ff3)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/607f4eb7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/607f4eb7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/607f4eb7 Branch: refs/heads/ignite-1.9 Commit: 607f4eb7cea1e7392fac584772b3b86851eaf57c Parents: f505e20 Author: Aleksei Scherbakov <[email protected]> Authored: Thu Dec 15 12:31:56 2016 +0300 Committer: sboikov <[email protected]> Committed: Wed Feb 1 17:33:03 2017 +0300 ---------------------------------------------------------------------- .../GridCacheAtomicSequenceImpl.java | 12 +- ...achePartitionedAtomicSequenceTxSelfTest.java | 169 +++++++++++++++++++ .../IgniteCacheDataStructuresSelfTestSuite.java | 2 + 3 files changed, 181 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/607f4eb7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java index 4f660b6..754d8f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java @@ -29,6 +29,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; @@ -253,8 +254,15 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc while (true) { if (updateGuard.compareAndSet(false, true)) { try { - // This call must be outside lock. - return CU.outTx(updateCall, ctx); + try { + return updateCall.call(); + } + catch (IgniteCheckedException | IgniteException | IllegalStateException e) { + throw e; + } + catch (Exception e) { + throw new IgniteCheckedException(e); + } } finally { lock.lock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/607f4eb7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceTxSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceTxSelfTest.java new file mode 100644 index 0000000..d04a68a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceTxSelfTest.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.datastructures.partitioned; + +import java.util.concurrent.CountDownLatch; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteAtomicSequence; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.configuration.AtomicConfiguration; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteRunnable; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; + +/** + * Tests {@link IgniteAtomicSequence} operations inside started user transaction. + */ +public class GridCachePartitionedAtomicSequenceTxSelfTest extends GridCommonAbstractTest { + /** Number of threads. */ + private static final int THREAD_NUM = 8; + + /** Sequence cache size. */ + private static final int SEQ_CACHE_SIZE = 10; + + /** Iterations. */ + private static final int ITERATIONS = 100; + + /** Sequence name. */ + private static final String SEQ_NAME = "seq"; + + /** Latch. */ + private static CountDownLatch latch; + + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(spi); + + cfg.setPublicThreadPoolSize(THREAD_NUM); + + AtomicConfiguration atomicCfg = atomicConfiguration(); + + assertNotNull(atomicCfg); + + cfg.setAtomicConfiguration(atomicCfg); + + return cfg; + } + + /** + * @return Atomic config for test. + */ + protected AtomicConfiguration atomicConfiguration() { + AtomicConfiguration cfg = new AtomicConfiguration(); + + cfg.setBackups(1); + cfg.setAtomicSequenceReserveSize(SEQ_CACHE_SIZE); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + latch = new CountDownLatch(THREAD_NUM); + + startGridsMultiThreaded(1); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * Tests sequence calls inside transactions. + * + * @throws Exception If failed. + */ + public void testTransactionIncrement() throws Exception { + ignite(0).atomicSequence(SEQ_NAME, 0, true); + + for (int i = 0; i < THREAD_NUM; i++) { + multithreaded(new Runnable() { + @Override public void run() { + ignite(0).compute().run(new IncrementClosure()); + + } + }, THREAD_NUM); + } + } + + /** + * Tests isolation of system and user transactions. + */ + public void testIsolation() { + IgniteAtomicSequence seq = ignite(0).atomicSequence(SEQ_NAME, 0, true); + + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); + ccfg.setAtomicityMode(TRANSACTIONAL); + + IgniteCache<Object, Object> cache = ignite(0).getOrCreateCache(ccfg); + + try (Transaction tx = ignite(0).transactions().txStart()) { + seq.getAndIncrement(); + + cache.put(1, 1); + + tx.rollback(); + } + + assertEquals(0, cache.size()); + assertEquals(new Long(1L), U.field(seq, "locVal")); + assertEquals(new Long(SEQ_CACHE_SIZE - 1), U.field(seq, "upBound")); + } + + /** + * Closure which does sequence increment. + */ + private static class IncrementClosure implements IgniteRunnable { + /** Ignite instance. */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public void run() { + IgniteAtomicSequence seq = ignite.atomicSequence(SEQ_NAME, 0, false); + + latch.countDown(); + + U.awaitQuiet(latch); + + for (int i = 0; i < ITERATIONS; i++) + try (Transaction ignored = ignite.transactions().txStart()) { + seq.incrementAndGet(); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/607f4eb7/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java index 45a49bf..267128b 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java @@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.datastructures.partitioned.Gr import org.apache.ignite.internal.processors.cache.datastructures.partitioned.GridCachePartitionedAtomicQueueRotativeMultiNodeTest; import org.apache.ignite.internal.processors.cache.datastructures.partitioned.GridCachePartitionedAtomicReferenceApiSelfTest; import org.apache.ignite.internal.processors.cache.datastructures.partitioned.GridCachePartitionedAtomicSequenceMultiThreadedTest; +import org.apache.ignite.internal.processors.cache.datastructures.partitioned.GridCachePartitionedAtomicSequenceTxSelfTest; import org.apache.ignite.internal.processors.cache.datastructures.partitioned.GridCachePartitionedAtomicSetFailoverSelfTest; import org.apache.ignite.internal.processors.cache.datastructures.partitioned.GridCachePartitionedAtomicSetSelfTest; import org.apache.ignite.internal.processors.cache.datastructures.partitioned.GridCachePartitionedAtomicStampedApiSelfTest; @@ -168,6 +169,7 @@ public class IgniteCacheDataStructuresSelfTestSuite extends TestSuite { suite.addTest(new TestSuite(IgniteReplicatedAtomicLongApiSelfTest.class)); suite.addTest(new TestSuite(GridCachePartitionedAtomicSequenceMultiThreadedTest.class)); + suite.addTest(new TestSuite(GridCachePartitionedAtomicSequenceTxSelfTest.class)); suite.addTest(new TestSuite(GridCachePartitionedAtomicStampedApiSelfTest.class)); suite.addTest(new TestSuite(GridCacheReplicatedAtomicStampedApiSelfTest.class));
