Repository: ignite Updated Branches: refs/heads/ignite-1537 7fd645373 -> 215ff1eb6
ignite-1.5 Fixed hang on metadata update inside put in atomic cache when topology read lock is held. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/215ff1eb Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/215ff1eb Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/215ff1eb Branch: refs/heads/ignite-1537 Commit: 215ff1eb63ba71f553956c24b51b3fbb067c038e Parents: 7fd6453 Author: sboikov <[email protected]> Authored: Tue Dec 22 11:32:04 2015 +0300 Committer: sboikov <[email protected]> Committed: Tue Dec 22 11:32:04 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 22 ++--- .../processors/cache/GridCacheProxyImpl.java | 7 +- .../cache/GridCacheSharedContext.java | 9 +- .../processors/cache/IgniteCacheProxy.java | 11 ++- .../processors/cache/IgniteInternalCache.java | 8 +- .../binary/CacheObjectBinaryProcessorImpl.java | 19 ++-- .../GridDistributedTxRemoteAdapter.java | 2 +- .../distributed/dht/GridDhtLockFuture.java | 2 +- .../dht/GridDhtTransactionalCacheAdapter.java | 2 +- .../cache/distributed/dht/GridDhtTxLocal.java | 2 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 2 +- .../dht/atomic/GridDhtAtomicCache.java | 91 +++++++++++--------- .../colocated/GridDhtColocatedLockFuture.java | 10 +-- .../distributed/near/GridNearLockFuture.java | 10 +-- ...arOptimisticSerializableTxPrepareFuture.java | 4 +- .../near/GridNearOptimisticTxPrepareFuture.java | 4 +- ...ridNearOptimisticTxPrepareFutureAdapter.java | 30 ++----- .../GridNearPessimisticTxPrepareFuture.java | 4 +- .../cache/distributed/near/GridNearTxLocal.java | 14 +-- .../near/GridNearTxPrepareFutureAdapter.java | 4 +- .../cache/transactions/IgniteInternalTx.java | 3 +- .../cache/transactions/IgniteTxAdapter.java | 2 +- .../cache/transactions/IgniteTxHandler.java | 2 +- .../transactions/IgniteTxLocalAdapter.java | 20 ++--- .../cache/transactions/IgniteTxLocalEx.java | 1 - .../cache/transactions/IgniteTxManager.java | 42 +++++---- ...niteBinaryMetadataUpdateNodeRestartTest.java | 2 +- 27 files changed, 149 insertions(+), 180 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 1d097b7..5d4c386 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -2077,21 +2077,22 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Nullable @Override public <T> EntryProcessorResult<T> tryInvoke(K key, + @Nullable @Override public <T> EntryProcessorResult<T> invoke(@Nullable AffinityTopologyVersion topVer, + K key, EntryProcessor<K, V, T> entryProcessor, Object... args) throws IgniteCheckedException { - return invoke0(false, key, entryProcessor, args); + return invoke0(topVer, key, entryProcessor, args); } /** {@inheritDoc} */ @Override public <T> EntryProcessorResult<T> invoke(final K key, final EntryProcessor<K, V, T> entryProcessor, final Object... args) throws IgniteCheckedException { - return invoke0(true, key, entryProcessor, args); + return invoke0(null, key, entryProcessor, args); } /** - * @param waitTopFut If {@code false} does not wait for affinity change future. + * @param topVer Locked topology version. * @param key Key. * @param entryProcessor Entry processor. * @param args Entry processor arguments. @@ -2099,7 +2100,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @throws IgniteCheckedException If failed. */ private <T> EntryProcessorResult<T> invoke0( - final boolean waitTopFut, + @Nullable final AffinityTopologyVersion topVer, final K key, final EntryProcessor<K, V, T> entryProcessor, final Object... args) @@ -2112,13 +2113,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return syncOp(new SyncOp<EntryProcessorResult<T>>(true) { @Nullable @Override public EntryProcessorResult<T> op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - assert !waitTopFut || tx.implicit(); + assert topVer == null || tx.implicit(); - if (!waitTopFut) - tx.topologyVersion(ctx.shared().exchange().readyAffinityVersion()); + if (topVer != null) + tx.topologyVersion(topVer); IgniteInternalFuture<GridCacheReturn> fut = tx.invokeAsync(ctx, - waitTopFut, key, (EntryProcessor<K, V, Object>)entryProcessor, args); @@ -3992,7 +3992,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V IgniteInternalFuture<IgniteInternalTx> f = new GridEmbeddedFuture<>(fut, new C2<Object, Exception, IgniteInternalFuture<IgniteInternalTx>>() { @Override public IgniteInternalFuture<IgniteInternalTx> apply(Object o, Exception e) { - return tx.commitAsync(true); + return tx.commitAsync(); } }); @@ -4001,7 +4001,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return f; } - IgniteInternalFuture<IgniteInternalTx> f = tx.commitAsync(true); + IgniteInternalFuture<IgniteInternalTx> f = tx.commitAsync(); saveFuture(holder, f); http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index cbdb5b4..8ffd273 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -36,6 +36,7 @@ import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityProxy; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; @@ -1243,13 +1244,15 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte } /** {@inheritDoc} */ - @Nullable @Override public <T> EntryProcessorResult<T> tryInvoke(K key, + @Nullable @Override public <T> EntryProcessorResult<T> invoke( + AffinityTopologyVersion topVer, + K key, EntryProcessor<K, V, T> entryProcessor, Object... args) throws IgniteCheckedException { CacheOperationContext prev = gate.enter(opCtx); try { - return delegate.tryInvoke(key, entryProcessor, args); + return delegate.invoke(topVer, key, entryProcessor, args); } finally { gate.leave(prev); http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 0a03494..f52e378 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -575,12 +575,7 @@ public class GridCacheSharedContext<K, V> { @Nullable public AffinityTopologyVersion lockedTopologyVersion(IgniteInternalTx ignore) { long threadId = Thread.currentThread().getId(); - IgniteInternalTx tx = txMgr.anyActiveThreadTx(threadId, ignore); - - AffinityTopologyVersion topVer = null; - - if (tx != null && tx.topologyVersionSnapshot() != null) - topVer = tx.topologyVersionSnapshot(); + AffinityTopologyVersion topVer = txMgr.anyActiveThreadTx(threadId, ignore); if (topVer == null) topVer = mvccMgr.lastExplicitLockTopologyVersion(threadId); @@ -618,7 +613,7 @@ public class GridCacheSharedContext<K, V> { if (ctx == null) { tx.txState().awaitLastFut(this); - return tx.commitAsync(true); + return tx.commitAsync(); } else return ctx.cache().commitTxAsync(tx); http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 12b8e92..79d04ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -64,6 +64,7 @@ import org.apache.ignite.internal.AsyncSupportAdapter; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.query.CacheQuery; import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture; import org.apache.ignite.internal.processors.query.GridQueryProcessor; @@ -1485,14 +1486,16 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** - * Tries to execute invoke operation. Fails if topology exchange is in progress. - * + * @param topVer Locked topology version. * @param key Key. * @param entryProcessor Entry processor. * @param args Arguments. * @return Invoke result. */ - public <T> T tryInvoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args) { + public <T> T invoke(@Nullable AffinityTopologyVersion topVer, + K key, + EntryProcessor<K, V, T> entryProcessor, + Object... args) { try { GridCacheGateway<K, V> gate = this.gate; @@ -1502,7 +1505,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V if (isAsync()) throw new UnsupportedOperationException(); else { - EntryProcessorResult<T> res = delegate.tryInvoke(key, entryProcessor, args); + EntryProcessorResult<T> res = delegate.invoke(topVer, key, entryProcessor, args); return res != null ? res.get() : null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java index fcba9c4..433290c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java @@ -40,6 +40,7 @@ import org.apache.ignite.cache.store.CacheStore; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -1876,15 +1877,16 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> { @Nullable public V tryPutIfAbsent(K key, V val) throws IgniteCheckedException; /** - * Tries to execute invoke operation. Will fail if topology exchange is in progress. - * + * @param topVer Locked topology version. * @param key Key. * @param entryProcessor Entry processor. * @param args Arguments. * @return Invoke result. * @throws IgniteCheckedException If failed. */ - @Nullable public <T> EntryProcessorResult<T> tryInvoke(K key, + @Nullable public <T> EntryProcessorResult<T> invoke( + @Nullable AffinityTopologyVersion topVer, + K key, EntryProcessor<K, V, T> entryProcessor, Object... args) throws IgniteCheckedException; } http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java index 36558e7..91d60bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java @@ -286,7 +286,7 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm } for (Map.Entry<Integer, BinaryMetadata> e : metaBuf.entrySet()) - addMeta(e.getKey(), e.getValue().wrap(binaryCtx), false); + addMeta(e.getKey(), e.getValue().wrap(binaryCtx)); metaBuf.clear(); @@ -474,16 +474,7 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm } /** {@inheritDoc} */ - @Override public void addMeta(final int typeId, final BinaryType newMeta) throws BinaryObjectException { - addMeta(typeId, newMeta, true); - } - - /** - * @param typeId Type ID. - * @param newMeta New meta data. - * @param tryInvoke If {@code true} uses {@link IgniteCacheProxy#tryInvoke} for metadata update. - */ - public void addMeta(final int typeId, final BinaryType newMeta, boolean tryInvoke) { + public void addMeta(final int typeId, final BinaryType newMeta) { assert newMeta != null; assert newMeta instanceof BinaryTypeImpl; @@ -495,9 +486,9 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm BinaryMetadata oldMeta = metaDataCache.localPeek(key); BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta0); - BinaryObjectException err = tryInvoke ? - metaDataCache.tryInvoke(key, new MetadataProcessor(mergedMeta)) : - metaDataCache.invoke(key, new MetadataProcessor(mergedMeta)); + AffinityTopologyVersion topVer = ctx.cache().context().lockedTopologyVersion(null); + + BinaryObjectException err = metaDataCache.invoke(topVer, key, new MetadataProcessor(mergedMeta)); if (err != null) throw err; http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 56dc684..1fd0b2e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -728,7 +728,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync(boolean waitTopFut) { + @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() { try { commit(); http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index f0d2e15..98711b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -743,7 +743,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> if (tx != null) { cctx.tm().txContext(tx); - set = cctx.tm().setTxTopologyHint(tx); + set = cctx.tm().setTxTopologyHint(tx.topologyVersionSnapshot()); } try { http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index b7bca06..ae24ed1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -950,7 +950,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach if (resp.error() == null && t.onePhaseCommit()) { assert t.implicit(); - return t.commitAsync(true).chain( + return t.commitAsync().chain( new C1<IgniteInternalFuture<IgniteInternalTx>, GridNearLockResponse>() { @Override public GridNearLockResponse apply(IgniteInternalFuture<IgniteInternalTx> f) { try { http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index 621281c..f344d48 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -480,7 +480,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa /** {@inheritDoc} */ @SuppressWarnings({"ThrowableInstanceNeverThrown"}) - @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync(boolean waitTopFut) { + @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() { if (log.isDebugEnabled()) log.debug("Committing dht local tx: " + this); http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 334cee7..40399b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -643,7 +643,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter if (prepErr == null) { try { - fut = tx.commitAsync(true); + fut = tx.commitAsync(); } catch (RuntimeException | Error e) { Exception hEx = new IgniteTxHeuristicCheckedException("Commit produced a runtime " + http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 8cb5249..3c8b7d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -77,6 +77,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSing import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; @@ -1240,7 +1241,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { top.readLock(); try { - if (topology().stopping()) { + if (top.stopping()) { res.addFailedKeys(keys, new IgniteCheckedException("Failed to perform cache operation " + "(cache is stopped): " + name())); @@ -1289,48 +1290,58 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridCacheReturn retVal = null; - if (keys.size() > 1 && // Several keys ... - writeThrough() && !req.skipStore() && // and store is enabled ... - !ctx.store().isLocal() && // and this is not local store ... - !ctx.dr().receiveEnabled() // and no DR. - ) { - // This method can only be used when there are no replicated entries in the batch. - UpdateBatchResult updRes = updateWithBatch(node, - hasNear, - req, - res, - locked, - ver, - dhtFut, - completionCb, - ctx.isDrEnabled(), - taskName, - expiry, - sndPrevVal); + IgniteTxManager tm = ctx.tm(); - deleted = updRes.deleted(); - dhtFut = updRes.dhtFuture(); + boolean set = tm.setTxTopologyHint(req.topologyVersion()); - if (req.operation() == TRANSFORM) - retVal = updRes.invokeResults(); + try { + if (keys.size() > 1 && // Several keys ... + writeThrough() && !req.skipStore() && // and store is enabled ... + !ctx.store().isLocal() && // and this is not local store ... + !ctx.dr().receiveEnabled() // and no DR. + ) { + // This method can only be used when there are no replicated entries in the batch. + UpdateBatchResult updRes = updateWithBatch(node, + hasNear, + req, + res, + locked, + ver, + dhtFut, + completionCb, + ctx.isDrEnabled(), + taskName, + expiry, + sndPrevVal); + + deleted = updRes.deleted(); + dhtFut = updRes.dhtFuture(); + + if (req.operation() == TRANSFORM) + retVal = updRes.invokeResults(); + } + else { + UpdateSingleResult updRes = updateSingle(node, + hasNear, + req, + res, + locked, + ver, + dhtFut, + completionCb, + ctx.isDrEnabled(), + taskName, + expiry, + sndPrevVal); + + retVal = updRes.returnValue(); + deleted = updRes.deleted(); + dhtFut = updRes.dhtFuture(); + } } - else { - UpdateSingleResult updRes = updateSingle(node, - hasNear, - req, - res, - locked, - ver, - dhtFut, - completionCb, - ctx.isDrEnabled(), - taskName, - expiry, - sndPrevVal); - - retVal = updRes.returnValue(); - deleted = updRes.deleted(); - dhtFut = updRes.dhtFuture(); + finally { + if (set) + tm.setTxTopologyHint(null); } if (retVal == null) http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 22b329c..7fba9bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -596,12 +596,8 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId); // If there is another system transaction in progress, use it's topology version to prevent deadlock. - if (topVer == null && tx != null && tx.system()) { - IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(Thread.currentThread().getId(), tx); - - if (tx0 != null) - topVer = tx0.topologyVersionSnapshot(); - } + if (topVer == null && tx != null && tx.system()) + topVer = cctx.tm().anyActiveThreadTx(Thread.currentThread().getId(), tx); if (topVer != null && tx != null) tx.topologyVersion(topVer); @@ -980,7 +976,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture * @throws IgniteCheckedException If failed. */ private void proceedMapping() throws IgniteCheckedException { - boolean set = tx != null && cctx.shared().tm().setTxTopologyHint(tx); + boolean set = tx != null && cctx.shared().tm().setTxTopologyHint(tx.topologyVersionSnapshot()); try { proceedMapping0(); http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 23e0f6b..413f5d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -723,12 +723,8 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId); // If there is another system transaction in progress, use it's topology version to prevent deadlock. - if (topVer == null && tx != null && tx.system()) { - IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(threadId, tx); - - if (tx0 != null) - topVer = tx0.topologyVersionSnapshot(); - } + if (topVer == null && tx != null && tx.system()) + topVer = cctx.tm().anyActiveThreadTx(threadId, tx); if (topVer != null && tx != null) tx.topologyVersion(topVer); @@ -1098,7 +1094,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean * @throws IgniteCheckedException If failed. */ private void proceedMapping() throws IgniteCheckedException { - boolean set = tx != null && cctx.shared().tm().setTxTopologyHint(tx); + boolean set = tx != null && cctx.shared().tm().setTxTopologyHint(tx.topologyVersionSnapshot()); try { proceedMapping0(); http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index afc2d6d..37dc564 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -305,7 +305,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim return; } - boolean set = cctx.tm().setTxTopologyHint(tx); + boolean set = cctx.tm().setTxTopologyHint(tx.topologyVersionSnapshot()); try { prepare(tx.readEntries(), tx.writeEntries(), remap, topLocked); @@ -857,7 +857,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim * @param res Response. */ private void remap(final GridNearTxPrepareResponse res) { - prepareOnTopology(true, true, new Runnable() { + prepareOnTopology(true, new Runnable() { @Override public void run() { onDone(res); } http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index 773259e..a9f158a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -406,7 +406,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa if (isDone()) return; - boolean set = cctx.tm().setTxTopologyHint(tx); + boolean set = cctx.tm().setTxTopologyHint(tx.topologyVersionSnapshot()); try { assert !m.empty(); @@ -749,7 +749,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa * */ private void remap() { - prepareOnTopology(true, true, new Runnable() { + prepareOnTopology(true, new Runnable() { @Override public void run() { onDone((GridNearTxPrepareResponse)null); } http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java index b6d4342..f29eda2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java @@ -20,11 +20,9 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import java.util.Collection; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -50,19 +48,15 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT } /** {@inheritDoc} */ - @Override public final void prepare(boolean waitTopFut) { + @Override public final void prepare() { // Obtain the topology version to use. long threadId = Thread.currentThread().getId(); AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId); // If there is another system transaction in progress, use it's topology version to prevent deadlock. - if (topVer == null && tx != null && tx.system()) { - IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(threadId, tx); - - if (tx0 != null) - topVer = tx0.topologyVersionSnapshot(); - } + if (topVer == null && tx != null && tx.system()) + topVer = cctx.tm().anyActiveThreadTx(threadId, tx); if (topVer != null) { tx.topologyVersion(topVer); @@ -74,7 +68,7 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT return; } - prepareOnTopology(waitTopFut, false, null); + prepareOnTopology(false, null); } /** @@ -94,11 +88,10 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT } /** - * @param waitTopFut If {@code false} does not wait for affinity change future. * @param remap Remap flag. * @param c Optional closure to run after map. */ - protected final void prepareOnTopology(boolean waitTopFut, final boolean remap, @Nullable final Runnable c) { + protected final void prepareOnTopology(final boolean remap, @Nullable final Runnable c) { GridDhtTopologyFuture topFut = topologyReadLock(); AffinityTopologyVersion topVer = null; @@ -141,17 +134,6 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT c.run(); } else { - if (!waitTopFut) { - ClusterTopologyCheckedException err = new ClusterTopologyCheckedException("Failed to execute update, " + - "cluster topology changed."); - - err.retryReadyFuture(topFut); - - onDone(err); - - return; - } - topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) { cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { @@ -159,7 +141,7 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT try { fut.get(); - prepareOnTopology(true, remap, c); + prepareOnTopology(remap, c); } catch (IgniteCheckedException e) { onDone(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index 691a2a8..ffe5373 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -135,9 +135,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA } /** {@inheritDoc} */ - @Override public void prepare(boolean waitTopFut) { - assert waitTopFut; - + @Override public void prepare() { if (!tx.state(PREPARING)) { if (tx.setRollbackOnly()) { if (tx.timedOut()) http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 3ee2981..ae4972e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -784,14 +784,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> prepareAsync() { - return prepareAsync0(true); - } - - /** - * @param waitTopFut If {@code false} does not wait for affinity change future. - * @return Prepare future. - */ - private IgniteInternalFuture<?> prepareAsync0(boolean waitTopFut) { GridNearTxPrepareFutureAdapter fut = (GridNearTxPrepareFutureAdapter)prepFut.get(); if (fut == null) { @@ -813,18 +805,18 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { mapExplicitLocks(); - fut.prepare(waitTopFut); + fut.prepare(); return fut; } /** {@inheritDoc} */ @SuppressWarnings({"ThrowableInstanceNeverThrown"}) - @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync(boolean waitTopFut) { + @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() { if (log.isDebugEnabled()) log.debug("Committing near local tx: " + this); - prepareAsync0(waitTopFut); + prepareAsync(); GridNearTxFinishFuture fut = commitFut.get(); http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java index a587687..52cad91 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java @@ -130,10 +130,8 @@ public abstract class GridNearTxPrepareFutureAdapter extends /** * Prepares transaction. - * - * @param waitTopFut If {@code false} does not wait for affinity change future. */ - public abstract void prepare(boolean waitTopFut); + public abstract void prepare(); /** * @param nodeId Sender. http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java index 7d75b2c..f5f99f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java @@ -619,10 +619,9 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject { /** * Asynchronously commits this transaction by initiating {@code two-phase-commit} process. * - * @param waitTopFut If {@code false} does not wait for affinity change future. * @return Future for commit operation. */ - public IgniteInternalFuture<IgniteInternalTx> commitAsync(boolean waitTopFut); + public IgniteInternalFuture<IgniteInternalTx> commitAsync(); /** * Callback invoked whenever there is a lock that has been acquired http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index f2ada64..53f4f56 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -2063,7 +2063,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync(boolean waitTopFut) { + @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() { return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 81c8a3c..b25baf8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -691,7 +691,7 @@ public class IgniteTxHandler { tx.nearFinishFutureId(req.futureId()); tx.nearFinishMiniId(req.miniId()); - IgniteInternalFuture<IgniteInternalTx> commitFut = tx.commitAsync(true); + IgniteInternalFuture<IgniteInternalTx> commitFut = tx.commitAsync(); // Only for error logging. commitFut.listen(CU.errorLogger(log)); http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 08e564e..70c79a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -580,7 +580,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter /** {@inheritDoc} */ @Override public void commit() throws IgniteCheckedException { try { - commitAsync(true).get(); + commitAsync().get(); } finally { cctx.tm().resetContext(); @@ -1953,16 +1953,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter V val, boolean retval, CacheEntryPredicate[] filter) { - return putAsync0(cacheCtx, true, key, val, null, null, retval, filter); + return putAsync0(cacheCtx, key, val, null, null, retval, filter); } /** {@inheritDoc} */ @Override public <K, V> IgniteInternalFuture<GridCacheReturn> invokeAsync(GridCacheContext cacheCtx, - boolean waitTopFut, K key, EntryProcessor<K, V, Object> entryProcessor, Object... invokeArgs) { - return (IgniteInternalFuture)putAsync0(cacheCtx, waitTopFut, key, null, entryProcessor, invokeArgs, true, null); + return (IgniteInternalFuture)putAsync0(cacheCtx, key, null, entryProcessor, invokeArgs, true, null); } /** {@inheritDoc} */ @@ -2915,7 +2914,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter */ private <K, V> IgniteInternalFuture putAsync0( final GridCacheContext cacheCtx, - boolean waitTopFut, K key, @Nullable V val, @Nullable EntryProcessor<K, V, Object> entryProcessor, @@ -3016,7 +3014,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } } else - return optimisticPutFuture(cacheCtx, waitTopFut, loadFut, ret, keepBinary); + return optimisticPutFuture(cacheCtx, loadFut, ret, keepBinary); } catch (IgniteCheckedException e) { return new GridFinishedFuture(e); @@ -3195,7 +3193,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } } else - return optimisticPutFuture(cacheCtx, true, loadFut, ret, keepBinary); + return optimisticPutFuture(cacheCtx, loadFut, ret, keepBinary); } catch (RuntimeException e) { onException(); @@ -3206,7 +3204,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter /** * @param cacheCtx Cache context. - * @param waitTopFut If {@code false} does not wait for affinity change future. * @param loadFut Missing keys load future. * @param ret Future result. * @param keepBinary Keep binary flag. @@ -3214,7 +3211,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter */ private IgniteInternalFuture optimisticPutFuture( final GridCacheContext cacheCtx, - boolean waitTopFut, IgniteInternalFuture<Void> loadFut, final GridCacheReturn ret, final boolean keepBinary @@ -3231,7 +3227,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter return new GridFinishedFuture<>(e); } - return nonInterruptable(commitAsync(waitTopFut).chain( + return nonInterruptable(commitAsync().chain( new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() { @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut) throws IgniteCheckedException { @@ -3472,7 +3468,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter // with prepare response, if required. assert loadFut.isDone(); - return nonInterruptable(commitAsync(true).chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() { + return nonInterruptable(commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() { @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut) throws IgniteCheckedException { try { @@ -3967,7 +3963,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (commit && commitAfterLock()) { rollback = false; - return commitAsync(true).chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, T>() { + return commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, T>() { @Override public T applyx(IgniteInternalFuture<IgniteInternalTx> f) throws IgniteCheckedException { f.get(); http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java index e1b73cc..a5d3373 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java @@ -115,7 +115,6 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { */ public <K, V> IgniteInternalFuture<GridCacheReturn> invokeAsync( GridCacheContext cacheCtx, - boolean waitTopFut, K key, EntryProcessor<K, V, Object> entryProcessor, Object... invokeArgs); http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 205be49..0471443 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -115,7 +115,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { private final ThreadLocal<IgniteInternalTx> threadCtx = new ThreadLocal<>(); /** Transaction which topology version should be used when mapping internal tx. */ - private final ThreadLocal<IgniteInternalTx> txTopology = new ThreadLocal<>(); + private final ThreadLocal<AffinityTopologyVersion> txTop = new ThreadLocal<>(); /** Per-thread transaction map. */ private final ConcurrentMap<Long, IgniteInternalTx> threadMap = newMap(); @@ -130,7 +130,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { private final ConcurrentMap<GridCacheVersion, IgniteInternalTx> nearIdMap = newMap(); /** TX handler. */ - private IgniteTxHandler txHandler; + private IgniteTxHandler txHnd; /** Committed local transactions. */ private final GridBoundedConcurrentOrderedMap<GridCacheVersion, Boolean> completedVersSorted = @@ -197,7 +197,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { @Override protected void start0() throws IgniteCheckedException { txFinishSync = new GridCacheTxFinishSync<>(cctx); - txHandler = new IgniteTxHandler(cctx); + txHnd = new IgniteTxHandler(cctx); } /** {@inheritDoc} */ @@ -212,7 +212,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @return TX handler. */ public IgniteTxHandler txHandler() { - return txHandler; + return txHnd; } /** @@ -609,11 +609,15 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @param ignore Transaction to ignore. * @return Any transaction associated with the current thread. */ - public IgniteInternalTx anyActiveThreadTx(long threadId, IgniteInternalTx ignore) { + public AffinityTopologyVersion anyActiveThreadTx(long threadId, IgniteInternalTx ignore) { IgniteInternalTx tx = threadMap.get(threadId); - if (tx != null && tx.topologyVersionSnapshot() != null) - return tx; + if (tx != null) { + AffinityTopologyVersion topVer = tx.topologyVersionSnapshot(); + + if (topVer != null) + return topVer; + } for (GridCacheContext cacheCtx : cctx.cache().context().cacheContexts()) { if (!cacheCtx.systemTx()) @@ -621,23 +625,27 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { tx = sysThreadMap.get(new TxThreadKey(threadId, cacheCtx.cacheId())); - if (tx != null && tx != ignore && tx.topologyVersionSnapshot() != null) - return tx; + if (tx != null && tx != ignore) { + AffinityTopologyVersion topVer = tx.topologyVersionSnapshot(); + + if (topVer != null) + return topVer; + } } - return txTopology.get(); + return txTop.get(); } /** - * @param tx Transaction. + * @param topVer Locked topology version. * @return {@code True} if topology hint was set. */ - public boolean setTxTopologyHint(@Nullable IgniteInternalTx tx) { - if (tx == null) - txTopology.remove(); + public boolean setTxTopologyHint(@Nullable AffinityTopologyVersion topVer) { + if (topVer == null) + txTop.remove(); else { - if (txTopology.get() == null) { - txTopology.set(tx); + if (txTop.get() == null) { + txTop.set(topVer); return true; } @@ -1762,7 +1770,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } if (commit) - tx.commitAsync(true).listen(new CommitListener(tx)); + tx.commitAsync().listen(new CommitListener(tx)); else tx.rollbackAsync(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java index 0b4238e..e88ae6f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java @@ -116,7 +116,7 @@ public class IgniteBinaryMetadataUpdateNodeRestartTest extends GridCommonAbstrac * @throws Exception If failed. */ public void testNodeRestart() throws Exception { - for (int i = 0; i < 5; i++) { + for (int i = 0; i < 10; i++) { log.info("Iteration: " + i); client = false;
