IGNITE-6181 wip.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0e2ff6a8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0e2ff6a8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0e2ff6a8

Branch: refs/heads/ignite-6181-1
Commit: 0e2ff6a89a223991f5ce35d9656301f2004973e4
Parents: 047ac12
Author: Aleksei Scherbakov <alexey.scherbak...@gmail.com>
Authored: Thu Sep 14 17:12:08 2017 +0300
Committer: Aleksei Scherbakov <alexey.scherbak...@gmail.com>
Committed: Thu Sep 14 17:12:08 2017 +0300

----------------------------------------------------------------------
 .../colocated/GridDhtColocatedLockFuture.java   |  36 +++----
 .../cache/distributed/near/GridNearTxLocal.java |  39 ++++++-
 .../transactions/TxRollbackOnTimeoutTest.java   | 107 ++++---------------
 3 files changed, 74 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0e2ff6a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 7500549..82b0e6e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -147,7 +147,7 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
     private final Map<KeyCacheObject, IgniteBiTuple<GridCacheVersion, 
CacheObject>> valMap;
 
     /** Trackable flag (here may be non-volatile). */
-    private boolean trackable;
+    private boolean trackable = true;
 
     /** TTL for create operation. */
     private final long createTtl;
@@ -631,6 +631,13 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
         }
     }
 
+    /**
+     * @return Timeout.
+     */
+    public long timeout() {
+        return timeout;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         Collection<String> futs = F.viewReadOnly(futures(), new 
C1<IgniteInternalFuture<?>, String>() {
@@ -849,8 +856,12 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
         assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks()));
 
         // First assume this node is primary for all keys passed in.
-        if (!clientNode && mapAsPrimary(keys, topVer))
+        if (!clientNode && mapAsPrimary(keys, topVer)) {
+            if (!cctx.mvcc().addFuture(this))
+                throw new IllegalStateException("Duplicate future ID: " + 
this);
+
             return;
+        }
 
         mappings = new ArrayDeque<>();
 
@@ -881,8 +892,6 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
         if (log.isDebugEnabled())
             log.debug("Starting (re)map for mappings [mappings=" + mappings + 
", fut=" + this + ']');
 
-        boolean hasRmtNodes = false;
-
         boolean first = true;
 
         // Create mini futures.
@@ -1029,11 +1038,8 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
                 }
             }
 
-            if (!distributedKeys.isEmpty()) {
+            if (!distributedKeys.isEmpty())
                 mapping.distributedKeys(distributedKeys);
-
-                hasRmtNodes |= !mapping.node().isLocal();
-            }
             else {
                 assert mapping.request() == null;
 
@@ -1041,14 +1047,8 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
             }
         }
 
-        if (hasRmtNodes) {
-            trackable = true;
-
-            if (!remap && !cctx.mvcc().addFuture(this))
-                throw new IllegalStateException("Duplicate future ID: " + 
this);
-        }
-        else
-            trackable = false;
+        if (!remap && !cctx.mvcc().addFuture(this))
+            throw new IllegalStateException("Duplicate future ID: " + this);
 
         proceedMapping();
     }
@@ -1264,8 +1264,6 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
                 return true;
         }
 
