Repository: ignite Updated Branches: refs/heads/ignite-1607 fa7e5d2f2 -> cec7529bf
ignite-1607 WIP Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cec7529b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cec7529b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cec7529b Branch: refs/heads/ignite-1607 Commit: cec7529bfb06cfbb3579c9fcefe2e88a6c3fc3ca Parents: fa7e5d2 Author: sboikov <[email protected]> Authored: Wed Oct 21 15:32:21 2015 +0300 Committer: sboikov <[email protected]> Committed: Wed Oct 21 15:32:21 2015 +0300 ---------------------------------------------------------------------- ...arOptimisticSerializableTxPrepareFuture.java | 196 ++-------------- .../near/GridNearOptimisticTxPrepareFuture.java | 198 +---------------- ...ridNearOptimisticTxPrepareFutureAdapter.java | 222 +++++++++++++++++++ 3 files changed, 248 insertions(+), 368 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/cec7529b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index 5f41cdf..8e5bac2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -76,7 +76,7 @@ import static org.apache.ignite.transactions.TransactionState.PREPARING; /** * */ -public class GridNearOptimisticSerializableTxPrepareFuture extends 
GridNearTxPrepareFutureAdapter +public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter implements GridCacheMvccFuture<IgniteInternalTx> { /** */ public static final IgniteProductVersion SER_TX_SINCE = IgniteProductVersion.fromString("1.5.0"); @@ -155,12 +155,12 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre return found; } - /* + /** * @param m Failed mapping. * @param e Error. * @param res Response. */ - void onError(@Nullable GridDistributedTxMapping m, Throwable e, GridNearTxPrepareResponse res) { + private void onError(@Nullable GridDistributedTxMapping m, Throwable e, GridNearTxPrepareResponse res) { if (X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, ClusterTopologyException.class)) { if (tx.onePhaseCommit()) { tx.markForBackupCheck(); @@ -283,183 +283,12 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre return false; } - /** {@inheritDoc} */ - @Override public void prepare() { - // Obtain the topology version to use. - AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId()); - - // If there is another system transaction in progress, use it's topology version to prevent deadlock. - if (topVer == null && tx != null && tx.system()) { - IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(tx); - - if (tx0 != null) - topVer = tx0.topologyVersionSnapshot(); - } - - if (topVer != null) { - tx.topologyVersion(topVer); - - cctx.mvcc().addFuture(this); - - prepare0(false); - - return; - } - - prepareOnTopology(false, null); - } - - /** - * @param remap Remap flag. - * @param c Optional closure to run after map. 
- */ - private void prepareOnTopology(final boolean remap, @Nullable final Runnable c) { - GridDhtTopologyFuture topFut = topologyReadLock(); - - AffinityTopologyVersion topVer = null; - - try { - if (topFut == null) { - assert isDone(); - - return; - } - - if (topFut.isDone()) { - topVer = topFut.topologyVersion(); - - if (remap) - tx.onRemap(topVer); - else - tx.topologyVersion(topVer); - - if (!remap) - cctx.mvcc().addFuture(this); - } - } - finally { - topologyReadUnlock(); - } - - if (topVer != null) { - StringBuilder invalidCaches = null; - - for (Integer cacheId : tx.activeCacheIds()) { - GridCacheContext ctx = cctx.cacheContext(cacheId); - - assert ctx != null : cacheId; - - Throwable err = topFut.validateCache(ctx); - - if (err != null) { - if (invalidCaches != null) - invalidCaches.append(", "); - else - invalidCaches = new StringBuilder(); - - invalidCaches.append(U.maskName(ctx.name())); - } - } - - if (invalidCaches != null) { - onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " + - invalidCaches.toString())); - - return; - } - - prepare0(remap); - - if (c != null) - c.run(); - } - else { - topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) { - cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { - @Override public void run() { - try { - fut.get(); - - prepareOnTopology(remap, c); - } - catch (IgniteCheckedException e) { - onDone(e); - } - finally { - cctx.txContextReset(); - } - } - }); - } - }); - } - } - - /** - * Acquires topology read lock. - * - * @return Topology ready future. 
- */ - private GridDhtTopologyFuture topologyReadLock() { - if (tx.activeCacheIds().isEmpty()) - return cctx.exchange().lastTopologyFuture(); - - GridCacheContext<?, ?> nonLocCtx = null; - - for (int cacheId : tx.activeCacheIds()) { - GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); - - if (!cacheCtx.isLocal()) { - nonLocCtx = cacheCtx; - - break; - } - } - - if (nonLocCtx == null) - return cctx.exchange().lastTopologyFuture(); - - nonLocCtx.topology().readLock(); - - if (nonLocCtx.topology().stopping()) { - onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + - nonLocCtx.name())); - - return null; - } - - return nonLocCtx.topology().topologyVersionFuture(); - } - - /** - * Releases topology read lock. - */ - private void topologyReadUnlock() { - if (!tx.activeCacheIds().isEmpty()) { - GridCacheContext<?, ?> nonLocCtx = null; - - for (int cacheId : tx.activeCacheIds()) { - GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); - - if (!cacheCtx.isLocal()) { - nonLocCtx = cacheCtx; - - break; - } - } - - if (nonLocCtx != null) - nonLocCtx.topology().readUnlock(); - } - } - /** * Initializes future. * * @param remap Remap flag. */ - private void prepare0(boolean remap) { + @Override protected void prepare0(boolean remap, boolean topLocked) { try { boolean txStateCheck = remap ? tx.state() == PREPARING : tx.state(PREPARING); @@ -479,7 +308,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre return; } - prepare(tx.readEntries(), tx.writeEntries(), remap); + prepare(tx.readEntries(), tx.writeEntries(), remap, topLocked); markInitialized(); } @@ -492,13 +321,15 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre * @param reads Read entries. * @param writes Write entries. * @param remap Remap flag. + * @param topLocked Topology locked flag. * @throws IgniteCheckedException If failed. 
*/ @SuppressWarnings("unchecked") private void prepare( Iterable<IgniteTxEntry> reads, Iterable<IgniteTxEntry> writes, - boolean remap + boolean remap, + boolean topLocked ) throws IgniteCheckedException { AffinityTopologyVersion topVer = tx.topologyVersion(); @@ -522,10 +353,10 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings = new HashMap<>(); for (IgniteTxEntry write : writes) - map(write, topVer, mappings, remap); + map(write, topVer, mappings, remap, topLocked); for (IgniteTxEntry read : reads) - map(read, topVer, mappings, remap); + map(read, topVer, mappings, remap, topLocked); keyLockFut.onAllKeysAdded(); @@ -688,12 +519,14 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre * @param topVer Topology version. * @param curMapping Current mapping. * @param remap Remap flag. + * @param topLocked Topology locked flag. */ private void map( IgniteTxEntry entry, AffinityTopologyVersion topVer, Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> curMapping, - boolean remap + boolean remap, + boolean topLocked ) { GridCacheContext cacheCtx = entry.context(); @@ -750,7 +583,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre // Initialize near flag right away. cur.near(cacheCtx.isNear()); - cur.clientFirst(cctx.kernalContext().clientNode()); + cur.clientFirst(!topLocked && cctx.kernalContext().clientNode()); cur.last(true); } @@ -917,6 +750,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre /** * @param res Result callback. 
*/ + @SuppressWarnings("unchecked") void onResult(final GridNearTxPrepareResponse res) { if (isDone()) return; http://git-wip-us.apache.org/repos/asf/ignite/blob/cec7529b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index b65e519..e33bb85 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -35,7 +35,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; @@ -44,7 +43,6 @@ import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.lang.GridPlainRunnable; import 
org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.CI1; @@ -55,7 +53,6 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteUuid; -import org.apache.ignite.transactions.TransactionOptimisticException; import org.apache.ignite.transactions.TransactionTimeoutException; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentLinkedDeque8; @@ -67,7 +64,7 @@ import static org.apache.ignite.transactions.TransactionState.PREPARING; /** * */ -public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAdapter +public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter implements GridCacheMvccFuture<IgniteInternalTx> { /** */ @GridToStringInclude @@ -137,11 +134,9 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd } /** - * @param nodeId Failed node ID. - * @param mappings Remaining mappings. * @param e Error. */ - void onError(@Nullable UUID nodeId, @Nullable Iterable<GridDistributedTxMapping> mappings, Throwable e) { + void onError(Throwable e) { if (X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, ClusterTopologyException.class)) { if (tx.onePhaseCommit()) { tx.markForBackupCheck(); @@ -245,198 +240,27 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd return false; } - /** {@inheritDoc} */ - @Override public void prepare() { - // Obtain the topology version to use. - AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId()); - - // If there is another system transaction in progress, use it's topology version to prevent deadlock. 
- if (topVer == null && tx != null && tx.system()) { - IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(tx); - - if (tx0 != null) - topVer = tx0.topologyVersionSnapshot(); - } - - if (topVer != null) { - tx.topologyVersion(topVer); - - cctx.mvcc().addFuture(this); - - prepare0(false, true); - - return; - } - - prepareOnTopology(false, null); - } - - /** - * @param remap Remap flag. - * @param c Optional closure to run after map. - */ - private void prepareOnTopology(final boolean remap, @Nullable final Runnable c) { - GridDhtTopologyFuture topFut = topologyReadLock(); - - AffinityTopologyVersion topVer = null; - - try { - if (topFut == null) { - assert isDone(); - - return; - } - - if (topFut.isDone()) { - topVer = topFut.topologyVersion(); - - if (remap) - tx.onRemap(topVer); - else - tx.topologyVersion(topVer); - - if (!remap) - cctx.mvcc().addFuture(this); - } - } - finally { - topologyReadUnlock(); - } - - if (topVer != null) { - StringBuilder invalidCaches = null; - - for (Integer cacheId : tx.activeCacheIds()) { - GridCacheContext ctx = cctx.cacheContext(cacheId); - - assert ctx != null : cacheId; - - Throwable err = topFut.validateCache(ctx); - - if (err != null) { - if (invalidCaches != null) - invalidCaches.append(", "); - else - invalidCaches = new StringBuilder(); - - invalidCaches.append(U.maskName(ctx.name())); - } - } - - if (invalidCaches != null) { - onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " + - invalidCaches.toString())); - - return; - } - - prepare0(remap, false); - - if (c != null) - c.run(); - } - else { - topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) { - cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { - @Override public void run() { - try { - fut.get(); - - prepareOnTopology(remap, c); - } - catch (IgniteCheckedException e) { - onDone(e); - } - 
finally { - cctx.txContextReset(); - } - } - }); - } - }); - } - } - - /** - * Acquires topology read lock. - * - * @return Topology ready future. - */ - private GridDhtTopologyFuture topologyReadLock() { - if (tx.activeCacheIds().isEmpty()) - return cctx.exchange().lastTopologyFuture(); - - GridCacheContext<?, ?> nonLocCtx = null; - - for (int cacheId : tx.activeCacheIds()) { - GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); - - if (!cacheCtx.isLocal()) { - nonLocCtx = cacheCtx; - - break; - } - } - - if (nonLocCtx == null) - return cctx.exchange().lastTopologyFuture(); - - nonLocCtx.topology().readLock(); - - if (nonLocCtx.topology().stopping()) { - onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + - nonLocCtx.name())); - - return null; - } - - return nonLocCtx.topology().topologyVersionFuture(); - } - - /** - * Releases topology read lock. - */ - private void topologyReadUnlock() { - if (!tx.activeCacheIds().isEmpty()) { - GridCacheContext<?, ?> nonLocCtx = null; - - for (int cacheId : tx.activeCacheIds()) { - GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); - - if (!cacheCtx.isLocal()) { - nonLocCtx = cacheCtx; - - break; - } - } - - if (nonLocCtx != null) - nonLocCtx.topology().readUnlock(); - } - } - /** * Initializes future. * * @param remap Remap flag. * @param topLocked {@code True} if thread already acquired lock preventing topology change. */ - private void prepare0(boolean remap, boolean topLocked) { + @Override protected void prepare0(boolean remap, boolean topLocked) { try { boolean txStateCheck = remap ? 
tx.state() == PREPARING : tx.state(PREPARING); if (!txStateCheck) { if (tx.setRollbackOnly()) { if (tx.timedOut()) - onError(null, null, new IgniteTxTimeoutCheckedException("Transaction timed out and " + + onError(new IgniteTxTimeoutCheckedException("Transaction timed out and " + "was rolled back: " + this)); else - onError(null, null, new IgniteCheckedException("Invalid transaction state for prepare " + + onError(new IgniteCheckedException("Invalid transaction state for prepare " + "[state=" + tx.state() + ", tx=" + this + ']')); } else - onError(null, null, new IgniteTxRollbackCheckedException("Invalid transaction state for " + + onError(new IgniteTxRollbackCheckedException("Invalid transaction state for " + "prepare [state=" + tx.state() + ", tx=" + this + ']')); return; @@ -446,8 +270,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd markInitialized(); } - catch (TransactionTimeoutException | TransactionOptimisticException e) { - onError(cctx.localNodeId(), null, e); + catch (TransactionTimeoutException e) { + onError( e); } catch (IgniteCheckedException e) { onDone(e); @@ -573,7 +397,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd tx.userPrepare(); } catch (IgniteCheckedException e) { - onError(null, null, e); + onError(e); } } @@ -795,7 +619,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd // Fail the whole future (make sure not to remap on different primary node // to prevent multiple lock coordinators). - onError(null, null, e); + onError(e); } } @@ -810,7 +634,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd if (rcvRes.compareAndSet(false, true)) { if (res.error() != null) { // Fail the whole compound future. 
- onError(nodeId, mappings, res.error()); + onError(res.error()); } else { if (res.clientRemapVersion() != null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/cec7529b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java new file mode 100644 index 0000000..fd9183e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; +import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearTxPrepareFutureAdapter { + /** + * @param cctx Context. + * @param tx Transaction. + */ + public GridNearOptimisticTxPrepareFutureAdapter(GridCacheSharedContext cctx, GridNearTxLocal tx) { + super(cctx, tx); + + assert tx.optimistic() : tx; + } + + /** {@inheritDoc} */ + @Override public final void prepare() { + // Obtain the topology version to use. + AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId()); + + // If there is another system transaction in progress, use its topology version to prevent deadlock. + if (topVer == null && tx != null && tx.system()) { + IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(tx); + + if (tx0 != null) + topVer = tx0.topologyVersionSnapshot(); + } + + if (topVer != null) { + tx.topologyVersion(topVer); + + cctx.mvcc().addFuture(this); + + prepare0(false, true); + + return; + } + + prepareOnTopology(false, null); + } + + /** + * Acquires topology read lock. + * + * @return Topology ready future. 
+ */ + protected final GridDhtTopologyFuture topologyReadLock() { + if (tx.activeCacheIds().isEmpty()) + return cctx.exchange().lastTopologyFuture(); + + GridCacheContext<?, ?> nonLocCtx = null; + + for (int cacheId : tx.activeCacheIds()) { + GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); + + if (!cacheCtx.isLocal()) { + nonLocCtx = cacheCtx; + + break; + } + } + + if (nonLocCtx == null) + return cctx.exchange().lastTopologyFuture(); + + nonLocCtx.topology().readLock(); + + if (nonLocCtx.topology().stopping()) { + onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + + nonLocCtx.name())); + + return null; + } + + return nonLocCtx.topology().topologyVersionFuture(); + } + + /** + * Releases topology read lock. + */ + protected final void topologyReadUnlock() { + if (!tx.activeCacheIds().isEmpty()) { + GridCacheContext<?, ?> nonLocCtx = null; + + for (int cacheId : tx.activeCacheIds()) { + GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); + + if (!cacheCtx.isLocal()) { + nonLocCtx = cacheCtx; + + break; + } + } + + if (nonLocCtx != null) + nonLocCtx.topology().readUnlock(); + } + } + + /** + * @param remap Remap flag. + * @param c Optional closure to run after map. 
+ */ + protected final void prepareOnTopology(final boolean remap, @Nullable final Runnable c) { + GridDhtTopologyFuture topFut = topologyReadLock(); + + AffinityTopologyVersion topVer = null; + + try { + if (topFut == null) { + assert isDone(); + + return; + } + + if (topFut.isDone()) { + topVer = topFut.topologyVersion(); + + if (remap) + tx.onRemap(topVer); + else + tx.topologyVersion(topVer); + + if (!remap) + cctx.mvcc().addFuture(this); + } + } + finally { + topologyReadUnlock(); + } + + if (topVer != null) { + StringBuilder invalidCaches = null; + + for (Integer cacheId : tx.activeCacheIds()) { + GridCacheContext ctx = cctx.cacheContext(cacheId); + + assert ctx != null : cacheId; + + Throwable err = topFut.validateCache(ctx); + + if (err != null) { + if (invalidCaches != null) + invalidCaches.append(", "); + else + invalidCaches = new StringBuilder(); + + invalidCaches.append(U.maskName(ctx.name())); + } + } + + if (invalidCaches != null) { + onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " + + invalidCaches.toString())); + + return; + } + + prepare0(remap, false); + + if (c != null) + c.run(); + } + else { + topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) { + cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { + @Override public void run() { + try { + fut.get(); + + prepareOnTopology(remap, c); + } + catch (IgniteCheckedException e) { + onDone(e); + } + finally { + cctx.txContextReset(); + } + } + }); + } + }); + } + } + + /** + * @param remap Remap flag. + * @param topLocked {@code True} if thread already acquired lock preventing topology change. + */ + protected abstract void prepare0(boolean remap, boolean topLocked); +}
