Repository: ignite Updated Branches: refs/heads/master 7bf9bc281 -> 97b93b84c
IGNITE-9292: MVCC: fixed a race causing unexpected state of entry in TX log. This closes #4984. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/97b93b84 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/97b93b84 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/97b93b84 Branch: refs/heads/master Commit: 97b93b84c24c1131c31fec32f37f67490c870ad9 Parents: 7bf9bc2 Author: Igor Seliverstov <gvvinbl...@gmail.com> Authored: Tue Oct 16 15:09:01 2018 +0300 Committer: devozerov <voze...@gridgain.com> Committed: Tue Oct 16 15:09:01 2018 +0300 ---------------------------------------------------------------------- .../GridDistributedTxRemoteAdapter.java | 20 +++++--- .../distributed/dht/GridDhtTxFinishFuture.java | 19 +++++++- .../cache/distributed/near/GridNearTxLocal.java | 23 +++++++-- .../cache/mvcc/MvccProcessorImpl.java | 8 +-- .../cache/transactions/IgniteTxAdapter.java | 51 +++----------------- .../cache/transactions/IgniteTxManager.java | 27 +++++++++++ .../mvcc/CacheMvccSqlTxQueriesAbstractTest.java | 24 +++++++++ 7 files changed, 111 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/97b93b84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index bd13fc3..7313197 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -850,6 +850,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter cctx.tm().commitTx(this); + cctx.tm().mvccFinish(this, true); + state(COMMITTED); } } @@ -946,18 +948,20 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter state(ROLLED_BACK); - try { - cctx.mvccCaching().onTxFinished(this, false); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } + cctx.mvccCaching().onTxFinished(this, false); + + cctx.tm().mvccFinish(this, false); } } - catch (RuntimeException | Error e) { + catch (IgniteCheckedException | RuntimeException | Error e) { state(UNKNOWN); - throw e; + if (e instanceof IgniteCheckedException) + throw new IgniteException(e); + else if (e instanceof RuntimeException) + throw (RuntimeException) e; + else + throw (Error) e; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/97b93b84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index 5c8999d..21eb7b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -245,7 +245,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity if (commit && e == null) e = this.tx.commitError(); - Throwable finishErr = e != null ? e : err; + Throwable finishErr = mvccFinish(e != null ? e : err); if (super.onDone(tx, finishErr)) { if (finishErr == null) @@ -595,6 +595,23 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity return res; } + /** + * Finishes MVCC transaction on the local node. + */ + private Throwable mvccFinish(Throwable commitError) { + try { + cctx.tm().mvccFinish(tx, commit && commitError == null); + } + catch (IgniteCheckedException ex) { + if (commitError == null) + tx.commitError(commitError = ex); + else + commitError.addSuppressed(ex); + } + + return commitError; + } + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public void addDiagnosticRequest(IgniteDiagnosticPrepareContext ctx) { http://git-wip-us.apache.org/repos/asf/ignite/blob/97b93b84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 46698fe..68aa5c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -91,6 +91,7 @@ import org.apache.ignite.internal.util.typedef.C2; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.CX1; +import org.apache.ignite.internal.util.typedef.CX2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; @@ -4075,10 +4076,26 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou // Do not create finish future if there are no remote nodes. if (F.isEmpty(dhtMap) && F.isEmpty(nearMap)) { - if (prep != null) - return (IgniteInternalFuture<IgniteInternalTx>)prep; + if (prep != null) { + return new GridEmbeddedFuture<>(new CX2<IgniteInternalTx, Exception, IgniteInternalTx>() { + @Override public IgniteInternalTx applyx(IgniteInternalTx o, Exception e) throws IgniteCheckedException { + cctx.tm().mvccFinish(GridNearTxLocal.this, e == null); - return new GridFinishedFuture<IgniteInternalTx>(this); + return o; + } + }, (IgniteInternalFuture<IgniteInternalTx>)prep); + } + + try { + cctx.tm().mvccFinish(this, true); + + return new GridFinishedFuture<>(this); + } + catch (IgniteCheckedException e) { + commitError(e); + + return new GridFinishedFuture<>(e); + } } final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, true); http://git-wip-us.apache.org/repos/asf/ignite/blob/97b93b84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java index e377b0d..d22c61d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java @@ -431,15 +431,15 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce } /** {@inheritDoc} */ - @Override public byte state(long crdVer, long cntr) throws IgniteCheckedException { - return txLog.get(crdVer, cntr); + @Override public byte state(MvccVersion ver) throws IgniteCheckedException { + return state(ver.coordinatorVersion(), ver.counter()); } /** {@inheritDoc} */ - @Override public byte state(MvccVersion ver) throws IgniteCheckedException { + @Override public byte state(long crdVer, long cntr) throws IgniteCheckedException { assert txLog != null && mvccEnabled; - return txLog.get(ver.coordinatorVersion(), ver.counter()); + return txLog.get(crdVer, cntr); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/97b93b84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 4dbc354..b091061 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -1157,55 +1157,16 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement seal(); if (state == PREPARED || state == COMMITTED || state == ROLLED_BACK) { - if (mvccSnapshot != null) { - byte txState; - - switch (state) { - case PREPARED: - txState = TxState.PREPARED; - break; - case ROLLED_BACK: - txState = TxState.ABORTED; - break; - case COMMITTED: - txState = TxState.COMMITTED; - break; - default: - throw new IllegalStateException("Illegal state: " + state); - } - + if (state == PREPARED) { try { - if (!cctx.localNode().isClient()) { - if (dht() && remote()) - cctx.coordinators().updateState(mvccSnapshot, txState, false); - else if (local()) { - IgniteInternalFuture<?> rollbackFut = rollbackFuture(); - - boolean syncUpdate = txState == TxState.PREPARED || txState == TxState.COMMITTED || - rollbackFut == null || rollbackFut.isDone(); - - if (syncUpdate) - cctx.coordinators().updateState(mvccSnapshot, txState); - else { - // If tx was aborted, we need to wait tx log is updated on all backups. - rollbackFut.listen(new IgniteInClosure<IgniteInternalFuture>() { - @Override public void apply(IgniteInternalFuture fut) { - try { - cctx.coordinators().updateState(mvccSnapshot, txState); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to log TxState: " + txState, e); - } - } - }); - } - } - } + cctx.tm().mvccPrepare(this); } catch (IgniteCheckedException e) { - U.error(log, "Failed to log TxState: " + txState, e); + String msg = "Failed to update TxState: " + TxState.PREPARED; + + U.error(log, msg, e); - throw new IgniteException("Failed to log TxState: " + txState, e); + throw new IgniteException(msg, e); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/97b93b84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 438c8ab..27b1522 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -59,6 +59,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVe import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxFinishSync; import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest; @@ -69,6 +70,9 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCach import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearOptimisticTxPrepareFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; +import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; +import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion; +import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState; import org.apache.ignite.internal.processors.cache.transactions.TxDeadlockDetection.TxDeadlockFuture; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; @@ -2397,6 +2401,29 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** + * Marks MVCC transaction as {@link TxState#COMMITTED} or {@link TxState#ABORTED}. + * + * @param tx Transaction. + * @param commit Commit flag. + * @throws IgniteCheckedException If failed to add version to TxLog. + */ + public void mvccFinish(IgniteTxAdapter tx, boolean commit) throws IgniteCheckedException { + if (!cctx.kernalContext().clientNode() && tx.mvccSnapshot != null && !(tx.near() && tx.remote())) + cctx.coordinators().updateState(tx.mvccSnapshot, commit ? TxState.COMMITTED : TxState.ABORTED, tx.local()); + } + + /** + * Marks MVCC transaction as {@link TxState#PREPARED}. + * + * @param tx Transaction. + * @throws IgniteCheckedException If failed to add version to TxLog. + */ + public void mvccPrepare(IgniteTxAdapter tx) throws IgniteCheckedException { + if (!cctx.kernalContext().clientNode() && tx.mvccSnapshot != null && !(tx.near() && tx.remote())) + cctx.coordinators().updateState(tx.mvccSnapshot, TxState.PREPARED); + } + + /** * Timeout object for node failure handler. */ private final class NodeFailureTimeoutObject extends GridTimeoutObjectAdapter { http://git-wip-us.apache.org/repos/asf/ignite/blob/97b93b84/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java index 4abcaa1..053d370 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java @@ -1652,6 +1652,30 @@ public abstract class CacheMvccSqlTxQueriesAbstractTest extends CacheMvccAbstrac /** * @throws Exception If failed. */ + public void testFastInsertUpdateConcurrent() throws Exception { + ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 2, DFLT_PARTITION_COUNT) + .setIndexedTypes(Integer.class, Integer.class); + + Ignite ignite = startGridsMultiThreaded(4); + + IgniteCache<Object, Object> cache = ignite.cache(DEFAULT_CACHE_NAME); + + for (int i = 0; i < 1000; i++) { + int key = i; + CompletableFuture.allOf( + CompletableFuture.runAsync(() -> { + cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, ?)").setArgs(key, key)); + }), + CompletableFuture.runAsync(() -> { + cache.query(new SqlFieldsQuery("update Integer set _val = ? where _key = ?").setArgs(key, key)); + }) + ).join(); + } + } + + /** + * @throws Exception If failed. + */ public void testIterator() throws Exception { ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 2, DFLT_PARTITION_COUNT) .setIndexedTypes(Integer.class, Integer.class);