Repository: ignite
Updated Branches:
  refs/heads/master 7bf9bc281 -> 97b93b84c


IGNITE-9292: MVCC: fixed a race causing unexpected state of entry in TX log. 
This closes #4984.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/97b93b84
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/97b93b84
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/97b93b84

Branch: refs/heads/master
Commit: 97b93b84c24c1131c31fec32f37f67490c870ad9
Parents: 7bf9bc2
Author: Igor Seliverstov <gvvinbl...@gmail.com>
Authored: Tue Oct 16 15:09:01 2018 +0300
Committer: devozerov <voze...@gridgain.com>
Committed: Tue Oct 16 15:09:01 2018 +0300

----------------------------------------------------------------------
 .../GridDistributedTxRemoteAdapter.java         | 20 +++++---
 .../distributed/dht/GridDhtTxFinishFuture.java  | 19 +++++++-
 .../cache/distributed/near/GridNearTxLocal.java | 23 +++++++--
 .../cache/mvcc/MvccProcessorImpl.java           |  8 +--
 .../cache/transactions/IgniteTxAdapter.java     | 51 +++-----------------
 .../cache/transactions/IgniteTxManager.java     | 27 +++++++++++
 .../mvcc/CacheMvccSqlTxQueriesAbstractTest.java | 24 +++++++++
 7 files changed, 111 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/97b93b84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index bd13fc3..7313197 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -850,6 +850,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
 
                 cctx.tm().commitTx(this);
 
+                cctx.tm().mvccFinish(this, true);
+
                 state(COMMITTED);
             }
         }
@@ -946,18 +948,20 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
 
                 state(ROLLED_BACK);
 
-                try {
-                    cctx.mvccCaching().onTxFinished(this, false);
-                }
-                catch (IgniteCheckedException e) {
-                    throw new IgniteException(e);
-                }
+                cctx.mvccCaching().onTxFinished(this, false);
+
+                cctx.tm().mvccFinish(this, false);
             }
         }
-        catch (RuntimeException | Error e) {
+        catch (IgniteCheckedException | RuntimeException | Error e) {
             state(UNKNOWN);
 
-            throw e;
+            if (e instanceof IgniteCheckedException)
+                throw new IgniteException(e);
+            else if (e instanceof RuntimeException)
+                throw (RuntimeException) e;
+            else
+                throw (Error) e;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/97b93b84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 5c8999d..21eb7b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -245,7 +245,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
             if (commit && e == null)
                 e = this.tx.commitError();
 
-            Throwable finishErr = e != null ? e : err;
+            Throwable finishErr = mvccFinish(e != null ? e : err);
 
             if (super.onDone(tx, finishErr)) {
                 if (finishErr == null)
@@ -595,6 +595,23 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
         return res;
     }
 
+    /**
+     * Finishes MVCC transaction on the local node.
+     */
+    private Throwable mvccFinish(Throwable commitError) {
+        try {
+            cctx.tm().mvccFinish(tx, commit && commitError == null);
+        }
+        catch (IgniteCheckedException ex) {
+            if (commitError == null)
+                tx.commitError(commitError = ex);
+            else
+                commitError.addSuppressed(ex);
+        }
+
+        return commitError;
+    }
+
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public void addDiagnosticRequest(IgniteDiagnosticPrepareContext 
ctx) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/97b93b84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 46698fe..68aa5c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -91,6 +91,7 @@ import org.apache.ignite.internal.util.typedef.C2;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.CX1;
+import org.apache.ignite.internal.util.typedef.CX2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -4075,10 +4076,26 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
 
         // Do not create finish future if there are no remote nodes.
         if (F.isEmpty(dhtMap) && F.isEmpty(nearMap)) {
-            if (prep != null)
-                return (IgniteInternalFuture<IgniteInternalTx>)prep;
+            if (prep != null) {
+                return new GridEmbeddedFuture<>(new CX2<IgniteInternalTx, 
Exception, IgniteInternalTx>() {
+                    @Override public IgniteInternalTx applyx(IgniteInternalTx 
o, Exception e) throws IgniteCheckedException {
+                        cctx.tm().mvccFinish(GridNearTxLocal.this, e == null);
 
-            return new GridFinishedFuture<IgniteInternalTx>(this);
+                        return o;
+                    }
+                }, (IgniteInternalFuture<IgniteInternalTx>)prep);
+            }
+
+            try {
+                cctx.tm().mvccFinish(this, true);
+
+                return new GridFinishedFuture<>(this);
+            }
+            catch (IgniteCheckedException e) {
+                commitError(e);
+
+                return new GridFinishedFuture<>(e);
+            }
         }
 
         final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, 
this, true);

http://git-wip-us.apache.org/repos/asf/ignite/blob/97b93b84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
index e377b0d..d22c61d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
@@ -431,15 +431,15 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
     }
 
     /** {@inheritDoc} */