-        trackable = false;
-
         if (tx != null) {
             if (explicit)
                 tx.markExplicit(cctx.localNodeId());
@@ -1673,4 +1671,4 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
             return S.toString(MiniFuture.class, this, "node", node.id(), 
"super", super.toString());
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/0e2ff6a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 0a59cd0..ee3d1d9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -46,6 +46,7 @@ import 
org.apache.ignite.internal.processors.cache.EntryGetResult;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import 
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.GridCacheFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
@@ -56,11 +57,14 @@ import 
org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareFuture;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedLockFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.local.GridLocalLockFuture;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -90,6 +94,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiClosure;
 import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.security.SecurityPermission;
 import org.apache.ignite.transactions.TransactionConcurrency;
@@ -135,6 +140,13 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter 
implements GridTimeou
     private static final AtomicReferenceFieldUpdater<GridNearTxLocal, 
GridNearTxFinishFuture> ROLLBACK_FUT_UPD =
         AtomicReferenceFieldUpdater.newUpdater(GridNearTxLocal.class, 
GridNearTxFinishFuture.class, "rollbackFut");
 
+    /** Lock future predicate. */
+    private static final IgnitePredicate<GridCacheFuture<?>> LOCK_FUTURES = 
new IgnitePredicate<GridCacheFuture<?>>() {
+        @Override public boolean apply(GridCacheFuture<?> fut) {
+            return fut instanceof GridDhtColocatedLockFuture;
+        }
+    };
+
     /** DHT mappings. */
     private IgniteTxMappings mappings;
 
@@ -4025,21 +4037,38 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter implements GridTimeou
 
     /** {@inheritDoc} */
     @Override public long endTime() {
-        return startTime() + timeout() - 50;
+        return startTime() + timeout() - 150;
     }
 
     /** {@inheritDoc} */
     @Override public void onTimeout() {
-        if (state(MARKED_ROLLBACK, true)) {
+        //if (state(MARKED_ROLLBACK, true)) {
             cctx.kernalContext().closure().runLocalSafe(new Runnable() {
                 @Override public void run() {
-                    log().error("Transaction is timed out and will be rolled 
back [timeout=" + timeout() +
+                    // Wait for active local lock futures completion to 
prevent races with deadlock detection.
+                    Collection<GridCacheFuture<?>> lockFuts = 
F.view(cctx.mvcc().activeFutures(), LOCK_FUTURES);
+
+                    for (GridCacheFuture<?> fut : lockFuts) {
+                        try {
+                            GridDhtColocatedLockFuture locFut = 
(GridDhtColocatedLockFuture)fut;
+
+                            if (locFut.timeout() > 0)
+                                fut.get();
+                        }
+                        catch (IgniteCheckedException e) {
+                            log.error("Failed to wait for lock future 
completion [fut=" + fut + ']', e);
+                        }
+                    }
+
+                    if (state(MARKED_ROLLBACK, true)) {
+                        log().error("Transaction is timed out and will be 
rolled back [timeout=" + timeout() +
                             ", tx=" + GridNearTxLocal.this + ']');
 
-                    rollbackNearTxLocalAsync();
+                        rollbackNearTxLocalAsync();
+                    }
                 }
             });
-        }
+        //}
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/0e2ff6a8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
index d22b682..025332ae 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.transactions;
 
+import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
 import javax.cache.CacheException;
 import org.apache.ignite.Ignite;
@@ -45,7 +46,6 @@ import org.jsr166.ThreadLocalRandom8;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static 
org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
-import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS;
 
 /**
  * Tests an ability to eagerly rollback timed out transactions.
@@ -98,20 +98,6 @@ public class TxRollbackOnTimeoutTest extends 
GridCommonAbstractTest {
     }
 
     /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        super.beforeTestsStarted();
-
-        //System.setProperty(IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS, "0");
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        super.afterTestsStopped();
-
-        //System.clearProperty(IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS);
-    }
-
-    /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
 
@@ -228,23 +214,31 @@ public class TxRollbackOnTimeoutTest extends 
GridCommonAbstractTest {
     }
 
     /**
-     * Tests if deadlock is unblocked on timeout.
-     * @throws Exception
+     * Tests if deadlock is resolved on timeout with correct message.
+     *
+     * @throws Exception If failed.
      */
     public void testDeadlockUnblockedOnTimeout() throws Exception {
+        testDeadlockUnblockedOnTimeout0(ignite(0), ignite(1));
+    }
+
+    /**
+     * Tests if deadlock is resolved on timeout with correct message.
+     * @throws Exception
+     */
+    private void testDeadlockUnblockedOnTimeout0(final Ignite node1, final 
Ignite node2) throws Exception {
         final CountDownLatch l = new CountDownLatch(2);
 
         IgniteInternalFuture<?> fut1 = multithreadedAsync(new Runnable() {
-            @Override
-            public void run() {
+            @Override public void run() {
                 try {
-                    try (Transaction tx = ignite(0).transactions().txStart()) {
-                        ignite(0).cache(CACHE_NAME).put(1, 1);
+                    try (Transaction tx = 
ignite(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ, TX_TIMEOUT, 2)) {
+                        node1.cache(CACHE_NAME).put(1, 1);
 
                         l.countDown();
                         U.awaitQuiet(l);
 
-                        ignite(0).cache(CACHE_NAME).put(2, 2);
+                        
node1.cache(CACHE_NAME).putAll(Collections.singletonMap(2, 2));
 
                         tx.commit();
 
@@ -252,21 +246,20 @@ public class TxRollbackOnTimeoutTest extends 
GridCommonAbstractTest {
                     }
                 } catch (CacheException e) {
                     // No-op.
+                    assertTrue(X.hasCause(e, 
TransactionDeadlockException.class));
                 }
             }
         }, 1, "First");
 
         IgniteInternalFuture<?> fut2 = multithreadedAsync(new Runnable() {
             @Override public void run() {
-                U.awaitQuiet(blocked);
-
-                try (Transaction tx = 
ignite(1).transactions().txStart(PESSIMISTIC, REPEATABLE_READ, TX_TIMEOUT, 1)) {
-                    ignite(1).cache(CACHE_NAME).put(2, 2);
+                try (Transaction tx = 
ignite(1).transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 0, 2)) {
+                    node2.cache(CACHE_NAME).put(2, 2);
 
                     l.countDown();
                     U.awaitQuiet(l);
 
-                    ignite(1).cache(CACHE_NAME).put(1, 1);
+                    node2.cache(CACHE_NAME).put(1, 1);
 
                     tx.commit();
                 }
@@ -276,8 +269,8 @@ public class TxRollbackOnTimeoutTest extends 
GridCommonAbstractTest {
         fut1.get();
         fut2.get();
 
-        assertTrue(ignite(0).cache(CACHE_NAME).containsKey(1));
-        assertTrue(ignite(0).cache(CACHE_NAME).containsKey(2));
+        assertTrue(node1.cache(CACHE_NAME).containsKey(1));
+        assertTrue(node1.cache(CACHE_NAME).containsKey(2));
     }
 
     /**
@@ -319,60 +312,6 @@ public class TxRollbackOnTimeoutTest extends 
GridCommonAbstractTest {
     }
 
     /**
-     * Tests tx timeout on deadlock.
-     *
-     * @throws Exception
-     */
-    public void testTimeoutOnDeadlock() throws Exception {
-        final CountDownLatch l = new CountDownLatch(2);
-
-        IgniteInternalFuture<?> fut1 = multithreadedAsync(new Runnable() {
-            @Override public void run() {
-                try(Transaction tx = 
grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ, TX_TIMEOUT, 1)) {
-                    grid(0).cache(CACHE_NAME).put(1, 1);
-
-                    l.countDown();
-                    U.awaitQuiet(l);
-
-                    grid(0).cache(CACHE_NAME).put(2, 2);
-                }
-
-            }
-        }, 1, "First");
-
-        IgniteInternalFuture<?> fut2 = multithreadedAsync(new Runnable() {
-            @Override public void run() {
-                try(Transaction tx = 
grid(1).transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 0, 1)) {
-                    grid(1).cache(CACHE_NAME).put(2, 2);
-
-                    l.countDown();
-                    U.awaitQuiet(l);
-
-                    grid(1).cache(CACHE_NAME).put(1, 1);
-
-                    tx.commit();
-                }
-
-            }
-
-        }, 1, "Second");
-
-        try {
-            fut1.get();
-
-            fail();
-        }
-        catch (Exception e) {
-            assertTrue(X.hasCause(e, TransactionDeadlockException.class));
-        }
-
-        fut2.get();
-
-        assertTrue(grid(0).cache(CACHE_NAME).containsKey(1));
-        assertTrue(grid(0).cache(CACHE_NAME).containsKey(2));
-    }
-
-    /**
      * @param concurrency Concurrency.
      * @param isolation Isolation.
      *
@@ -454,7 +393,7 @@ public class TxRollbackOnTimeoutTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     private void testWaitingTxUnblockedOnTimeout0(final Ignite near, final 
Ignite other) throws Exception {
-        final int recordsCnt = 100;
+        final int recordsCnt = 1;
 
         IgniteInternalFuture<?> fut1 = multithreadedAsync(new Runnable() {
             @Override public void run() {

Reply via email to