Repository: ignite
Updated Branches:
  refs/heads/ignite-1537 7fd645373 -> 215ff1eb6


ignite-1.5 Fixed hang on metadata update inside put in atomic cache when 
topology read lock is held.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/215ff1eb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/215ff1eb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/215ff1eb

Branch: refs/heads/ignite-1537
Commit: 215ff1eb63ba71f553956c24b51b3fbb067c038e
Parents: 7fd6453
Author: sboikov <[email protected]>
Authored: Tue Dec 22 11:32:04 2015 +0300
Committer: sboikov <[email protected]>
Committed: Tue Dec 22 11:32:04 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      | 22 ++---
 .../processors/cache/GridCacheProxyImpl.java    |  7 +-
 .../cache/GridCacheSharedContext.java           |  9 +-
 .../processors/cache/IgniteCacheProxy.java      | 11 ++-
 .../processors/cache/IgniteInternalCache.java   |  8 +-
 .../binary/CacheObjectBinaryProcessorImpl.java  | 19 ++--
 .../GridDistributedTxRemoteAdapter.java         |  2 +-
 .../distributed/dht/GridDhtLockFuture.java      |  2 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   |  2 +-
 .../cache/distributed/dht/GridDhtTxLocal.java   |  2 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |  2 +-
 .../dht/atomic/GridDhtAtomicCache.java          | 91 +++++++++++---------
 .../colocated/GridDhtColocatedLockFuture.java   | 10 +--
 .../distributed/near/GridNearLockFuture.java    | 10 +--
 ...arOptimisticSerializableTxPrepareFuture.java |  4 +-
 .../near/GridNearOptimisticTxPrepareFuture.java |  4 +-
 ...ridNearOptimisticTxPrepareFutureAdapter.java | 30 ++-----
 .../GridNearPessimisticTxPrepareFuture.java     |  4 +-
 .../cache/distributed/near/GridNearTxLocal.java | 14 +--
 .../near/GridNearTxPrepareFutureAdapter.java    |  4 +-
 .../cache/transactions/IgniteInternalTx.java    |  3 +-
 .../cache/transactions/IgniteTxAdapter.java     |  2 +-
 .../cache/transactions/IgniteTxHandler.java     |  2 +-
 .../transactions/IgniteTxLocalAdapter.java      | 20 ++---
 .../cache/transactions/IgniteTxLocalEx.java     |  1 -
 .../cache/transactions/IgniteTxManager.java     | 42 +++++----
 ...niteBinaryMetadataUpdateNodeRestartTest.java |  2 +-
 27 files changed, 149 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 1d097b7..5d4c386 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -2077,21 +2077,22 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public <T> EntryProcessorResult<T> tryInvoke(K key,
+    @Nullable @Override public <T> EntryProcessorResult<T> invoke(@Nullable 
AffinityTopologyVersion topVer,
+        K key,
         EntryProcessor<K, V, T> entryProcessor,
         Object... args) throws IgniteCheckedException {
-        return invoke0(false, key, entryProcessor, args);
+        return invoke0(topVer, key, entryProcessor, args);
     }
 
     /** {@inheritDoc} */
     @Override public <T> EntryProcessorResult<T> invoke(final K key,
         final EntryProcessor<K, V, T> entryProcessor,
         final Object... args) throws IgniteCheckedException {
-        return invoke0(true, key, entryProcessor, args);
+        return invoke0(null, key, entryProcessor, args);
     }
 
     /**
-     * @param waitTopFut If {@code false} does not wait for affinity change 
future.
+     * @param topVer Locked topology version.
      * @param key Key.
      * @param entryProcessor Entry processor.
      * @param args Entry processor arguments.
@@ -2099,7 +2100,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
      * @throws IgniteCheckedException If failed.
      */
     private <T> EntryProcessorResult<T> invoke0(
-        final boolean waitTopFut,
+        @Nullable final AffinityTopologyVersion topVer,
         final K key,
         final EntryProcessor<K, V, T> entryProcessor,
         final Object... args)
@@ -2112,13 +2113,12 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         return syncOp(new SyncOp<EntryProcessorResult<T>>(true) {
             @Nullable @Override public EntryProcessorResult<T> 
op(IgniteTxLocalAdapter tx)
                 throws IgniteCheckedException {
-                assert !waitTopFut || tx.implicit();
+                assert topVer == null || tx.implicit();
 
-                if (!waitTopFut)
-                    
tx.topologyVersion(ctx.shared().exchange().readyAffinityVersion());
+                if (topVer != null)
+                    tx.topologyVersion(topVer);
 
                 IgniteInternalFuture<GridCacheReturn> fut = tx.invokeAsync(ctx,
-                    waitTopFut,
                     key,
                     (EntryProcessor<K, V, Object>)entryProcessor,
                     args);
@@ -3992,7 +3992,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                 IgniteInternalFuture<IgniteInternalTx> f = new 
GridEmbeddedFuture<>(fut,
                     new C2<Object, Exception, 
IgniteInternalFuture<IgniteInternalTx>>() {
                         @Override public 
IgniteInternalFuture<IgniteInternalTx> apply(Object o, Exception e) {
-                            return tx.commitAsync(true);
+                            return tx.commitAsync();
                         }
                     });
 
@@ -4001,7 +4001,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                 return f;
             }
 
-            IgniteInternalFuture<IgniteInternalTx> f = tx.commitAsync(true);
+            IgniteInternalFuture<IgniteInternalTx> f = tx.commitAsync();
 
             saveFuture(holder, f);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index cbdb5b4..8ffd273 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -36,6 +36,7 @@ import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityProxy;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
@@ -1243,13 +1244,15 @@ public class GridCacheProxyImpl<K, V> implements 
IgniteInternalCache<K, V>, Exte
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public <T> EntryProcessorResult<T> tryInvoke(K key,
+    @Nullable @Override public <T> EntryProcessorResult<T> invoke(
+        AffinityTopologyVersion topVer,
+        K key,
         EntryProcessor<K, V, T> entryProcessor,
         Object... args) throws IgniteCheckedException {
         CacheOperationContext prev = gate.enter(opCtx);
 
         try {
-            return delegate.tryInvoke(key, entryProcessor, args);
+            return delegate.invoke(topVer, key, entryProcessor, args);
         }
         finally {
             gate.leave(prev);

http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 0a03494..f52e378 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -575,12 +575,7 @@ public class GridCacheSharedContext<K, V> {
     @Nullable public AffinityTopologyVersion 
lockedTopologyVersion(IgniteInternalTx ignore) {
         long threadId = Thread.currentThread().getId();
 
-        IgniteInternalTx tx = txMgr.anyActiveThreadTx(threadId, ignore);
-
-        AffinityTopologyVersion topVer = null;
-
-        if (tx != null && tx.topologyVersionSnapshot() != null)
-            topVer = tx.topologyVersionSnapshot();
+        AffinityTopologyVersion topVer = txMgr.anyActiveThreadTx(threadId, 
ignore);
 
         if (topVer == null)
             topVer = mvccMgr.lastExplicitLockTopologyVersion(threadId);
@@ -618,7 +613,7 @@ public class GridCacheSharedContext<K, V> {
         if (ctx == null) {
             tx.txState().awaitLastFut(this);
 
-            return tx.commitAsync(true);
+            return tx.commitAsync();
         }
         else
             return ctx.cache().commitTxAsync(tx);

http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 12b8e92..79d04ef 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.AsyncSupportAdapter;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.query.CacheQuery;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
@@ -1485,14 +1486,16 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
     }
 
     /**
-     * Tries to execute invoke operation. Fails if topology exchange is in 
progress.
-     *
+     * @param topVer Locked topology version.
      * @param key Key.
      * @param entryProcessor Entry processor.
      * @param args Arguments.
      * @return Invoke result.
      */
-    public <T> T tryInvoke(K key, EntryProcessor<K, V, T> entryProcessor, 
Object... args) {
+    public <T> T invoke(@Nullable AffinityTopologyVersion topVer,
+        K key,
+        EntryProcessor<K, V, T> entryProcessor,
+        Object... args) {
         try {
             GridCacheGateway<K, V> gate = this.gate;
 
@@ -1502,7 +1505,7 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
                 if (isAsync())
                     throw new UnsupportedOperationException();
                 else {
-                    EntryProcessorResult<T> res = delegate.tryInvoke(key, 
entryProcessor, args);
+                    EntryProcessorResult<T> res = delegate.invoke(topVer, key, 
entryProcessor, args);
 
                     return res != null ? res.get() : null;
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
index fcba9c4..433290c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
@@ -40,6 +40,7 @@ import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -1876,15 +1877,16 @@ public interface IgniteInternalCache<K, V> extends 
Iterable<Cache.Entry<K, V>> {
     @Nullable public V tryPutIfAbsent(K key, V val) throws 
IgniteCheckedException;
 
     /**
-     * Tries to execute invoke operation. Will fail if topology exchange is in 
progress.
-     *
+     * @param topVer Locked topology version.
      * @param key Key.
      * @param entryProcessor Entry processor.
      * @param args Arguments.
      * @return Invoke result.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable public <T> EntryProcessorResult<T> tryInvoke(K key,
+    @Nullable public <T> EntryProcessorResult<T> invoke(
+        @Nullable AffinityTopologyVersion topVer,
+        K key,
         EntryProcessor<K, V, T> entryProcessor,
         Object... args) throws IgniteCheckedException;
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 36558e7..91d60bb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -286,7 +286,7 @@ public class CacheObjectBinaryProcessorImpl extends 
IgniteCacheObjectProcessorIm
         }
 
         for (Map.Entry<Integer, BinaryMetadata> e : metaBuf.entrySet())
-            addMeta(e.getKey(), e.getValue().wrap(binaryCtx), false);
+            addMeta(e.getKey(), e.getValue().wrap(binaryCtx));
 
         metaBuf.clear();
 
@@ -474,16 +474,7 @@ public class CacheObjectBinaryProcessorImpl extends 
IgniteCacheObjectProcessorIm
     }
 
     /** {@inheritDoc} */
-    @Override public void addMeta(final int typeId, final BinaryType newMeta) 
throws BinaryObjectException {
-        addMeta(typeId, newMeta, true);
-    }
-
-    /**
-     * @param typeId Type ID.
-     * @param newMeta New meta data.
-     * @param tryInvoke If {@code true} uses {@link 
IgniteCacheProxy#tryInvoke} for metadata update.
-     */
-    public void addMeta(final int typeId, final BinaryType newMeta, boolean 
tryInvoke) {
+    public void addMeta(final int typeId, final BinaryType newMeta) {
         assert newMeta != null;
         assert newMeta instanceof BinaryTypeImpl;
 
@@ -495,9 +486,9 @@ public class CacheObjectBinaryProcessorImpl extends 
IgniteCacheObjectProcessorIm
             BinaryMetadata oldMeta = metaDataCache.localPeek(key);
             BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, 
newMeta0);
 
-            BinaryObjectException err = tryInvoke ?
-                metaDataCache.tryInvoke(key, new 
MetadataProcessor(mergedMeta)) :
-                metaDataCache.invoke(key, new MetadataProcessor(mergedMeta));
+            AffinityTopologyVersion topVer = 
ctx.cache().context().lockedTopologyVersion(null);
+
+            BinaryObjectException err = metaDataCache.invoke(topVer, key, new 
MetadataProcessor(mergedMeta));
 
             if (err != null)
                 throw err;

http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 56dc684..1fd0b2e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -728,7 +728,7 @@ public class GridDistributedTxRemoteAdapter extends 
IgniteTxAdapter
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<IgniteInternalTx> 
commitAsync(boolean waitTopFut) {
+    @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() {
         try {
             commit();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index f0d2e15..98711b8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -743,7 +743,7 @@ public final class GridDhtLockFuture extends 
GridCompoundIdentityFuture<Boolean>
         if (tx != null) {
             cctx.tm().txContext(tx);
 
-            set = cctx.tm().setTxTopologyHint(tx);
+            set = cctx.tm().setTxTopologyHint(tx.topologyVersionSnapshot());
         }
 
         try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index b7bca06..ae24ed1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -950,7 +950,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, 
V> extends GridDhtCach
                             if (resp.error() == null && t.onePhaseCommit()) {
                                 assert t.implicit();
 
-                                return t.commitAsync(true).chain(
+                                return t.commitAsync().chain(
                                     new 
C1<IgniteInternalFuture<IgniteInternalTx>, GridNearLockResponse>() {
                                         @Override public GridNearLockResponse 
apply(IgniteInternalFuture<IgniteInternalTx> f) {
                                             try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 621281c..f344d48 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -480,7 +480,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter 
implements GridCacheMa
 
     /** {@inheritDoc} */
     @SuppressWarnings({"ThrowableInstanceNeverThrown"})
-    @Override public IgniteInternalFuture<IgniteInternalTx> 
commitAsync(boolean waitTopFut) {
+    @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() {
         if (log.isDebugEnabled())
             log.debug("Committing dht local tx: " + this);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 334cee7..40399b4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -643,7 +643,7 @@ public final class GridDhtTxPrepareFuture extends 
GridCompoundFuture<IgniteInter
 
                     if (prepErr == null) {
                         try {
-                            fut = tx.commitAsync(true);
+                            fut = tx.commitAsync();
                         }
                         catch (RuntimeException | Error e) {
                             Exception hEx = new 
IgniteTxHeuristicCheckedException("Commit produced a runtime " +

http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 8cb5249..3c8b7d4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -77,6 +77,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearSing
 import 
org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
+import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
@@ -1240,7 +1241,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                 top.readLock();
 
                 try {
-                    if (topology().stopping()) {
+                    if (top.stopping()) {
                         res.addFailedKeys(keys, new 
IgniteCheckedException("Failed to perform cache operation " +
                             "(cache is stopped): " + name()));
 
@@ -1289,48 +1290,58 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
                         GridCacheReturn retVal = null;
 
-                        if (keys.size() > 1 &&                             // 
Several keys ...
-                            writeThrough() && !req.skipStore() &&          // 
and store is enabled ...
-                            !ctx.store().isLocal() &&                      // 
and this is not local store ...
-                            !ctx.dr().receiveEnabled()                     // 
and no DR.
-                        ) {
-                            // This method can only be used when there are no 
replicated entries in the batch.
-                            UpdateBatchResult updRes = updateWithBatch(node,
-                                hasNear,
-                                req,
-                                res,
-                                locked,
-                                ver,
-                                dhtFut,
-                                completionCb,
-                                ctx.isDrEnabled(),
-                                taskName,
-                                expiry,
-                                sndPrevVal);
+                        IgniteTxManager tm = ctx.tm();
 
-                            deleted = updRes.deleted();
-                            dhtFut = updRes.dhtFuture();
+                        boolean set = 
tm.setTxTopologyHint(req.topologyVersion());
 
-                            if (req.operation() == TRANSFORM)
-                                retVal = updRes.invokeResults();
+                        try {
+                            if (keys.size() > 1 &&                             
// Several keys ...
+                                writeThrough() && !req.skipStore() &&          
// and store is enabled ...
+                                !ctx.store().isLocal() &&                      
// and this is not local store ...
+                                !ctx.dr().receiveEnabled()                     
// and no DR.
+                                ) {
+                                // This method can only be used when there are 
no replicated entries in the batch.
+                                UpdateBatchResult updRes = 
updateWithBatch(node,
+                                    hasNear,
+                                    req,
+                                    res,
+                                    locked,
+                                    ver,
+                                    dhtFut,
+                                    completionCb,
+                                    ctx.isDrEnabled(),
+                                    taskName,
+                                    expiry,
+                                    sndPrevVal);
+
+                                deleted = updRes.deleted();
+                                dhtFut = updRes.dhtFuture();
+
+                                if (req.operation() == TRANSFORM)
+                                    retVal = updRes.invokeResults();
+                            }
+                            else {
+                                UpdateSingleResult updRes = updateSingle(node,
+                                    hasNear,
+                                    req,
+                                    res,
+                                    locked,
+                                    ver,
+                                    dhtFut,
+                                    completionCb,
+                                    ctx.isDrEnabled(),
+                                    taskName,
+                                    expiry,
+                                    sndPrevVal);
+
+                                retVal = updRes.returnValue();
+                                deleted = updRes.deleted();
+                                dhtFut = updRes.dhtFuture();
+                            }
                         }
-                        else {
-                            UpdateSingleResult updRes = updateSingle(node,
-                                hasNear,
-                                req,
-                                res,
-                                locked,
-                                ver,
-                                dhtFut,
-                                completionCb,
-                                ctx.isDrEnabled(),
-                                taskName,
-                                expiry,
-                                sndPrevVal);
-
-                            retVal = updRes.returnValue();
-                            deleted = updRes.deleted();
-                            dhtFut = updRes.dhtFuture();
+                        finally {
+                            if (set)
+                                tm.setTxTopologyHint(null);
                         }
 
                         if (retVal == null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 22b329c..7fba9bc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -596,12 +596,8 @@ public final class GridDhtColocatedLockFuture extends 
GridCompoundIdentityFuture
         AffinityTopologyVersion topVer = 
cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
 
         // If there is another system transaction in progress, use it's 
topology version to prevent deadlock.
-        if (topVer == null && tx != null && tx.system()) {
-            IgniteInternalTx tx0 = 
cctx.tm().anyActiveThreadTx(Thread.currentThread().getId(), tx);
-
-            if (tx0 != null)
-                topVer = tx0.topologyVersionSnapshot();
-        }
+        if (topVer == null && tx != null && tx.system())
+            topVer = 
cctx.tm().anyActiveThreadTx(Thread.currentThread().getId(), tx);
 
         if (topVer != null && tx != null)
             tx.topologyVersion(topVer);
@@ -980,7 +976,7 @@ public final class GridDhtColocatedLockFuture extends 
GridCompoundIdentityFuture
      * @throws IgniteCheckedException If failed.
      */
     private void proceedMapping() throws IgniteCheckedException {
-        boolean set = tx != null && cctx.shared().tm().setTxTopologyHint(tx);
+        boolean set = tx != null && 
cctx.shared().tm().setTxTopologyHint(tx.topologyVersionSnapshot());
 
         try {
             proceedMapping0();

http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 23e0f6b..413f5d8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -723,12 +723,8 @@ public final class GridNearLockFuture extends 
GridCompoundIdentityFuture<Boolean
         AffinityTopologyVersion topVer = 
cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
 
         // If there is another system transaction in progress, use it's 
topology version to prevent deadlock.
-        if (topVer == null && tx != null && tx.system()) {
-            IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(threadId, tx);
-
-            if (tx0 != null)
-                topVer = tx0.topologyVersionSnapshot();
-        }
+        if (topVer == null && tx != null && tx.system())
+            topVer = cctx.tm().anyActiveThreadTx(threadId, tx);
 
         if (topVer != null && tx != null)
             tx.topologyVersion(topVer);
@@ -1098,7 +1094,7 @@ public final class GridNearLockFuture extends 
GridCompoundIdentityFuture<Boolean
      * @throws IgniteCheckedException If failed.
      */
     private void proceedMapping() throws IgniteCheckedException {
-        boolean set = tx != null && cctx.shared().tm().setTxTopologyHint(tx);
+        boolean set = tx != null && 
cctx.shared().tm().setTxTopologyHint(tx.topologyVersionSnapshot());
 
         try {
             proceedMapping0();

http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index afc2d6d..37dc564 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -305,7 +305,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture 
extends GridNearOptim
             return;
         }
 
-        boolean set = cctx.tm().setTxTopologyHint(tx);
+        boolean set = 
cctx.tm().setTxTopologyHint(tx.topologyVersionSnapshot());
 
         try {
             prepare(tx.readEntries(), tx.writeEntries(), remap, topLocked);
@@ -857,7 +857,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture 
extends GridNearOptim
          * @param res Response.
          */
         private void remap(final GridNearTxPrepareResponse res) {
-            prepareOnTopology(true, true, new Runnable() {
+            prepareOnTopology(true, new Runnable() {
                 @Override public void run() {
                     onDone(res);
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 773259e..a9f158a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -406,7 +406,7 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearOptimisticTxPrepa
         if (isDone())
             return;
 
-        boolean set = cctx.tm().setTxTopologyHint(tx);
+        boolean set = 
cctx.tm().setTxTopologyHint(tx.topologyVersionSnapshot());
 
         try {
             assert !m.empty();
@@ -749,7 +749,7 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearOptimisticTxPrepa
          *
          */
         private void remap() {
-            prepareOnTopology(true, true, new Runnable() {
+            prepareOnTopology(true, new Runnable() {
                 @Override public void run() {
                     onDone((GridNearTxPrepareResponse)null);
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
index b6d4342..f29eda2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
@@ -20,11 +20,9 @@ package 
org.apache.ignite.internal.processors.cache.distributed.near;
 import java.util.Collection;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
-import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -50,19 +48,15 @@ public abstract class 
GridNearOptimisticTxPrepareFutureAdapter extends GridNearT
     }
 
     /** {@inheritDoc} */
-    @Override public final void prepare(boolean waitTopFut) {
+    @Override public final void prepare() {
         // Obtain the topology version to use.
         long threadId = Thread.currentThread().getId();
 
         AffinityTopologyVersion topVer = 
cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
 
         // If there is another system transaction in progress, use it's 
topology version to prevent deadlock.
-        if (topVer == null && tx != null && tx.system()) {
-            IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(threadId, tx);
-
-            if (tx0 != null)
-                topVer = tx0.topologyVersionSnapshot();
-        }
+        if (topVer == null && tx != null && tx.system())
+            topVer = cctx.tm().anyActiveThreadTx(threadId, tx);
 
         if (topVer != null) {
             tx.topologyVersion(topVer);
@@ -74,7 +68,7 @@ public abstract class 
GridNearOptimisticTxPrepareFutureAdapter extends GridNearT
             return;
         }
 
-        prepareOnTopology(waitTopFut, false, null);
+        prepareOnTopology(false, null);
     }
 
     /**
@@ -94,11 +88,10 @@ public abstract class 
GridNearOptimisticTxPrepareFutureAdapter extends GridNearT
     }
 
     /**
-     * @param waitTopFut If {@code false} does not wait for affinity change 
future.
      * @param remap Remap flag.
      * @param c Optional closure to run after map.
      */
-    protected final void prepareOnTopology(boolean waitTopFut, final boolean 
remap, @Nullable final Runnable c) {
+    protected final void prepareOnTopology(final boolean remap, @Nullable 
final Runnable c) {
         GridDhtTopologyFuture topFut = topologyReadLock();
 
         AffinityTopologyVersion topVer = null;
@@ -141,17 +134,6 @@ public abstract class 
GridNearOptimisticTxPrepareFutureAdapter extends GridNearT
                 c.run();
         }
         else {
-            if (!waitTopFut) {
-                ClusterTopologyCheckedException err = new 
ClusterTopologyCheckedException("Failed to execute update, " +
-                    "cluster topology changed.");
-
-                err.retryReadyFuture(topFut);
-
-                onDone(err);
-
-                return;
-            }
-
             topFut.listen(new 
CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                 @Override public void apply(final 
IgniteInternalFuture<AffinityTopologyVersion> fut) {
                     cctx.kernalContext().closure().runLocalSafe(new 
GridPlainRunnable() {
@@ -159,7 +141,7 @@ public abstract class 
GridNearOptimisticTxPrepareFutureAdapter extends GridNearT
                             try {
                                 fut.get();
 
-                                prepareOnTopology(true, remap, c);
+                                prepareOnTopology(remap, c);
                             }
                             catch (IgniteCheckedException e) {
                                 onDone(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 691a2a8..ffe5373 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -135,9 +135,7 @@ public class GridNearPessimisticTxPrepareFuture extends 
GridNearTxPrepareFutureA
     }
 
     /** {@inheritDoc} */
-    @Override public void prepare(boolean waitTopFut) {
-        assert waitTopFut;
-
+    @Override public void prepare() {
         if (!tx.state(PREPARING)) {
             if (tx.setRollbackOnly()) {
                 if (tx.timedOut())

http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 3ee2981..ae4972e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -784,14 +784,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter 
{
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> prepareAsync() {
-        return prepareAsync0(true);
-    }
-
-    /**
-     * @param waitTopFut If {@code false} does not wait for affinity change 
future.
-     * @return Prepare future.
-     */
-    private IgniteInternalFuture<?> prepareAsync0(boolean waitTopFut) {
         GridNearTxPrepareFutureAdapter fut = 
(GridNearTxPrepareFutureAdapter)prepFut.get();
 
         if (fut == null) {
@@ -813,18 +805,18 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter {
 
         mapExplicitLocks();
 
-        fut.prepare(waitTopFut);
+        fut.prepare();
 
         return fut;
     }
 
     /** {@inheritDoc} */
     @SuppressWarnings({"ThrowableInstanceNeverThrown"})
-    @Override public IgniteInternalFuture<IgniteInternalTx> 
commitAsync(boolean waitTopFut) {
+    @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() {
         if (log.isDebugEnabled())
             log.debug("Committing near local tx: " + this);
 
-        prepareAsync0(waitTopFut);
+        prepareAsync();
 
         GridNearTxFinishFuture fut = commitFut.get();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index a587687..52cad91 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -130,10 +130,8 @@ public abstract class GridNearTxPrepareFutureAdapter 
extends
 
     /**
      * Prepares transaction.
-     *
-     * @param waitTopFut If {@code false} does not wait for affinity change 
future.
      */
-    public abstract void prepare(boolean waitTopFut);
+    public abstract void prepare();
 
     /**
      * @param nodeId Sender.

http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index 7d75b2c..f5f99f5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -619,10 +619,9 @@ public interface IgniteInternalTx extends AutoCloseable, 
GridTimeoutObject {
     /**
      * Asynchronously commits this transaction by initiating {@code 
two-phase-commit} process.
      *
-     * @param waitTopFut If {@code false} does not wait for affinity change 
future.
      * @return Future for commit operation.
      */
-    public IgniteInternalFuture<IgniteInternalTx> commitAsync(boolean 
waitTopFut);
+    public IgniteInternalFuture<IgniteInternalTx> commitAsync();
 
     /**
      * Callback invoked whenever there is a lock that has been acquired

http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index f2ada64..53f4f56 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -2063,7 +2063,7 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter
         }
 
         /** {@inheritDoc} */
-        @Override public IgniteInternalFuture<IgniteInternalTx> 
commitAsync(boolean waitTopFut) {
+        @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() {
             return null;
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 81c8a3c..b25baf8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -691,7 +691,7 @@ public class IgniteTxHandler {
                 tx.nearFinishFutureId(req.futureId());
                 tx.nearFinishMiniId(req.miniId());
 
-                IgniteInternalFuture<IgniteInternalTx> commitFut = 
tx.commitAsync(true);
+                IgniteInternalFuture<IgniteInternalTx> commitFut = 
tx.commitAsync();
 
                 // Only for error logging.
                 commitFut.listen(CU.errorLogger(log));

http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 08e564e..70c79a5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -580,7 +580,7 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter
     /** {@inheritDoc} */
     @Override public void commit() throws IgniteCheckedException {
         try {
-            commitAsync(true).get();
+            commitAsync().get();
         }
         finally {
             cctx.tm().resetContext();
@@ -1953,16 +1953,15 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter
         V val,
         boolean retval,
         CacheEntryPredicate[] filter) {
-        return putAsync0(cacheCtx, true, key, val, null, null, retval, filter);
+        return putAsync0(cacheCtx, key, val, null, null, retval, filter);
     }
 
     /** {@inheritDoc} */
     @Override public <K, V> IgniteInternalFuture<GridCacheReturn> 
invokeAsync(GridCacheContext cacheCtx,
-        boolean waitTopFut,
         K key,
         EntryProcessor<K, V, Object> entryProcessor,
         Object... invokeArgs) {
-        return (IgniteInternalFuture)putAsync0(cacheCtx, waitTopFut, key, 
null, entryProcessor, invokeArgs, true, null);
+        return (IgniteInternalFuture)putAsync0(cacheCtx, key, null, 
entryProcessor, invokeArgs, true, null);
     }
 
     /** {@inheritDoc} */
@@ -2915,7 +2914,6 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter
      */
     private <K, V> IgniteInternalFuture putAsync0(
         final GridCacheContext cacheCtx,
-        boolean waitTopFut,
         K key,
         @Nullable V val,
         @Nullable EntryProcessor<K, V, Object> entryProcessor,
@@ -3016,7 +3014,7 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter
                 }
             }
             else
-                return optimisticPutFuture(cacheCtx, waitTopFut, loadFut, ret, 
keepBinary);
+                return optimisticPutFuture(cacheCtx, loadFut, ret, keepBinary);
         }
         catch (IgniteCheckedException e) {
             return new GridFinishedFuture(e);
@@ -3195,7 +3193,7 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter
                 }
             }
             else
-                return optimisticPutFuture(cacheCtx, true, loadFut, ret, 
keepBinary);
+                return optimisticPutFuture(cacheCtx, loadFut, ret, keepBinary);
         }
         catch (RuntimeException e) {
             onException();
@@ -3206,7 +3204,6 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter
 
     /**
      * @param cacheCtx Cache context.
-     * @param waitTopFut If {@code false} does not wait for affinity change 
future.
      * @param loadFut Missing keys load future.
      * @param ret Future result.
      * @param keepBinary Keep binary flag.
@@ -3214,7 +3211,6 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter
      */
     private IgniteInternalFuture optimisticPutFuture(
         final GridCacheContext cacheCtx,
-        boolean waitTopFut,
         IgniteInternalFuture<Void> loadFut,
         final GridCacheReturn ret,
         final boolean keepBinary
@@ -3231,7 +3227,7 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter
                 return new GridFinishedFuture<>(e);
             }
 
-            return nonInterruptable(commitAsync(waitTopFut).chain(
+            return nonInterruptable(commitAsync().chain(
                 new CX1<IgniteInternalFuture<IgniteInternalTx>, 
GridCacheReturn>() {
                     @Override public GridCacheReturn 
applyx(IgniteInternalFuture<IgniteInternalTx> txFut)
                         throws IgniteCheckedException {
@@ -3472,7 +3468,7 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter
                 // with prepare response, if required.
                 assert loadFut.isDone();
 
-                return nonInterruptable(commitAsync(true).chain(new 
CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() {
+                return nonInterruptable(commitAsync().chain(new 
CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() {
                     @Override public GridCacheReturn 
applyx(IgniteInternalFuture<IgniteInternalTx> txFut)
                         throws IgniteCheckedException {
                         try {
@@ -3967,7 +3963,7 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter
                 if (commit && commitAfterLock()) {
                     rollback = false;
 
-                    return commitAsync(true).chain(new 
CX1<IgniteInternalFuture<IgniteInternalTx>, T>() {
+                    return commitAsync().chain(new 
CX1<IgniteInternalFuture<IgniteInternalTx>, T>() {
                         @Override public T 
applyx(IgniteInternalFuture<IgniteInternalTx> f) throws IgniteCheckedException {
                             f.get();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index e1b73cc..a5d3373 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@ -115,7 +115,6 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
      */
     public <K, V> IgniteInternalFuture<GridCacheReturn> invokeAsync(
         GridCacheContext cacheCtx,
-        boolean waitTopFut,
         K key,
         EntryProcessor<K, V, Object> entryProcessor,
         Object... invokeArgs);

http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 205be49..0471443 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -115,7 +115,7 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
     private final ThreadLocal<IgniteInternalTx> threadCtx = new 
ThreadLocal<>();
 
     /** Transaction which topology version should be used when mapping 
internal tx. */
-    private final ThreadLocal<IgniteInternalTx> txTopology = new 
ThreadLocal<>();
+    private final ThreadLocal<AffinityTopologyVersion> txTop = new 
ThreadLocal<>();
 
     /** Per-thread transaction map. */
     private final ConcurrentMap<Long, IgniteInternalTx> threadMap = newMap();
@@ -130,7 +130,7 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
     private final ConcurrentMap<GridCacheVersion, IgniteInternalTx> nearIdMap 
= newMap();
 
     /** TX handler. */
-    private IgniteTxHandler txHandler;
+    private IgniteTxHandler txHnd;
 
     /** Committed local transactions. */
     private final GridBoundedConcurrentOrderedMap<GridCacheVersion, Boolean> 
completedVersSorted =
@@ -197,7 +197,7 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
     @Override protected void start0() throws IgniteCheckedException {
         txFinishSync = new GridCacheTxFinishSync<>(cctx);
 
-        txHandler = new IgniteTxHandler(cctx);
+        txHnd = new IgniteTxHandler(cctx);
     }
 
     /** {@inheritDoc} */
@@ -212,7 +212,7 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
      * @return TX handler.
      */
     public IgniteTxHandler txHandler() {
-        return txHandler;
+        return txHnd;
     }
 
     /**
@@ -609,11 +609,15 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
      * @param ignore Transaction to ignore.
      * @return Any transaction associated with the current thread.
      */
-    public IgniteInternalTx anyActiveThreadTx(long threadId, IgniteInternalTx 
ignore) {
+    public AffinityTopologyVersion anyActiveThreadTx(long threadId, 
IgniteInternalTx ignore) {
         IgniteInternalTx tx = threadMap.get(threadId);
 
-        if (tx != null && tx.topologyVersionSnapshot() != null)
-            return tx;
+        if (tx != null) {
+            AffinityTopologyVersion topVer = tx.topologyVersionSnapshot();
+
+            if (topVer != null)
+                return topVer;
+        }
 
         for (GridCacheContext cacheCtx : 
cctx.cache().context().cacheContexts()) {
             if (!cacheCtx.systemTx())
@@ -621,23 +625,27 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
 
             tx = sysThreadMap.get(new TxThreadKey(threadId, 
cacheCtx.cacheId()));
 
-            if (tx != null && tx != ignore && tx.topologyVersionSnapshot() != 
null)
-                return tx;
+            if (tx != null && tx != ignore) {
+                AffinityTopologyVersion topVer = tx.topologyVersionSnapshot();
+
+                if (topVer != null)
+                    return topVer;
+            }
         }
 
-        return txTopology.get();
+        return txTop.get();
     }
 
     /**
-     * @param tx Transaction.
+     * @param topVer Locked topology version.
      * @return {@code True} if topology hint was set.
      */
-    public boolean setTxTopologyHint(@Nullable IgniteInternalTx tx) {
-        if (tx == null)
-            txTopology.remove();
+    public boolean setTxTopologyHint(@Nullable AffinityTopologyVersion topVer) 
{
+        if (topVer == null)
+            txTop.remove();
         else {
-            if (txTopology.get() == null) {
-                txTopology.set(tx);
+            if (txTop.get() == null) {
+                txTop.set(topVer);
 
                 return true;
             }
@@ -1762,7 +1770,7 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
         }
 
         if (commit)
-            tx.commitAsync(true).listen(new CommitListener(tx));
+            tx.commitAsync().listen(new CommitListener(tx));
         else
             tx.rollbackAsync();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/215ff1eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
index 0b4238e..e88ae6f 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
@@ -116,7 +116,7 @@ public class IgniteBinaryMetadataUpdateNodeRestartTest 
extends GridCommonAbstrac
      * @throws Exception If failed.
      */
     public void testNodeRestart() throws Exception {
-        for (int i = 0; i < 5; i++) {
+        for (int i = 0; i < 10; i++) {
             log.info("Iteration: " + i);
 
             client = false;

Reply via email to