-    @Override public byte state(long crdVer, long cntr) throws 
IgniteCheckedException {
-        return txLog.get(crdVer, cntr);
+    @Override public byte state(MvccVersion ver) throws IgniteCheckedException 
{
+        return state(ver.coordinatorVersion(), ver.counter());
     }
 
     /** {@inheritDoc} */
-    @Override public byte state(MvccVersion ver) throws IgniteCheckedException 
{
+    @Override public byte state(long crdVer, long cntr) throws 
IgniteCheckedException {
         assert txLog != null && mvccEnabled;
 
-        return txLog.get(ver.coordinatorVersion(), ver.counter());
+        return txLog.get(crdVer, cntr);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/97b93b84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 4dbc354..b091061 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1157,55 +1157,16 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
                     seal();
 
                 if (state == PREPARED || state == COMMITTED || state == 
ROLLED_BACK) {
-                    if (mvccSnapshot != null) {
-                        byte txState;
-
-                        switch (state) {
-                            case PREPARED:
-                                txState = TxState.PREPARED;
-                                break;
-                            case ROLLED_BACK:
-                                txState = TxState.ABORTED;
-                                break;
-                            case COMMITTED:
-                                txState = TxState.COMMITTED;
-                                break;
-                            default:
-                                throw new IllegalStateException("Illegal 
state: " + state);
-                        }
-
+                    if (state == PREPARED) {
                         try {
-                            if (!cctx.localNode().isClient()) {
-                                if (dht() && remote())
-                                    
cctx.coordinators().updateState(mvccSnapshot, txState, false);
-                                else if (local()) {
-                                    IgniteInternalFuture<?> rollbackFut = 
rollbackFuture();
-
-                                    boolean syncUpdate = txState == 
TxState.PREPARED || txState == TxState.COMMITTED ||
-                                        rollbackFut == null || 
rollbackFut.isDone();
-
-                                    if (syncUpdate)
-                                        
cctx.coordinators().updateState(mvccSnapshot, txState);
-                                    else {
-                                        // If tx was aborted, we need to wait 
tx log is updated on all backups.
-                                        rollbackFut.listen(new 
IgniteInClosure<IgniteInternalFuture>() {
-                                            @Override public void 
apply(IgniteInternalFuture fut) {
-                                                try {
-                                                    
cctx.coordinators().updateState(mvccSnapshot, txState);
-                                                }
-                                                catch (IgniteCheckedException 
e) {
-                                                    U.error(log, "Failed to 
log TxState: " + txState, e);
-                                                }
-                                            }
-                                        });
-                                    }
-                                }
-                            }
+                            cctx.tm().mvccPrepare(this);
                         }
                         catch (IgniteCheckedException e) {
-                            U.error(log, "Failed to log TxState: " + txState, 
e);
+                            String msg = "Failed to update TxState: " + 
TxState.PREPARED;
+
+                            U.error(log, msg, e);
 
-                            throw new IgniteException("Failed to log TxState: 
" + txState, e);
+                            throw new IgniteException(msg, e);
                         }
                     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/97b93b84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 438c8ab..27b1522 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVe
 import 
org.apache.ignite.internal.processors.cache.distributed.GridCacheTxFinishSync;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest;
@@ -69,6 +70,9 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCach
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearOptimisticTxPrepareFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
 import 
org.apache.ignite.internal.processors.cache.transactions.TxDeadlockDetection.TxDeadlockFuture;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
@@ -2397,6 +2401,29 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * Marks MVCC transaction as {@link TxState#COMMITTED} or {@link 
TxState#ABORTED}.
+     *
+     * @param tx Transaction.
+     * @param commit Commit flag.
+     * @throws IgniteCheckedException If failed to add version to TxLog.
+     */
+    public void mvccFinish(IgniteTxAdapter tx, boolean commit) throws 
IgniteCheckedException {
+        if (!cctx.kernalContext().clientNode() && tx.mvccSnapshot != null && 
!(tx.near() && tx.remote()))
+            cctx.coordinators().updateState(tx.mvccSnapshot, commit ? 
TxState.COMMITTED : TxState.ABORTED, tx.local());
+    }
+
+    /**
+     * Marks MVCC transaction as {@link TxState#PREPARED}.
+     *
+     * @param tx Transaction.
+     * @throws IgniteCheckedException If failed to add version to TxLog.
+     */
+    public void mvccPrepare(IgniteTxAdapter tx) throws IgniteCheckedException {
+        if (!cctx.kernalContext().clientNode() && tx.mvccSnapshot != null && 
!(tx.near() && tx.remote()))
+            cctx.coordinators().updateState(tx.mvccSnapshot, TxState.PREPARED);
+    }
+
+    /**
      * Timeout object for node failure handler.
      */
     private final class NodeFailureTimeoutObject extends 
GridTimeoutObjectAdapter {

http://git-wip-us.apache.org/repos/asf/ignite/blob/97b93b84/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java
index 4abcaa1..053d370 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java
@@ -1652,6 +1652,30 @@ public abstract class CacheMvccSqlTxQueriesAbstractTest extends CacheMvccAbstrac
     /**
      * @throws Exception If failed.
      */
+    public void testFastInsertUpdateConcurrent() throws Exception {
+        ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 2, 
DFLT_PARTITION_COUNT)
+            .setIndexedTypes(Integer.class, Integer.class);
+
+        Ignite ignite = startGridsMultiThreaded(4);
+
+        IgniteCache<Object, Object> cache = ignite.cache(DEFAULT_CACHE_NAME);
+
+        for (int i = 0; i < 1000; i++) {
+            int key = i;
+            CompletableFuture.allOf(
+                CompletableFuture.runAsync(() -> {
+                    cache.query(new SqlFieldsQuery("insert into Integer(_key, 
_val) values(?, ?)").setArgs(key, key));
+                }),
+                CompletableFuture.runAsync(() -> {
+                    cache.query(new SqlFieldsQuery("update Integer set _val = 
? where _key = ?").setArgs(key, key));
+                })
+            ).join();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testIterator() throws Exception {
         ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 2, 
DFLT_PARTITION_COUNT)
             .setIndexedTypes(Integer.class, Integer.class);

Reply via email to