ignite-4371 Avoid synchronous 'rollback' call from system threads (cherry picked from commit 0c782b0)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c1fde05d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c1fde05d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c1fde05d Branch: refs/heads/master Commit: c1fde05d05251353f0e75125aca44ab145798c8e Parents: ad785cb Author: sboikov <[email protected]> Authored: Fri Dec 16 19:15:48 2016 +0300 Committer: sboikov <[email protected]> Committed: Mon Dec 19 11:43:34 2016 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 37 +++--- .../processors/cache/GridCacheProcessor.java | 2 +- .../processors/cache/GridCacheUtils.java | 10 +- .../GridDistributedTxRemoteAdapter.java | 2 +- .../distributed/dht/GridDhtTxFinishFuture.java | 74 +++++------ .../cache/distributed/dht/GridDhtTxLocal.java | 125 +++++++------------ .../distributed/dht/GridDhtTxLocalAdapter.java | 7 +- .../near/GridNearTxFinishFuture.java | 44 +++---- .../cache/distributed/near/GridNearTxLocal.java | 48 ++++--- .../cache/transactions/IgniteTxHandler.java | 13 +- .../transactions/IgniteTxLocalAdapter.java | 3 +- .../cache/transactions/IgniteTxManager.java | 7 +- .../GridCacheMissingCommitVersionSelfTest.java | 6 +- .../IgniteTxStoreExceptionAbstractSelfTest.java | 1 + 14 files changed, 161 insertions(+), 218 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c1fde05d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 503b334..d26031c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -57,6 +57,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; @@ -854,24 +855,30 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana ready = cacheCtx.started(); if (ready) { - GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true); + GridAffinityAssignmentCache affCache = cacheCtx.affinity().affinityCache(); - if (useOldApi) { - locMap = new GridDhtPartitionFullMap(locMap.nodeId(), - locMap.nodeOrder(), - locMap.updateSequence(), - locMap); - } + if (affCache != null) { + GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true); + + if (useOldApi) { + locMap = new GridDhtPartitionFullMap(locMap.nodeId(), + locMap.nodeOrder(), + locMap.updateSequence(), + locMap); + } - addFullPartitionsMap(m, - dupData, - compress, - cacheCtx.cacheId(), - locMap, - cacheCtx.affinity().affinityCache().similarAffinityKey()); + addFullPartitionsMap(m, + dupData, + compress, + cacheCtx.cacheId(), + locMap, + affCache.similarAffinityKey()); - if (exchId != null) - m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters(true)); + if (exchId != null) + m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters(true)); + } + else + assert cctx.cacheContext(cacheCtx.cacheId()) == null : cacheCtx.name(); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/c1fde05d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 0e0d769..cb96225 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1761,7 +1761,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** * @param req Stop request. */ - public void prepareCacheStop(DynamicCacheChangeRequest req) { + private void prepareCacheStop(DynamicCacheChangeRequest req) { assert req.stop() || req.close() : req; GridCacheAdapter<?, ?> cache = caches.remove(maskNull(req.cacheName())); http://git-wip-us.apache.org/repos/asf/ignite/blob/c1fde05d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 3178203..fef6ddd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -928,9 +928,13 @@ public class GridCacheUtils { if (tx == null) return "null"; - return tx.getClass().getSimpleName() + "[id=" + tx.xid() + ", concurrency=" + tx.concurrency() + - ", isolation=" + tx.isolation() + ", state=" + tx.state() + ", invalidate=" + tx.isInvalidate() + - ", rollbackOnly=" + tx.isRollbackOnly() + ", nodeId=" + tx.nodeId() + + return tx.getClass().getSimpleName() + "[id=" + tx.xid() + + ", concurrency=" + tx.concurrency() + + ", isolation=" + tx.isolation() + + ", state=" + tx.state() + + ", invalidate=" + tx.isInvalidate() + + ", rollbackOnly=" + tx.isRollbackOnly() + + ", nodeId=" + tx.nodeId() + ", duration=" + (U.currentTimeMillis() - tx.startTime()) + ']'; } http://git-wip-us.apache.org/repos/asf/ignite/blob/c1fde05d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 4adfa8b..68c0e57 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -386,7 +386,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter // If another thread is doing prepare or rollback. if (!state(PREPARING)) { // In optimistic mode prepare may be called multiple times. - if(state() != PREPARING || !optimistic()) { + if (state() != PREPARING || !optimistic()) { if (log.isDebugEnabled()) log.debug("Invalid transaction state for prepare: " + this); http://git-wip-us.apache.org/repos/asf/ignite/blob/c1fde05d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index ac2ab41..147cbea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -33,8 +33,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; -import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; -import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -94,9 +92,6 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur /** Near mappings. */ private Map<UUID, GridDistributedTxMapping> nearMap; - /** Trackable flag. */ - private boolean trackable = true; - /** * @param cctx Context. * @param tx Transaction. @@ -151,46 +146,24 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur /** {@inheritDoc} */ @Override public boolean trackable() { - return trackable; + return true; } /** {@inheritDoc} */ @Override public void markNotTrackable() { - trackable = false; + assert false; } /** * @param e Error. */ - public void onError(Throwable e) { - if (ERR_UPD.compareAndSet(this, null, e)) { - boolean marked = tx.setRollbackOnly(); - - if (e instanceof IgniteTxRollbackCheckedException) { - if (marked) { - try { - tx.rollback(); - } - catch (IgniteCheckedException ex) { - U.error(log, "Failed to automatically rollback transaction: " + tx, ex); - } - } - } - else if (tx.isSystemInvalidate()) { // Invalidate remote transactions on heuristic error. - finish(); + public void rollbackOnError(Throwable e) { + assert e != null; - try { - get(); - } - catch (IgniteTxHeuristicCheckedException ignore) { - // Future should complete with GridCacheTxHeuristicException. - } - catch (IgniteCheckedException err) { - U.error(log, "Failed to invalidate transaction: " + tx, err); - } - } + if (ERR_UPD.compareAndSet(this, null, e)) { + tx.setRollbackOnly(); - onComplete(); + finish(false); } } @@ -240,12 +213,21 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur /** {@inheritDoc} */ @Override public boolean onDone(IgniteInternalTx tx, Throwable err) { if (initialized() || err != null) { - if (this.tx.onePhaseCommit() && (this.tx.state() == COMMITTING)) - this.tx.tmFinish(err == null); - Throwable e = this.err; - if (e == null && commit) + if (this.tx.onePhaseCommit() && (this.tx.state() == COMMITTING)) { + try { + this.tx.tmFinish(err == null); + } + catch (IgniteCheckedException finishErr) { + U.error(log, "Failed to finish tx: " + tx, e); + + if (e == null) + e = finishErr; + } + } + + if (commit && e == null) e = this.tx.commitError(); Throwable finishErr = e != null ? e : err; @@ -255,7 +237,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur finishErr = this.tx.commitError(); if (this.tx.syncMode() != PRIMARY_SYNC) - this.tx.sendFinishReply(commit, finishErr); + this.tx.sendFinishReply(finishErr); // Don't forget to clean up. cctx.mvcc().removeFuture(futId); @@ -284,13 +266,15 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur /** * Initializes future. + * + * @param commit Commit flag. */ @SuppressWarnings({"SimplifiableIfStatement", "IfMayBeConditional"}) - public void finish() { + public void finish(boolean commit) { boolean sync; if (!F.isEmpty(dhtMap) || !F.isEmpty(nearMap)) - sync = finish(dhtMap, nearMap); + sync = finish(commit, dhtMap, nearMap); else if (!commit && !F.isEmpty(tx.lockTransactionNodes())) sync = rollbackLockTransactions(tx.lockTransactionNodes()); else @@ -308,7 +292,6 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur * @return {@code True} in case there is at least one synchronous {@code MiniFuture} to wait for. */ private boolean rollbackLockTransactions(Collection<ClusterNode> nodes) { - assert !commit; assert !F.isEmpty(nodes); if (tx.onePhaseCommit()) @@ -337,7 +320,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur tx.commitVersion(), tx.threadId(), tx.isolation(), - commit, + false, tx.isInvalidate(), tx.system(), tx.ioPolicy(), @@ -390,11 +373,14 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur } /** + * @param commit Commit flag. * @param dhtMap DHT map. * @param nearMap Near map. * @return {@code True} in case there is at least one synchronous {@code MiniFuture} to wait for. */ - private boolean finish(Map<UUID, GridDistributedTxMapping> dhtMap, Map<UUID, GridDistributedTxMapping> nearMap) { + private boolean finish(boolean commit, + Map<UUID, GridDistributedTxMapping> dhtMap, + Map<UUID, GridDistributedTxMapping> nearMap) { if (tx.onePhaseCommit()) return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/c1fde05d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index b659abb..4e39e9b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -503,52 +503,57 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa } /** + * @param commit Commit flag. * @param prepFut Prepare future. * @param fut Finish future. */ - private void finishCommit(@Nullable IgniteInternalFuture prepFut, GridDhtTxFinishFuture fut) { + private void finishTx(boolean commit, @Nullable IgniteInternalFuture prepFut, GridDhtTxFinishFuture fut) { + assert prepFut == null || prepFut.isDone(); + boolean primarySync = syncMode() == PRIMARY_SYNC; IgniteCheckedException err = null; - try { - if (prepFut != null) - prepFut.get(); // Check for errors. - - if (finish(true)) { - if (primarySync) - sendFinishReply(true, null); - - fut.finish(); + if (!commit && prepFut != null) { + try { + prepFut.get(); } - else { - err = new IgniteCheckedException("Failed to commit transaction: " + CU.txString(this)); - - fut.onError(err); + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to prepare transaction [tx=" + this + ", e=" + e + ']'); + } + finally { + prepFut = null; } } - catch (IgniteTxOptimisticCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to optimistically prepare transaction [tx=" + this + ", e=" + e + ']'); - err = e; + try { + if (prepFut != null) + prepFut.get(); // Check for errors. - fut.onError(e); + boolean finished = finish(commit); + + if (!finished) + err = new IgniteCheckedException("Failed to finish transaction [commit=" + commit + + ", tx=" + CU.txString(this) + ']'); } catch (IgniteCheckedException e) { - U.error(log, "Failed to prepare transaction: " + this, e); + U.error(log, "Failed to finish transaction [commit=" + commit + ", tx=" + this + ']', e); err = e; - - fut.onError(e); } - if (primarySync && err != null) - sendFinishReply(true, err); + if (primarySync) + sendFinishReply(err); + + if (err != null) + fut.rollbackOnError(err); + else + fut.finish(commit); } /** {@inheritDoc} */ - @SuppressWarnings({"ThrowableInstanceNeverThrown"}) + @SuppressWarnings("unchecked") @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() { if (log.isDebugEnabled()) log.debug("Committing dht local tx: " + this); @@ -557,7 +562,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa if (pessimistic()) prepareAsync(); - final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, /*commit*/true); + final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, true); cctx.mvcc().addFuture(fut, fut.futureId()); @@ -565,11 +570,11 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa if (prep != null) { if (prep.isDone()) - finishCommit(prep, fut); + finishTx(true, prep, fut); else { prep.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> f) { - finishCommit(f, fut); + finishTx(true, f, fut); } }); } @@ -577,7 +582,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa else { assert optimistic(); - finishCommit(null, fut); + finishTx(true, null, fut); } return fut; @@ -590,70 +595,26 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa PREP_FUT_UPD.compareAndSet(this, fut, null); } - /** - * @param prepFut Prepare future. - * @param fut Finish future. - */ - private void finishRollback(@Nullable IgniteInternalFuture prepFut, GridDhtTxFinishFuture fut) { - try { - if (prepFut != null) - prepFut.get(); - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to prepare or rollback transaction [tx=" + this + ", e=" + e + ']'); - } - - boolean primarySync = syncMode() == PRIMARY_SYNC; - - IgniteCheckedException err = null; - - try { - if (finish(false) || state() == UNKNOWN) { - if (primarySync) - sendFinishReply(false, null); - - fut.finish(); - } - else { - err = new IgniteCheckedException("Failed to rollback transaction: " + - CU.txString(GridDhtTxLocal.this)); - - fut.onError(err); - } - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to gracefully rollback transaction: " + CU.txString(GridDhtTxLocal.this), - e); - - err = e; - - fut.onError(e); - } - - if (primarySync && err != null) - sendFinishReply(false, err); - } - /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public IgniteInternalFuture<IgniteInternalTx> rollbackAsync() { - GridDhtTxPrepareFuture prepFut = this.prepFut; - - final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, /*rollback*/false); + final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, false); cctx.mvcc().addFuture(fut, fut.futureId()); + GridDhtTxPrepareFuture prepFut = this.prepFut; + if (prepFut != null) { prepFut.complete(); prepFut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> f) { - finishRollback(f, fut); + finishTx(false, f, fut); } }); } else - finishRollback(null, fut); + finishTx(false, null, fut); return fut; } @@ -672,7 +633,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa } /** {@inheritDoc} */ - @Override protected void sendFinishReply(boolean commit, @Nullable Throwable err) { + @Override protected void sendFinishReply(@Nullable Throwable err) { if (nearFinFutId != null) { if (nearNodeId.equals(cctx.localNodeId())) { if (log.isDebugEnabled()) @@ -701,8 +662,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa } } catch (Throwable ex) { - U.error(log, "Failed to send finish response to node (transaction was " + - (commit ? "committed" : "rolledback") + ") [txId=" + nearXidVersion() + + U.error(log, "Failed to send finish response to node [txId=" + nearXidVersion() + + ", txState=" + state() + ", dhtTxId=" + xidVersion() + ", node=" + nearNodeId + ", res=" + res + ']', ex); http://git-wip-us.apache.org/repos/asf/ignite/blob/c1fde05d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 35dfb62..1d88d84 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -236,10 +236,9 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { AffinityTopologyVersion topVer); /** - * @param commit Commit flag. * @param err Error, if any. */ - protected abstract void sendFinishReply(boolean commit, @Nullable Throwable err); + protected abstract void sendFinishReply(@Nullable Throwable err); /** {@inheritDoc} */ @Override public boolean needsCompletedVersions() { @@ -249,7 +248,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { /** * @return Versions for all pending locks that were in queue before tx locks were released. */ - public Collection<GridCacheVersion> pendingVersions() { + Collection<GridCacheVersion> pendingVersions() { return pendingVers == null ? Collections.<GridCacheVersion>emptyList() : pendingVers; } @@ -726,7 +725,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { /*read*/read, accessTtl, filter == null ? CU.empty0() : filter, - /**computeInvoke*/false); + /*computeInvoke*/false); return ret; } http://git-wip-us.apache.org/repos/asf/ignite/blob/c1fde05d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index 54bd543..9acab56 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -298,34 +298,16 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu if (isDone()) return false; - if (err != null) { - tx.commitError(err); - - boolean marked = tx.setRollbackOnly(); - - if (err instanceof IgniteTxRollbackCheckedException) { - if (marked) { - try { - tx.rollback(); - } - catch (IgniteCheckedException ex) { - U.error(log, "Failed to automatically rollback transaction: " + tx, ex); - } - } - } - else if (tx.implicit() && tx.isSystemInvalidate()) { // Finish implicit transaction on heuristic error. - try { - tx.close(); - } - catch (IgniteCheckedException ex) { - U.error(log, "Failed to invalidate transaction: " + tx, ex); - } - } + if (err != null) + tx.setRollbackOnly(); + + if (commit) { + if (tx.commitError() != null) + err = tx.commitError(); + else if (err != null) + tx.commitError(err); } - if (commit && tx.commitError() != null) - err = tx.commitError(); - if (initialized() || err != null) { if (tx.needCheckBackup()) { assert tx.onePhaseCommit(); @@ -349,7 +331,15 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu finishOnePhase(commit); - tx.tmFinish(commit); + try { + tx.tmFinish(commit); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to finish tx: " + tx, e); + + if (err == null) + err = e; + } } if (super.onDone(tx0, err)) { http://git-wip-us.apache.org/repos/asf/ignite/blob/c1fde05d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index ed37059..0730300 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -231,7 +231,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { } /** {@inheritDoc} */ - @Override protected void sendFinishReply(boolean commit, @Nullable Throwable err) { + @Override protected void sendFinishReply(@Nullable Throwable err) { // We are in near transaction, do not send finish reply to local node. } @@ -1062,50 +1062,48 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { return new GridFinishedFuture<IgniteInternalTx>(this); } - final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, /*commit*/true); + final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, true); cctx.mvcc().addFuture(fut, fut.futureId()); if (prep == null || prep.isDone()) { assert prep != null || optimistic(); + IgniteCheckedException err = null; + try { if (prep != null) prep.get(); // Check for errors of a parent future. - - fut.finish(); - } - catch (IgniteTxOptimisticCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed optimistically to prepare transaction [tx=" + this + ", e=" + e + ']'); - - fut.onError(e); } catch (IgniteCheckedException e) { - U.error(log, "Failed to prepare transaction: " + this, e); + err = e; - fut.onError(e); + U.error(log, "Failed to prepare transaction: " + this, e); } + + if (err != null) + fut.rollbackOnError(err); + else + fut.finish(true); } else prep.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> f) { + IgniteCheckedException err = null; + try { f.get(); // Check for errors of a parent future. - - fut.finish(); - } - catch (IgniteTxOptimisticCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed optimistically to prepare transaction [tx=" + this + ", e=" + e + ']'); - - fut.onError(e); } catch (IgniteCheckedException e) { - U.error(log, "Failed to prepare transaction: " + this, e); + err = e; - fut.onError(e); + U.error(log, "Failed to prepare transaction: " + this, e); } + + if (err != null) + fut.rollbackOnError(err); + else + fut.finish(true); } }); @@ -1121,7 +1119,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { if (log.isDebugEnabled()) log.debug("Rolling back colocated tx locally: " + this); - final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, /*commit*/false); + final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, false); cctx.mvcc().addFuture(fut, fut.futureId()); @@ -1138,7 +1136,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { e.getMessage() + ']'); } - fut.finish(); + fut.finish(false); } else prep.listen(new CI1<IgniteInternalFuture<?>>() { @@ -1151,7 +1149,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { e.getMessage() + ']'); } - fut.finish(); + fut.finish(false); } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/c1fde05d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 4b99079..b6e3c48 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -472,13 +472,8 @@ public class IgniteTxHandler { req.last()); if (tx.isRollbackOnly() && !tx.commitOnPrepare()) { - try { - if (tx.state() != TransactionState.ROLLED_BACK && tx.state() != TransactionState.ROLLING_BACK) - tx.rollback(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to rollback transaction: " + tx, e); - } + if (tx.state() != TransactionState.ROLLED_BACK && tx.state() != TransactionState.ROLLING_BACK) + tx.rollbackAsync(); } final GridDhtTxLocal tx0 = tx; @@ -872,7 +867,7 @@ public class IgniteTxHandler { U.error(log, "Failed completing transaction [commit=" + req.commit() + ", tx=" + tx + ']', e); - IgniteInternalFuture<IgniteInternalTx> res = null; + IgniteInternalFuture<IgniteInternalTx> res; IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackAsync(); @@ -884,7 +879,7 @@ public class IgniteTxHandler { if (e instanceof Error) throw (Error)e; - return res == null ? new GridFinishedFuture<IgniteInternalTx>(e) : res; + return res; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/c1fde05d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 393fb1a..8d0a2b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1000,8 +1000,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig * Commits transaction to transaction manager. Used for one-phase commit transactions only. * * @param commit If {@code true} commits transaction, otherwise rollbacks. + * @throws IgniteCheckedException If failed. */ - public void tmFinish(boolean commit) { + public void tmFinish(boolean commit) throws IgniteCheckedException { assert onePhaseCommit(); if (DONE_FLAG_UPD.compareAndSet(this, 0, 1)) { http://git-wip-us.apache.org/repos/asf/ignite/blob/c1fde05d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index e2e9868..036fb0a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -1191,8 +1191,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * Commits a transaction. * * @param tx Transaction to commit. + * @throws IgniteCheckedException If failed. */ - public void commitTx(IgniteInternalTx tx) { + public void commitTx(IgniteInternalTx tx) throws IgniteCheckedException { assert tx != null; assert tx.state() == COMMITTING : "Invalid transaction state for commit from tm [state=" + tx.state() + ", expected=COMMITTING, tx=" + tx + ']'; @@ -1210,12 +1211,12 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { Boolean committed = committed0 != null && !committed0.equals(Boolean.FALSE); // 1. Make sure that committed version has been recorded. - if (!((committed != null && committed) || tx.writeSet().isEmpty() || tx.isSystemInvalidate())) { + if (!(committed || tx.writeSet().isEmpty() || tx.isSystemInvalidate())) { uncommitTx(tx); tx.errorWhenCommitting(); - throw new IgniteException("Missing commit version (consider increasing " + + throw new IgniteCheckedException("Missing commit version (consider increasing " + IGNITE_MAX_COMPLETED_TX_COUNT + " system property) [ver=" + tx.xidVersion() + ", tx=" + tx.getClass().getSimpleName() + ']'); } http://git-wip-us.apache.org/repos/asf/ignite/blob/c1fde05d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java index 19e49f3..ac56d18 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java @@ -43,7 +43,7 @@ public class GridCacheMissingCommitVersionSelfTest extends GridCommonAbstractTes private volatile boolean putFailed; /** */ - private String maxCompletedTxCount; + private String maxCompletedTxCnt; /** */ @@ -53,7 +53,7 @@ public class GridCacheMissingCommitVersionSelfTest extends GridCommonAbstractTes /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration() throws Exception { - maxCompletedTxCount = System.getProperty(IGNITE_MAX_COMPLETED_TX_COUNT); + maxCompletedTxCnt = System.getProperty(IGNITE_MAX_COMPLETED_TX_COUNT); System.setProperty(IGNITE_MAX_COMPLETED_TX_COUNT, String.valueOf(5)); @@ -78,7 +78,7 @@ public class GridCacheMissingCommitVersionSelfTest extends GridCommonAbstractTes /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { - System.setProperty(IGNITE_MAX_COMPLETED_TX_COUNT, maxCompletedTxCount != null ? maxCompletedTxCount : ""); + System.setProperty(IGNITE_MAX_COMPLETED_TX_COUNT, maxCompletedTxCnt != null ? maxCompletedTxCnt : ""); super.afterTest(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/c1fde05d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java index b65b441..795ab81 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java @@ -357,6 +357,7 @@ public abstract class IgniteTxStoreExceptionAbstractSelfTest extends GridCacheAb /** * @param key Key. + * @param putBefore If {@code true} expects non-null values. * @throws Exception If failed. */ private void checkValue(final Integer key, boolean putBefore) throws Exception {
