IGNITE-6181 wip.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0e2ff6a8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0e2ff6a8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0e2ff6a8 Branch: refs/heads/ignite-6181-1 Commit: 0e2ff6a89a223991f5ce35d9656301f2004973e4 Parents: 047ac12 Author: Aleksei Scherbakov <alexey.scherbak...@gmail.com> Authored: Thu Sep 14 17:12:08 2017 +0300 Committer: Aleksei Scherbakov <alexey.scherbak...@gmail.com> Committed: Thu Sep 14 17:12:08 2017 +0300 ---------------------------------------------------------------------- .../colocated/GridDhtColocatedLockFuture.java | 36 +++---- .../cache/distributed/near/GridNearTxLocal.java | 39 ++++++- .../transactions/TxRollbackOnTimeoutTest.java | 107 ++++--------------- 3 files changed, 74 insertions(+), 108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0e2ff6a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 7500549..82b0e6e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -147,7 +147,7 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF private final Map<KeyCacheObject, IgniteBiTuple<GridCacheVersion, CacheObject>> valMap; /** Trackable flag (here may be non-volatile). */ - private boolean trackable; + private boolean trackable = true; /** TTL for create operation. */ private final long createTtl; @@ -631,6 +631,13 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF } } + /** + * @return Timeout. + */ + public long timeout() { + return timeout; + } + /** {@inheritDoc} */ @Override public String toString() { Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() { @@ -849,8 +856,12 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks())); // First assume this node is primary for all keys passed in. - if (!clientNode && mapAsPrimary(keys, topVer)) + if (!clientNode && mapAsPrimary(keys, topVer)) { + if (!cctx.mvcc().addFuture(this)) + throw new IllegalStateException("Duplicate future ID: " + this); + return; + } mappings = new ArrayDeque<>(); @@ -881,8 +892,6 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF if (log.isDebugEnabled()) log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']'); - boolean hasRmtNodes = false; - boolean first = true; // Create mini futures. @@ -1029,11 +1038,8 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF } } - if (!distributedKeys.isEmpty()) { + if (!distributedKeys.isEmpty()) mapping.distributedKeys(distributedKeys); - - hasRmtNodes |= !mapping.node().isLocal(); - } else { assert mapping.request() == null; @@ -1041,14 +1047,8 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF } } - if (hasRmtNodes) { - trackable = true; - - if (!remap && !cctx.mvcc().addFuture(this)) - throw new IllegalStateException("Duplicate future ID: " + this); - } - else - trackable = false; + if (!remap && !cctx.mvcc().addFuture(this)) + throw new IllegalStateException("Duplicate future ID: " + this); proceedMapping(); } @@ -1264,8 +1264,6 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF return true; } - trackable = false; - if (tx != null) { if (explicit) tx.markExplicit(cctx.localNodeId()); @@ -1673,4 +1671,4 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF return S.toString(MiniFuture.class, this, "node", node.id(), "super", super.toString()); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/0e2ff6a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 0a59cd0..ee3d1d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.EntryGetResult; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; +import org.apache.ignite.internal.processors.cache.GridCacheFuture; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture; import org.apache.ignite.internal.processors.cache.GridCacheOperation; @@ -56,11 +57,14 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedLockFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; +import org.apache.ignite.internal.processors.cache.local.GridLocalLockFuture; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; @@ -90,6 +94,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiClosure; import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.security.SecurityPermission; import org.apache.ignite.transactions.TransactionConcurrency; @@ -135,6 +140,13 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou private static final AtomicReferenceFieldUpdater<GridNearTxLocal, GridNearTxFinishFuture> ROLLBACK_FUT_UPD = AtomicReferenceFieldUpdater.newUpdater(GridNearTxLocal.class, GridNearTxFinishFuture.class, "rollbackFut"); + /** Lock future predicate. */ + private static final IgnitePredicate<GridCacheFuture<?>> LOCK_FUTURES = new IgnitePredicate<GridCacheFuture<?>>() { + @Override public boolean apply(GridCacheFuture<?> fut) { + return fut instanceof GridDhtColocatedLockFuture; + } + }; + /** DHT mappings. */ private IgniteTxMappings mappings; @@ -4025,21 +4037,38 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou /** {@inheritDoc} */ @Override public long endTime() { - return startTime() + timeout() - 50; + return startTime() + timeout() - 150; } /** {@inheritDoc} */ @Override public void onTimeout() { - if (state(MARKED_ROLLBACK, true)) { + //if (state(MARKED_ROLLBACK, true)) { cctx.kernalContext().closure().runLocalSafe(new Runnable() { @Override public void run() { - log().error("Transaction is timed out and will be rolled back [timeout=" + timeout() + + // Wait for active local lock futures completion to prevent races with deadlock detection. + Collection<GridCacheFuture<?>> lockFuts = F.view(cctx.mvcc().activeFutures(), LOCK_FUTURES); + + for (GridCacheFuture<?> fut : lockFuts) { + try { + GridDhtColocatedLockFuture locFut = (GridDhtColocatedLockFuture)fut; + + if (locFut.timeout() > 0) + fut.get(); + } + catch (IgniteCheckedException e) { + log.error("Failed to wait for lock future completion [fut=" + fut + ']', e); + } + } + + if (state(MARKED_ROLLBACK, true)) { + log().error("Transaction is timed out and will be rolled back [timeout=" + timeout() + ", tx=" + GridNearTxLocal.this + ']'); - rollbackNearTxLocalAsync(); + rollbackNearTxLocalAsync(); + } } }); - } + //} } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/0e2ff6a8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java index d22b682..025332ae 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.transactions; +import java.util.Collections; import java.util.concurrent.CountDownLatch; import javax.cache.CacheException; import org.apache.ignite.Ignite; @@ -45,7 +46,6 @@ import org.jsr166.ThreadLocalRandom8; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; -import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS; /** * Tests an ability to eagerly rollback timed out transactions. @@ -98,20 +98,6 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - - //System.setProperty(IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS, "0"); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - super.afterTestsStopped(); - - //System.clearProperty(IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS); - } - - /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { super.beforeTest(); @@ -228,23 +214,31 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest { } /** - * Tests if deadlock is unblocked on timeout. - * @throws Exception + * Tests if deadlock is resolved on timeout with correct message. + * + * @throws Exception If failed. */ public void testDeadlockUnblockedOnTimeout() throws Exception { + testDeadlockUnblockedOnTimeout0(ignite(0), ignite(1)); + } + + /** + * Tests if deadlock is resolved on timeout with correct message. + * @throws Exception + */ + private void testDeadlockUnblockedOnTimeout0(final Ignite node1, final Ignite node2) throws Exception { final CountDownLatch l = new CountDownLatch(2); IgniteInternalFuture<?> fut1 = multithreadedAsync(new Runnable() { - @Override - public void run() { + @Override public void run() { try { - try (Transaction tx = ignite(0).transactions().txStart()) { - ignite(0).cache(CACHE_NAME).put(1, 1); + try (Transaction tx = ignite(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ, TX_TIMEOUT, 2)) { + node1.cache(CACHE_NAME).put(1, 1); l.countDown(); U.awaitQuiet(l); - ignite(0).cache(CACHE_NAME).put(2, 2); + node1.cache(CACHE_NAME).putAll(Collections.singletonMap(2, 2)); tx.commit(); @@ -252,21 +246,20 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest { } } catch (CacheException e) { // No-op. + assertTrue(X.hasCause(e, TransactionDeadlockException.class)); } } }, 1, "First"); IgniteInternalFuture<?> fut2 = multithreadedAsync(new Runnable() { @Override public void run() { - U.awaitQuiet(blocked); - - try (Transaction tx = ignite(1).transactions().txStart(PESSIMISTIC, REPEATABLE_READ, TX_TIMEOUT, 1)) { - ignite(1).cache(CACHE_NAME).put(2, 2); + try (Transaction tx = ignite(1).transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 0, 2)) { + node2.cache(CACHE_NAME).put(2, 2); l.countDown(); U.awaitQuiet(l); - ignite(1).cache(CACHE_NAME).put(1, 1); + node2.cache(CACHE_NAME).put(1, 1); tx.commit(); } @@ -276,8 +269,8 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest { fut1.get(); fut2.get(); - assertTrue(ignite(0).cache(CACHE_NAME).containsKey(1)); - assertTrue(ignite(0).cache(CACHE_NAME).containsKey(2)); + assertTrue(node1.cache(CACHE_NAME).containsKey(1)); + assertTrue(node1.cache(CACHE_NAME).containsKey(2)); } /** @@ -319,60 +312,6 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest { } /** - * Tests tx timeout on deadlock. - * - * @throws Exception - */ - public void testTimeoutOnDeadlock() throws Exception { - final CountDownLatch l = new CountDownLatch(2); - - IgniteInternalFuture<?> fut1 = multithreadedAsync(new Runnable() { - @Override public void run() { - try(Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ, TX_TIMEOUT, 1)) { - grid(0).cache(CACHE_NAME).put(1, 1); - - l.countDown(); - U.awaitQuiet(l); - - grid(0).cache(CACHE_NAME).put(2, 2); - } - - } - }, 1, "First"); - - IgniteInternalFuture<?> fut2 = multithreadedAsync(new Runnable() { - @Override public void run() { - try(Transaction tx = grid(1).transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 0, 1)) { - grid(1).cache(CACHE_NAME).put(2, 2); - - l.countDown(); - U.awaitQuiet(l); - - grid(1).cache(CACHE_NAME).put(1, 1); - - tx.commit(); - } - - } - - }, 1, "Second"); - - try { - fut1.get(); - - fail(); - } - catch (Exception e) { - assertTrue(X.hasCause(e, TransactionDeadlockException.class)); - } - - fut2.get(); - - assertTrue(grid(0).cache(CACHE_NAME).containsKey(1)); - assertTrue(grid(0).cache(CACHE_NAME).containsKey(2)); - } - - /** * @param concurrency Concurrency. * @param isolation Isolation. * @@ -454,7 +393,7 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest { * @throws Exception If failed. */ private void testWaitingTxUnblockedOnTimeout0(final Ignite near, final Ignite other) throws Exception { - final int recordsCnt = 100; + final int recordsCnt = 1; IgniteInternalFuture<?> fut1 = multithreadedAsync(new Runnable() { @Override public void run() {