Repository: ignite
Updated Branches:
  refs/heads/ignite-1607 fa7e5d2f2 -> cec7529bf


ignite-1607 WIP


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cec7529b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cec7529b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cec7529b

Branch: refs/heads/ignite-1607
Commit: cec7529bfb06cfbb3579c9fcefe2e88a6c3fc3ca
Parents: fa7e5d2
Author: sboikov <[email protected]>
Authored: Wed Oct 21 15:32:21 2015 +0300
Committer: sboikov <[email protected]>
Committed: Wed Oct 21 15:32:21 2015 +0300

----------------------------------------------------------------------
 ...arOptimisticSerializableTxPrepareFuture.java | 196 ++--------------
 .../near/GridNearOptimisticTxPrepareFuture.java | 198 +----------------
 ...ridNearOptimisticTxPrepareFutureAdapter.java | 222 +++++++++++++++++++
 3 files changed, 248 insertions(+), 368 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/cec7529b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 5f41cdf..8e5bac2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -76,7 +76,7 @@ import static 
org.apache.ignite.transactions.TransactionState.PREPARING;
 /**
  *
  */
-public class GridNearOptimisticSerializableTxPrepareFuture extends 
GridNearTxPrepareFutureAdapter
+public class GridNearOptimisticSerializableTxPrepareFuture extends 
GridNearOptimisticTxPrepareFutureAdapter
     implements GridCacheMvccFuture<IgniteInternalTx> {
     /** */
     public static final IgniteProductVersion SER_TX_SINCE = 
IgniteProductVersion.fromString("1.5.0");
@@ -155,12 +155,12 @@ public class 
GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
         return found;
     }
 
-    /*
+    /**
      * @param m Failed mapping.
      * @param e Error.
      * @param res Response.
      */
-    void onError(@Nullable GridDistributedTxMapping m, Throwable e, 
GridNearTxPrepareResponse res) {
+    private void onError(@Nullable GridDistributedTxMapping m, Throwable e, 
GridNearTxPrepareResponse res) {
         if (X.hasCause(e, ClusterTopologyCheckedException.class) || 
X.hasCause(e, ClusterTopologyException.class)) {
             if (tx.onePhaseCommit()) {
                 tx.markForBackupCheck();
@@ -283,183 +283,12 @@ public class 
GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
         return false;
     }
 
-    /** {@inheritDoc} */
-    @Override public void prepare() {
-        // Obtain the topology version to use.
-        AffinityTopologyVersion topVer = 
cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
-
-        // If there is another system transaction in progress, use it's 
topology version to prevent deadlock.
-        if (topVer == null && tx != null && tx.system()) {
-            IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(tx);
-
-            if (tx0 != null)
-                topVer = tx0.topologyVersionSnapshot();
-        }
-
-        if (topVer != null) {
-            tx.topologyVersion(topVer);
-
-            cctx.mvcc().addFuture(this);
-
-            prepare0(false);
-
-            return;
-        }
-
-        prepareOnTopology(false, null);
-    }
-
-    /**
-     * @param remap Remap flag.
-     * @param c Optional closure to run after map.
-     */
-    private void prepareOnTopology(final boolean remap, @Nullable final 
Runnable c) {
-        GridDhtTopologyFuture topFut = topologyReadLock();
-
-        AffinityTopologyVersion topVer = null;
-
-        try {
-            if (topFut == null) {
-                assert isDone();
-
-                return;
-            }
-
-            if (topFut.isDone()) {
-                topVer = topFut.topologyVersion();
-
-                if (remap)
-                    tx.onRemap(topVer);
-                else
-                    tx.topologyVersion(topVer);
-
-                if (!remap)
-                    cctx.mvcc().addFuture(this);
-            }
-        }
-        finally {
-            topologyReadUnlock();
-        }
-
-        if (topVer != null) {
-            StringBuilder invalidCaches = null;
-
-            for (Integer cacheId : tx.activeCacheIds()) {
-                GridCacheContext ctx = cctx.cacheContext(cacheId);
-
-                assert ctx != null : cacheId;
-
-                Throwable err = topFut.validateCache(ctx);
-
-                if (err != null) {
-                    if (invalidCaches != null)
-                        invalidCaches.append(", ");
-                    else
-                        invalidCaches = new StringBuilder();
-
-                    invalidCaches.append(U.maskName(ctx.name()));
-                }
-            }
-
-            if (invalidCaches != null) {
-                onDone(new IgniteCheckedException("Failed to perform cache 
operation (cache topology is not valid): " +
-                    invalidCaches.toString()));
-
-                return;
-            }
-
-            prepare0(remap);
-
-            if (c != null)
-                c.run();
-        }
-        else {
-            topFut.listen(new 
CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                @Override public void apply(final 
IgniteInternalFuture<AffinityTopologyVersion> fut) {
-                    cctx.kernalContext().closure().runLocalSafe(new 
GridPlainRunnable() {
-                        @Override public void run() {
-                            try {
-                                fut.get();
-
-                                prepareOnTopology(remap, c);
-                            }
-                            catch (IgniteCheckedException e) {
-                                onDone(e);
-                            }
-                            finally {
-                                cctx.txContextReset();
-                            }
-                        }
-                    });
-                }
-            });
-        }
-    }
-
-    /**
-     * Acquires topology read lock.
-     *
-     * @return Topology ready future.
-     */
-    private GridDhtTopologyFuture topologyReadLock() {
-        if (tx.activeCacheIds().isEmpty())
-            return cctx.exchange().lastTopologyFuture();
-
-        GridCacheContext<?, ?> nonLocCtx = null;
-
-        for (int cacheId : tx.activeCacheIds()) {
-            GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
-
-            if (!cacheCtx.isLocal()) {
-                nonLocCtx = cacheCtx;
-
-                break;
-            }
-        }
-
-        if (nonLocCtx == null)
-            return cctx.exchange().lastTopologyFuture();
-
-        nonLocCtx.topology().readLock();
-
-        if (nonLocCtx.topology().stopping()) {
-            onDone(new IgniteCheckedException("Failed to perform cache 
operation (cache is stopped): " +
-                nonLocCtx.name()));
-
-            return null;
-        }
-
-        return nonLocCtx.topology().topologyVersionFuture();
-    }
-
-    /**
-     * Releases topology read lock.
-     */
-    private void topologyReadUnlock() {
-        if (!tx.activeCacheIds().isEmpty()) {
-            GridCacheContext<?, ?> nonLocCtx = null;
-
-            for (int cacheId : tx.activeCacheIds()) {
-                GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
-
-                if (!cacheCtx.isLocal()) {
-                    nonLocCtx = cacheCtx;
-
-                    break;
-                }
-            }
-
-            if (nonLocCtx != null)
-                nonLocCtx.topology().readUnlock();
-        }
-    }
-
     /**
      * Initializes future.
      *
      * @param remap Remap flag.
      */
-    private void prepare0(boolean remap) {
+    @Override protected void prepare0(boolean remap, boolean topLocked) {
         try {
             boolean txStateCheck = remap ? tx.state() == PREPARING : 
tx.state(PREPARING);
 
@@ -479,7 +308,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture 
extends GridNearTxPre
                 return;
             }
 
-            prepare(tx.readEntries(), tx.writeEntries(), remap);
+            prepare(tx.readEntries(), tx.writeEntries(), remap, topLocked);
 
             markInitialized();
         }
@@ -492,13 +321,15 @@ public class 
GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
      * @param reads Read entries.
      * @param writes Write entries.
      * @param remap Remap flag.
+     * @param topLocked Topology locked flag.
      * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings("unchecked")
     private void prepare(
         Iterable<IgniteTxEntry> reads,
         Iterable<IgniteTxEntry> writes,
-        boolean remap
+        boolean remap,
+        boolean topLocked
     ) throws IgniteCheckedException {
         AffinityTopologyVersion topVer = tx.topologyVersion();
 
@@ -522,10 +353,10 @@ public class 
GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
         Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> 
mappings = new HashMap<>();
 
         for (IgniteTxEntry write : writes)
-            map(write, topVer, mappings, remap);
+            map(write, topVer, mappings, remap, topLocked);
 
         for (IgniteTxEntry read : reads)
-            map(read, topVer, mappings, remap);
+            map(read, topVer, mappings, remap, topLocked);
 
         keyLockFut.onAllKeysAdded();
 
@@ -688,12 +519,14 @@ public class 
GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
      * @param topVer Topology version.
      * @param curMapping Current mapping.
      * @param remap Remap flag.
+     * @param topLocked Topology locked flag.
      */
     private void map(
         IgniteTxEntry entry,
         AffinityTopologyVersion topVer,
         Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> 
curMapping,
-        boolean remap
+        boolean remap,
+        boolean topLocked
     ) {
         GridCacheContext cacheCtx = entry.context();
 
@@ -750,7 +583,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture 
extends GridNearTxPre
             // Initialize near flag right away.
             cur.near(cacheCtx.isNear());
 
-            cur.clientFirst(cctx.kernalContext().clientNode());
+            cur.clientFirst(!topLocked && cctx.kernalContext().clientNode());
 
             cur.last(true);
         }
@@ -917,6 +750,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture 
extends GridNearTxPre
         /**
          * @param res Result callback.
          */
+        @SuppressWarnings("unchecked")
         void onResult(final GridNearTxPrepareResponse res) {
             if (isDone())
                 return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/cec7529b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index b65e519..e33bb85 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -35,7 +35,6 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
@@ -44,7 +43,6 @@ import 
org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.lang.GridPlainRunnable;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.CI1;
@@ -55,7 +53,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.transactions.TransactionOptimisticException;
 import org.apache.ignite.transactions.TransactionTimeoutException;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentLinkedDeque8;
@@ -67,7 +64,7 @@ import static 
org.apache.ignite.transactions.TransactionState.PREPARING;
 /**
  *
  */
-public class GridNearOptimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAdapter
+public class GridNearOptimisticTxPrepareFuture extends 
GridNearOptimisticTxPrepareFutureAdapter
     implements GridCacheMvccFuture<IgniteInternalTx> {
     /** */
     @GridToStringInclude
@@ -137,11 +134,9 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAd
     }
 
     /**
-     * @param nodeId Failed node ID.
-     * @param mappings Remaining mappings.
      * @param e Error.
      */
-    void onError(@Nullable UUID nodeId, @Nullable 
Iterable<GridDistributedTxMapping> mappings, Throwable e) {
+    void onError(Throwable e) {
         if (X.hasCause(e, ClusterTopologyCheckedException.class) || 
X.hasCause(e, ClusterTopologyException.class)) {
             if (tx.onePhaseCommit()) {
                 tx.markForBackupCheck();
@@ -245,198 +240,27 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAd
         return false;
     }
 
-    /** {@inheritDoc} */
-    @Override public void prepare() {
-        // Obtain the topology version to use.
-        AffinityTopologyVersion topVer = 
cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
-
-        // If there is another system transaction in progress, use it's 
topology version to prevent deadlock.
-        if (topVer == null && tx != null && tx.system()) {
-            IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(tx);
-
-            if (tx0 != null)
-                topVer = tx0.topologyVersionSnapshot();
-        }
-
-        if (topVer != null) {
-            tx.topologyVersion(topVer);
-
-            cctx.mvcc().addFuture(this);
-
-            prepare0(false, true);
-
-            return;
-        }
-
-        prepareOnTopology(false, null);
-    }
-
-    /**
-     * @param remap Remap flag.
-     * @param c Optional closure to run after map.
-     */
-    private void prepareOnTopology(final boolean remap, @Nullable final 
Runnable c) {
-        GridDhtTopologyFuture topFut = topologyReadLock();
-
-        AffinityTopologyVersion topVer = null;
-
-        try {
-            if (topFut == null) {
-                assert isDone();
-
-                return;
-            }
-
-            if (topFut.isDone()) {
-                topVer = topFut.topologyVersion();
-
-                if (remap)
-                    tx.onRemap(topVer);
-                else
-                    tx.topologyVersion(topVer);
-
-                if (!remap)
-                    cctx.mvcc().addFuture(this);
-            }
-        }
-        finally {
-            topologyReadUnlock();
-        }
-
-        if (topVer != null) {
-            StringBuilder invalidCaches = null;
-
-            for (Integer cacheId : tx.activeCacheIds()) {
-                GridCacheContext ctx = cctx.cacheContext(cacheId);
-
-                assert ctx != null : cacheId;
-
-                Throwable err = topFut.validateCache(ctx);
-
-                if (err != null) {
-                    if (invalidCaches != null)
-                        invalidCaches.append(", ");
-                    else
-                        invalidCaches = new StringBuilder();
-
-                    invalidCaches.append(U.maskName(ctx.name()));
-                }
-            }
-
-            if (invalidCaches != null) {
-                onDone(new IgniteCheckedException("Failed to perform cache 
operation (cache topology is not valid): " +
-                    invalidCaches.toString()));
-
-                return;
-            }
-
-            prepare0(remap, false);
-
-            if (c != null)
-                c.run();
-        }
-        else {
-            topFut.listen(new 
CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                @Override public void apply(final 
IgniteInternalFuture<AffinityTopologyVersion> fut) {
-                    cctx.kernalContext().closure().runLocalSafe(new 
GridPlainRunnable() {
-                        @Override public void run() {
-                            try {
-                                fut.get();
-
-                                prepareOnTopology(remap, c);
-                            }
-                            catch (IgniteCheckedException e) {
-                                onDone(e);
-                            }
-                            finally {
-                                cctx.txContextReset();
-                            }
-                        }
-                    });
-                }
-            });
-        }
-    }
-
-    /**
-     * Acquires topology read lock.
-     *
-     * @return Topology ready future.
-     */
-    private GridDhtTopologyFuture topologyReadLock() {
-        if (tx.activeCacheIds().isEmpty())
-            return cctx.exchange().lastTopologyFuture();
-
-        GridCacheContext<?, ?> nonLocCtx = null;
-
-        for (int cacheId : tx.activeCacheIds()) {
-            GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
-
-            if (!cacheCtx.isLocal()) {
-                nonLocCtx = cacheCtx;
-
-                break;
-            }
-        }
-
-        if (nonLocCtx == null)
-            return cctx.exchange().lastTopologyFuture();
-
-        nonLocCtx.topology().readLock();
-
-        if (nonLocCtx.topology().stopping()) {
-            onDone(new IgniteCheckedException("Failed to perform cache 
operation (cache is stopped): " +
-                nonLocCtx.name()));
-
-            return null;
-        }
-
-        return nonLocCtx.topology().topologyVersionFuture();
-    }
-
-    /**
-     * Releases topology read lock.
-     */
-    private void topologyReadUnlock() {
-        if (!tx.activeCacheIds().isEmpty()) {
-            GridCacheContext<?, ?> nonLocCtx = null;
-
-            for (int cacheId : tx.activeCacheIds()) {
-                GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
-
-                if (!cacheCtx.isLocal()) {
-                    nonLocCtx = cacheCtx;
-
-                    break;
-                }
-            }
-
-            if (nonLocCtx != null)
-                nonLocCtx.topology().readUnlock();
-        }
-    }
-
     /**
      * Initializes future.
      *
      * @param remap Remap flag.
      * @param topLocked {@code True} if thread already acquired lock 
preventing topology change.
      */
-    private void prepare0(boolean remap, boolean topLocked) {
+    @Override protected void prepare0(boolean remap, boolean topLocked) {
         try {
             boolean txStateCheck = remap ? tx.state() == PREPARING : 
tx.state(PREPARING);
 
             if (!txStateCheck) {
                 if (tx.setRollbackOnly()) {
                     if (tx.timedOut())
-                        onError(null, null, new 
IgniteTxTimeoutCheckedException("Transaction timed out and " +
+                        onError(new 
IgniteTxTimeoutCheckedException("Transaction timed out and " +
                             "was rolled back: " + this));
                     else
-                        onError(null, null, new 
IgniteCheckedException("Invalid transaction state for prepare " +
+                        onError(new IgniteCheckedException("Invalid 
transaction state for prepare " +
                             "[state=" + tx.state() + ", tx=" + this + ']'));
                 }
                 else
-                    onError(null, null, new 
IgniteTxRollbackCheckedException("Invalid transaction state for " +
+                    onError(new IgniteTxRollbackCheckedException("Invalid 
transaction state for " +
                         "prepare [state=" + tx.state() + ", tx=" + this + 
']'));
 
                 return;
@@ -446,8 +270,8 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAd
 
             markInitialized();
         }
-        catch (TransactionTimeoutException | TransactionOptimisticException e) 
{
-            onError(cctx.localNodeId(), null, e);
+        catch (TransactionTimeoutException e) {
+            onError( e);
         }
         catch (IgniteCheckedException e) {
             onDone(e);
@@ -573,7 +397,7 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAd
                 tx.userPrepare();
             }
             catch (IgniteCheckedException e) {
-                onError(null, null, e);
+                onError(e);
             }
         }
 
@@ -795,7 +619,7 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAd
 
                 // Fail the whole future (make sure not to remap on different 
primary node
                 // to prevent multiple lock coordinators).
-                onError(null, null, e);
+                onError(e);
             }
         }
 
@@ -810,7 +634,7 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAd
             if (rcvRes.compareAndSet(false, true)) {
                 if (res.error() != null) {
                     // Fail the whole compound future.
-                    onError(nodeId, mappings, res.error());
+                    onError(res.error());
                 }
                 else {
                     if (res.clientRemapVersion() != null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/cec7529b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
new file mode 100644
index 0000000..fd9183e
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public abstract class GridNearOptimisticTxPrepareFutureAdapter extends 
GridNearTxPrepareFutureAdapter {
+    /**
+     * @param cctx Context.
+     * @param tx Transaction.
+     */
+    public GridNearOptimisticTxPrepareFutureAdapter(GridCacheSharedContext 
cctx, GridNearTxLocal tx) {
+        super(cctx, tx);
+
+        assert tx.optimistic() : tx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public final void prepare() {
+        // Obtain the topology version to use.
+        AffinityTopologyVersion topVer = 
cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
+
+        // If there is another system transaction in progress, use its 
topology version to prevent deadlock.
+        if (topVer == null && tx != null && tx.system()) {
+            IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(tx);
+
+            if (tx0 != null)
+                topVer = tx0.topologyVersionSnapshot();
+        }
+
+        if (topVer != null) {
+            tx.topologyVersion(topVer);
+
+            cctx.mvcc().addFuture(this);
+
+            prepare0(false, true);
+
+            return;
+        }
+
+        prepareOnTopology(false, null);
+    }
+
+    /**
+     * Acquires topology read lock.
+     *
+     * @return Topology ready future.
+     */
+    protected final GridDhtTopologyFuture topologyReadLock() {
+        if (tx.activeCacheIds().isEmpty())
+            return cctx.exchange().lastTopologyFuture();
+
+        GridCacheContext<?, ?> nonLocCtx = null;
+
+        for (int cacheId : tx.activeCacheIds()) {
+            GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
+
+            if (!cacheCtx.isLocal()) {
+                nonLocCtx = cacheCtx;
+
+                break;
+            }
+        }
+
+        if (nonLocCtx == null)
+            return cctx.exchange().lastTopologyFuture();
+
+        nonLocCtx.topology().readLock();
+
+        if (nonLocCtx.topology().stopping()) {
+            onDone(new IgniteCheckedException("Failed to perform cache 
operation (cache is stopped): " +
+                nonLocCtx.name()));
+
+            return null;
+        }
+
+        return nonLocCtx.topology().topologyVersionFuture();
+    }
+
+    /**
+     * Releases topology read lock.
+     */
+    protected final void topologyReadUnlock() {
+        if (!tx.activeCacheIds().isEmpty()) {
+            GridCacheContext<?, ?> nonLocCtx = null;
+
+            for (int cacheId : tx.activeCacheIds()) {
+                GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
+
+                if (!cacheCtx.isLocal()) {
+                    nonLocCtx = cacheCtx;
+
+                    break;
+                }
+            }
+
+            if (nonLocCtx != null)
+                nonLocCtx.topology().readUnlock();
+        }
+    }
+
+    /**
+     * @param remap Remap flag.
+     * @param c Optional closure to run after map.
+     */
+    protected final void prepareOnTopology(final boolean remap, @Nullable 
final Runnable c) {
+        GridDhtTopologyFuture topFut = topologyReadLock();
+
+        AffinityTopologyVersion topVer = null;
+
+        try {
+            if (topFut == null) {
+                assert isDone();
+
+                return;
+            }
+
+            if (topFut.isDone()) {
+                topVer = topFut.topologyVersion();
+
+                if (remap)
+                    tx.onRemap(topVer);
+                else
+                    tx.topologyVersion(topVer);
+
+                if (!remap)
+                    cctx.mvcc().addFuture(this);
+            }
+        }
+        finally {
+            topologyReadUnlock();
+        }
+
+        if (topVer != null) {
+            StringBuilder invalidCaches = null;
+
+            for (Integer cacheId : tx.activeCacheIds()) {
+                GridCacheContext ctx = cctx.cacheContext(cacheId);
+
+                assert ctx != null : cacheId;
+
+                Throwable err = topFut.validateCache(ctx);
+
+                if (err != null) {
+                    if (invalidCaches != null)
+                        invalidCaches.append(", ");
+                    else
+                        invalidCaches = new StringBuilder();
+
+                    invalidCaches.append(U.maskName(ctx.name()));
+                }
+            }
+
+            if (invalidCaches != null) {
+                onDone(new IgniteCheckedException("Failed to perform cache 
operation (cache topology is not valid): " +
+                    invalidCaches.toString()));
+
+                return;
+            }
+
+            prepare0(remap, false);
+
+            if (c != null)
+                c.run();
+        }
+        else {
+            topFut.listen(new 
CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                @Override public void apply(final 
IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                    cctx.kernalContext().closure().runLocalSafe(new 
GridPlainRunnable() {
+                        @Override public void run() {
+                            try {
+                                fut.get();
+
+                                prepareOnTopology(remap, c);
+                            }
+                            catch (IgniteCheckedException e) {
+                                onDone(e);
+                            }
+                            finally {
+                                cctx.txContextReset();
+                            }
+                        }
+                    });
+                }
+            });
+        }
+    }
+
+    /**
+     * @param remap Remap flag.
+     * @param topLocked {@code True} if thread already acquired lock 
preventing topology change.
+     */
+    protected abstract void prepare0(boolean remap, boolean topLocked);
+}

Reply via email to