Repository: ignite Updated Branches: refs/heads/ignite-1607 [created] 3ed37b267
ignite-1607 WIP Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3ed37b26 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3ed37b26 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3ed37b26 Branch: refs/heads/ignite-1607 Commit: 3ed37b26715fa894079f657c1171a68613cb629f Parents: fd091c8 Author: sboikov <[email protected]> Authored: Fri Oct 2 17:28:46 2015 +0300 Committer: sboikov <[email protected]> Committed: Fri Oct 2 17:28:46 2015 +0300 ---------------------------------------------------------------------- .../distributed/dht/GridDhtCacheEntry.java | 4 +- ...arOptimisticSerializableTxPrepareFuture.java | 849 +++++++++++++++++++ .../cache/distributed/near/GridNearTxLocal.java | 18 +- .../cache/transactions/IgniteTxManager.java | 16 +- .../transactions/TransactionIsolation.java | 8 +- .../cache/CacheDeadlockFreeTxTest.java | 484 +++++++++++ .../junits/common/GridCommonAbstractTest.java | 19 +- 7 files changed, 1385 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3ed37b26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index be2f3d3..bf22c7d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -235,12 +235,12 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { val = this.val; - if (mvcc != null && mvcc.isEmpty()) 
+ if (mvcc.isEmpty()) mvccExtras(null); } // Don't link reentries. - if (cand != null && !cand.reentry()) + if (!cand.reentry()) // Link with other candidates in the same thread. cctx.mvcc().addNext(cctx, cand); http://git-wip-us.apache.org/repos/asf/ignite/blob/3ed37b26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java new file mode 100644 index 0000000..9056ae9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -0,0 +1,849 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.cluster.ClusterTopologyException; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; +import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; +import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; +import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; +import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; +import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; +import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; +import 
org.apache.ignite.internal.util.GridConcurrentHashSet; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.C1; +import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; +import static org.apache.ignite.transactions.TransactionState.PREPARED; +import static org.apache.ignite.transactions.TransactionState.PREPARING; + +/** + * + */ +public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPrepareFutureAdapter + implements GridCacheMvccFuture<IgniteInternalTx> { + /** */ + @GridToStringInclude + private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>(); + + /** + * @param cctx Context. + * @param tx Transaction. + */ + public GridNearOptimisticSerializableTxPrepareFuture(GridCacheSharedContext cctx, GridNearTxLocal tx) { + super(cctx, tx); + + assert tx.optimistic() : tx; + + // Should wait for all mini futures completion before finishing tx. 
+ ignoreChildFailures(IgniteCheckedException.class); + } + + /** {@inheritDoc} */ + @Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) { + if (log.isDebugEnabled()) + log.debug("Transaction future received owner changed callback: " + entry); + + if ((entry.context().isNear() || entry.context().isLocal()) && owner != null && tx.hasWriteKey(entry.txKey())) { + lockKeys.remove(entry.txKey()); + + // This will check for locks. + onDone(); + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public Collection<? extends ClusterNode> nodes() { + return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() { + @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) { + if (isMini(f)) + return ((MiniFuture)f).node(); + + return cctx.discovery().localNode(); + } + }); + } + + /** {@inheritDoc} */ + @Override public boolean onNodeLeft(UUID nodeId) { + boolean found = false; + + for (IgniteInternalFuture<?> fut : futures()) { + if (isMini(fut)) { + MiniFuture f = (MiniFuture) fut; + + if (f.node().id().equals(nodeId)) { + ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Remote node left grid: " + + nodeId); + + e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion())); + + f.onResult(e); + + found = true; + } + } + } + + return found; + } + + /** + * @param nodeId Failed node ID. + * @param e Error. + */ + void onError(@Nullable UUID nodeId, Throwable e) { + if (X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, ClusterTopologyException.class)) { + if (tx.onePhaseCommit()) { + tx.markForBackupCheck(); + + onComplete(); + + return; + } + } + + if (e instanceof IgniteTxOptimisticCheckedException && nodeId != null) + tx.onOptimisticException(nodeId); + + if (err.compareAndSet(null, e)) + tx.setRollbackOnly(); + } + + /** + * @return {@code True} if all locks are owned. 
+ */ + private boolean checkLocks() { + boolean locked = lockKeys.isEmpty(); + + if (locked) { + if (log.isDebugEnabled()) + log.debug("All locks are acquired for near prepare future: " + this); + } + else { + if (log.isDebugEnabled()) + log.debug("Still waiting for locks [fut=" + this + ", keys=" + lockKeys + ']'); + } + + return locked; + } + + /** {@inheritDoc} */ + @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) { + if (!isDone()) { + for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) { + if (isMini(fut)) { + MiniFuture f = (MiniFuture)fut; + + if (f.futureId().equals(res.miniId())) { + assert f.node().id().equals(nodeId); + + f.onResult(nodeId, res); + } + } + } + } + } + + /** {@inheritDoc} */ + @Override public boolean onDone(IgniteInternalTx t, Throwable err) { + this.err.compareAndSet(null, err); + + err = this.err.get(); + + // If locks were not acquired yet, delay completion. + if (isDone() || (err == null && !checkLocks())) + return false; + + return onComplete(); + } + + /** + * @param f Future. + * @return {@code True} if mini-future. + */ + private boolean isMini(IgniteInternalFuture<?> f) { + return f.getClass().equals(MiniFuture.class); + } + + /** + * Completeness callback. + * + * @return {@code True} if future was finished by this call. + */ + private boolean onComplete() { + Throwable err0 = err.get(); + + if (err0 == null || tx.needCheckBackup()) + tx.state(PREPARED); + + if (super.onDone(tx, err0)) { + // Don't forget to clean up. + cctx.mvcc().removeFuture(this); + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public void prepare() { + // Obtain the topology version to use. + AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId()); + + // If there is another system transaction in progress, use it's topology version to prevent deadlock. 
+ if (topVer == null && tx != null && tx.system()) { + IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(tx); + + if (tx0 != null) + topVer = tx0.topologyVersionSnapshot(); + } + + if (topVer != null) { + tx.topologyVersion(topVer); + + cctx.mvcc().addFuture(this); + + prepare0(false); + + return; + } + + prepareOnTopology(false, null); + } + + /** + * @param remap Remap flag. + * @param c Optional closure to run after map. + */ + private void prepareOnTopology(final boolean remap, @Nullable final Runnable c) { + GridDhtTopologyFuture topFut = topologyReadLock(); + + AffinityTopologyVersion topVer = null; + + try { + if (topFut == null) { + assert isDone(); + + return; + } + + if (topFut.isDone()) { + topVer = topFut.topologyVersion(); + + if (remap) + tx.onRemap(topVer); + else + tx.topologyVersion(topVer); + + if (!remap) + cctx.mvcc().addFuture(this); + } + } + finally { + topologyReadUnlock(); + } + + if (topVer != null) { + StringBuilder invalidCaches = null; + + for (Integer cacheId : tx.activeCacheIds()) { + GridCacheContext ctx = cctx.cacheContext(cacheId); + + assert ctx != null : cacheId; + + Throwable err = topFut.validateCache(ctx); + + if (err != null) { + if (invalidCaches != null) + invalidCaches.append(", "); + else + invalidCaches = new StringBuilder(); + + invalidCaches.append(U.maskName(ctx.name())); + } + } + + if (invalidCaches != null) { + onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " + + invalidCaches.toString())); + + return; + } + + prepare0(remap); + + if (c != null) + c.run(); + } + else { + topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override + public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) { + cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { + @Override + public void run() { + try { + fut.get(); + + prepareOnTopology(remap, c); + } catch (IgniteCheckedException e) { + onDone(e); + } finally { + 
cctx.txContextReset(); + } + } + }); + } + }); + } + } + + /** + * Acquires topology read lock. + * + * @return Topology ready future. + */ + private GridDhtTopologyFuture topologyReadLock() { + if (tx.activeCacheIds().isEmpty()) + return cctx.exchange().lastTopologyFuture(); + + GridCacheContext<?, ?> nonLocCtx = null; + + for (int cacheId : tx.activeCacheIds()) { + GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); + + if (!cacheCtx.isLocal()) { + nonLocCtx = cacheCtx; + + break; + } + } + + if (nonLocCtx == null) + return cctx.exchange().lastTopologyFuture(); + + nonLocCtx.topology().readLock(); + + if (nonLocCtx.topology().stopping()) { + onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + + nonLocCtx.name())); + + return null; + } + + return nonLocCtx.topology().topologyVersionFuture(); + } + + /** + * Releases topology read lock. + */ + private void topologyReadUnlock() { + if (!tx.activeCacheIds().isEmpty()) { + GridCacheContext<?, ?> nonLocCtx = null; + + for (int cacheId : tx.activeCacheIds()) { + GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); + + if (!cacheCtx.isLocal()) { + nonLocCtx = cacheCtx; + + break; + } + } + + if (nonLocCtx != null) + nonLocCtx.topology().readUnlock(); + } + } + + /** + * Initializes future. + * + * @param remap Remap flag. + */ + private void prepare0(boolean remap) { + try { + boolean txStateCheck = remap ? 
tx.state() == PREPARING : tx.state(PREPARING); + + if (!txStateCheck) { + if (tx.setRollbackOnly()) { + if (tx.timedOut()) + onError(null, new IgniteTxTimeoutCheckedException("Transaction timed out and " + + "was rolled back: " + this)); + else + onError(null, new IgniteCheckedException("Invalid transaction state for prepare " + + "[state=" + tx.state() + ", tx=" + this + ']')); + } + else + onError(null, new IgniteTxRollbackCheckedException("Invalid transaction state for " + + "prepare [state=" + tx.state() + ", tx=" + this + ']')); + + return; + } + + prepare( + tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry>emptyList(), + tx.writeEntries()); + + markInitialized(); + } + catch (IgniteCheckedException e) { + onDone(e); + } + } + + /** + * @param reads Read entries. + * @param writes Write entries. + * @throws IgniteCheckedException If failed. + */ + private void prepare( + Iterable<IgniteTxEntry> reads, + Iterable<IgniteTxEntry> writes + ) throws IgniteCheckedException { + AffinityTopologyVersion topVer = tx.topologyVersion(); + + assert topVer.topologyVersion() > 0; + + txMapping = new GridDhtTxMapping(); + + if (!F.isEmpty(reads) || !F.isEmpty(writes)) { + for (int cacheId : tx.activeCacheIds()) { + GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); + + if (CU.affinityNodes(cacheCtx, topVer).isEmpty()) { + onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all " + + "partition nodes left the grid): " + cacheCtx.name())); + + return; + } + } + } + + Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings = new HashMap<>(); + + for (IgniteTxEntry read : reads) + map(read, topVer, mappings, false); + + for (IgniteTxEntry write : writes) + map(write, topVer, mappings, true); + + if (isDone()) { + if (log.isDebugEnabled()) + log.debug("Abandoning (re)map because future is done: " + this); + + return; + } + + cctx.mvcc().recheckPendingLocks(); + + 
tx.addEntryMapping(mappings.values()); + + tx.transactionNodes(txMapping.transactionNodes()); + + checkOnePhase(); + + for (GridDistributedTxMapping m : mappings.values()) { + if (!prepare(m)) + break; + } + + markInitialized(); + } + + /** + * @param m Mapping. + * @return {@code False} if should stop mapping. + */ + private boolean prepare(GridDistributedTxMapping m) { + assert !m.empty(); + + final ClusterNode n = m.node(); + + GridNearTxPrepareRequest req = new GridNearTxPrepareRequest( + futId, + tx.topologyVersion(), + tx, + tx.optimistic() && tx.serializable() ? m.reads() : null, + m.writes(), + m.near(), + txMapping.transactionNodes(), + m.last(), + m.lastBackups(), + tx.onePhaseCommit(), + tx.needReturnValue() && tx.implicit(), + tx.implicitSingle(), + m.explicitLock(), + tx.subjectId(), + tx.taskNameHash(), + m.clientFirst()); + + for (IgniteTxEntry txEntry : m.writes()) { + if (txEntry.op() == TRANSFORM) + req.addDhtVersion(txEntry.txKey(), null); + } + + // Must lock near entries separately. + if (m.near()) { + try { + tx.optimisticLockEntries(req.writes()); + + tx.userPrepare(); + } + catch (IgniteCheckedException e) { + onError(m.node().id(), e); + + return false; + } + } + + final MiniFuture fut = new MiniFuture(m); + + req.miniId(fut.futureId()); + + add(fut); // Append new future. + + // If this is the primary node for the keys. 
+ if (n.isLocal()) { + IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(n.id(), tx, req); + + prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() { + @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) { + try { + fut.onResult(n.id(), prepFut.get()); + } + catch (IgniteCheckedException e) { + fut.onResult(e); + } + } + }); + } + else { + try { + cctx.io().send(n, req, tx.ioPolicy()); + } + catch (ClusterTopologyCheckedException e) { + e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion())); + + fut.onResult(e); + + return false; + } + catch (IgniteCheckedException e) { + fut.onResult(e); + + return false; + } + } + + return true; + } + + /** + * @param entry Transaction entry. + * @param topVer Topology version. + * @param curMapping Current mapping. + * @param waitLock Wait lock flag. + */ + private void map( + IgniteTxEntry entry, + AffinityTopologyVersion topVer, + Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> curMapping, + boolean waitLock + ) { + GridCacheContext cacheCtx = entry.context(); + + List<ClusterNode> nodes = cacheCtx.affinity().nodes(entry.key(), topVer); + + txMapping.addMapping(nodes); + + ClusterNode primary = F.first(nodes); + + assert primary != null; + + if (log.isDebugEnabled()) { + log.debug("Mapped key to primary node [key=" + entry.key() + + ", part=" + cacheCtx.affinity().partition(entry.key()) + + ", primary=" + U.toShortString(primary) + ", topVer=" + topVer + ']'); + } + + // Must re-initialize cached entry while holding topology lock. 
+ if (cacheCtx.isNear()) + entry.cached(cacheCtx.nearTx().entryExx(entry.key(), topVer)); + else if (!cacheCtx.isLocal()) + entry.cached(cacheCtx.colocated().entryExx(entry.key(), topVer, true)); + else + entry.cached(cacheCtx.local().entryEx(entry.key(), topVer)); + + if (cacheCtx.isNear() || cacheCtx.isLocal()) { + if (waitLock && entry.explicitVersion() == null) + lockKeys.add(entry.txKey()); + } + + IgniteBiTuple<ClusterNode, Boolean> key = F.t(primary, cacheCtx.isNear()); + + GridDistributedTxMapping cur = curMapping.get(key); + + if (cur == null) { + cur = new GridDistributedTxMapping(primary); + + curMapping.put(key, cur); + + if (primary.isLocal()) { + if (entry.context().isNear()) + tx.nearLocallyMapped(true); + else if (entry.context().isColocated()) + tx.colocatedLocallyMapped(true); + } + + // Initialize near flag right away. + cur.near(cacheCtx.isNear()); + + cur.clientFirst(cctx.kernalContext().clientNode()); + + cur.last(true); + } + + cur.add(entry); + + if (entry.explicitVersion() != null) { + tx.markExplicit(primary.id()); + + cur.markExplicitLock(); + } + + entry.nodeId(primary.id()); + + if (cacheCtx.isNear()) { + while (true) { + try { + GridNearCacheEntry cached = (GridNearCacheEntry)entry.cached(); + + cached.dhtNodeId(tx.xidVersion(), primary.id()); + + break; + } + catch (GridCacheEntryRemovedException ignore) { + entry.cached(cacheCtx.near().entryEx(entry.key())); + } + } + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() { + @Override public String apply(IgniteInternalFuture<?> f) { + return "[node=" + ((MiniFuture)f).node().id() + + ", loc=" + ((MiniFuture)f).node().isLocal() + + ", done=" + f.isDone() + "]"; + } + }); + + return S.toString(GridNearOptimisticSerializableTxPrepareFuture.class, this, + "innerFuts", futs, + "tx", tx, + "super", super.toString()); + } + + /** + * + */ + private class MiniFuture extends 
GridFutureAdapter<IgniteInternalTx> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final IgniteUuid futId = IgniteUuid.randomUuid(); + + /** Keys. */ + @GridToStringInclude + private GridDistributedTxMapping m; + + /** Flag to signal some result being processed. */ + private AtomicBoolean rcvRes = new AtomicBoolean(false); + + /** + * @param m Mapping. + */ + MiniFuture(GridDistributedTxMapping m) { + this.m = m; + } + + /** + * @return Future ID. + */ + IgniteUuid futureId() { + return futId; + } + + /** + * @return Node ID. + */ + public ClusterNode node() { + return m.node(); + } + + /** + * @return Keys. + */ + public GridDistributedTxMapping mapping() { + return m; + } + + /** + * @param e Error. + */ + void onResult(Throwable e) { + if (rcvRes.compareAndSet(false, true)) { + err.compareAndSet(null, e); + + if (log.isDebugEnabled()) + log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']'); + + // Fail. + onDone(e); + } + else + U.warn(log, "Received error after another result has been processed [fut=" + + GridNearOptimisticSerializableTxPrepareFuture.this + ", mini=" + this + ']', e); + } + + /** + * @param e Node failure. + */ + void onResult(ClusterTopologyCheckedException e) { + if (isDone()) + return; + + if (rcvRes.compareAndSet(false, true)) { + if (log.isDebugEnabled()) + log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + this); + + onError(null, e); + + onDone(e); + } + } + + /** + * @param nodeId Failed node ID. + * @param res Result callback. + */ + void onResult(UUID nodeId, GridNearTxPrepareResponse res) { + if (isDone()) + return; + + if (rcvRes.compareAndSet(false, true)) { + if (res.error() != null) { + // Fail the whole compound future. 
+ onError(nodeId, res.error()); + + onDone(res.error()); + } + else { + if (res.clientRemapVersion() != null) { + assert cctx.kernalContext().clientNode(); + assert m.clientFirst(); + + IgniteInternalFuture<?> affFut = cctx.exchange().affinityReadyFuture(res.clientRemapVersion()); + + if (affFut != null && !affFut.isDone()) { + affFut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> fut) { + try { + fut.get(); + + remap(); + } + catch (IgniteCheckedException e) { + onDone(e); + } + } + }); + } + else + remap(); + } + else { + onPrepareResponse(m, res); + + // Finish this mini future. + onDone(tx); + } + } + } + } + + /** + * + */ + private void remap() { + prepareOnTopology(true, new Runnable() { + @Override public void run() { + onDone(tx); + } + }); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error()); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3ed37b26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index ea96649..ef9f77e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -557,6 +557,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /** + * TODO IGNITE-1607: remove this method? + * * Removes mapping in case of optimistic tx failure on primary node. * * @param failedNodeId Failed node ID. 
@@ -586,6 +588,13 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { } /** + * @param nodeId Primary node id. + */ + void onOptimisticException(UUID nodeId) { + mappings.remove(nodeId); + } + + /** * @param nodeId Node ID to mark with explicit lock. * @return {@code True} if mapping was found. */ @@ -743,8 +752,13 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { if (fut == null) { // Future must be created before any exception can be thrown. - fut = optimistic() ? new GridNearOptimisticTxPrepareFuture(cctx, this) : - new GridNearPessimisticTxPrepareFuture(cctx, this); + if (optimistic()) { + fut = isolation() == TransactionIsolation.SERIALIZABLE_TRY_LOCK ? + new GridNearOptimisticSerializableTxPrepareFuture(cctx, this) : + new GridNearOptimisticTxPrepareFuture(cctx, this); + } + else + fut = new GridNearPessimisticTxPrepareFuture(cctx, this); if (!prepFut.compareAndSet(null, fut)) return prepFut.get(); http://git-wip-us.apache.org/repos/asf/ignite/blob/3ed37b26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 4074eee..d431cb6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -1628,12 +1628,18 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { throws IgniteCheckedException { assert tx.optimistic() || !tx.local(); - long remainingTime = U.currentTimeMillis() - (tx.startTime() + tx.timeout()); + long timeout; - // For serializable transactions, failure to acquire lock means - // that there 
is a serializable conflict. For all other isolation levels, - // we wait for the lock. - long timeout = tx.timeout() == 0 ? 0 : remainingTime; + if (tx.isolation() != TransactionIsolation.SERIALIZABLE_TRY_LOCK) { + long remainingTime = U.currentTimeMillis() - (tx.startTime() + tx.timeout()); + + // For serializable transactions, failure to acquire lock means + // that there is a serializable conflict. For all other isolation levels, + // we wait for the lock. + timeout = tx.timeout() == 0 ? 0 : remainingTime; + } + else + timeout = -1L; for (IgniteTxEntry txEntry1 : entries) { // Check if this entry was prepared before. http://git-wip-us.apache.org/repos/asf/ignite/blob/3ed37b26/modules/core/src/main/java/org/apache/ignite/transactions/TransactionIsolation.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/transactions/TransactionIsolation.java b/modules/core/src/main/java/org/apache/ignite/transactions/TransactionIsolation.java index d7671f0..c43396c 100644 --- a/modules/core/src/main/java/org/apache/ignite/transactions/TransactionIsolation.java +++ b/modules/core/src/main/java/org/apache/ignite/transactions/TransactionIsolation.java @@ -31,7 +31,10 @@ public enum TransactionIsolation { REPEATABLE_READ, /** Serializable isolation level. */ - SERIALIZABLE; + SERIALIZABLE, + + /** TODO IGNITE-1607 */ + SERIALIZABLE_TRY_LOCK; /** Enum values. */ private static final TransactionIsolation[] VALS = values(); @@ -42,8 +45,7 @@ public enum TransactionIsolation { * @param ord Ordinal value. * @return Enumerated value or {@code null} if ordinal out of range. */ - @Nullable - public static TransactionIsolation fromOrdinal(int ord) { + @Nullable public static TransactionIsolation fromOrdinal(int ord) { return ord >= 0 && ord < VALS.length ? 
VALS[ord] : null; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/3ed37b26/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDeadlockFreeTxTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDeadlockFreeTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDeadlockFreeTxTest.java new file mode 100644 index 0000000..f156978 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDeadlockFreeTxTest.java @@ -0,0 +1,484 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.integration.CacheLoaderException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.cache.store.CacheStoreAdapter; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionOptimisticException; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static 
org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE_TRY_LOCK;
+
+/**
+ * Tests {@code OPTIMISTIC} transactions started with {@code SERIALIZABLE_TRY_LOCK} isolation.
+ * <p>
+ * The rollback tests expect such a transaction to fail with {@link TransactionOptimisticException} while one of
+ * its keys is updated by a still-open {@code PESSIMISTIC} transaction. The concurrent update tests run many
+ * overlapping multi-key updates and expect them to finish within a fixed timeout.
+ */
+public class CacheDeadlockFreeTxTest extends GridCommonAbstractTest {
+    /** IP finder set on the discovery SPI of every node configured by this test. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Number of nodes started in server mode. */
+    private static final int SRVS = 3;
+
+    /** Number of nodes started in client mode. */
+    private static final int CLIENTS = 3;
+
+    /** Client mode flag applied to the configuration of every node created while it is set. */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(SRVS);
+
+        client = true; // Nodes configured from here on are clients.
+
+        startGridsMultiThreaded(SRVS, CLIENTS); // Presumably (first index, count) - confirm against the base class.
+
+        client = false; // Later restarts of node 0 are asserted to be in server mode.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /**
+     * Checks that an {@code OPTIMISTIC}/{@code SERIALIZABLE_TRY_LOCK} update of a key fails with
+     * {@link TransactionOptimisticException} while a {@code PESSIMISTIC} transaction that updated the same key is
+     * still open, and that the same update succeeds after that transaction has committed.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxRollbackIfLocked1() throws Exception {
+        Ignite ignite0 = ignite(0);
+
+        final IgniteTransactions txs = ignite0.transactions();
+
+        final IgniteCache<Integer, Integer> cache =
+            ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, true)); // 1 backup, no store, near cache.
+
+        final Integer key = nearKey(cache); // NOTE(review): presumably a key node 0 does not own - confirm.
+
+        final CountDownLatch latch1 = new CountDownLatch(1); // Released once the pessimistic tx has updated the key.
+        final CountDownLatch latch2 = new CountDownLatch(1); // Released to let the pessimistic tx commit.
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    cache.put(key, 1);
+
+                    log.info("Locked key: " + key);
+
+                    latch1.countDown();
+
+                    assertTrue(latch2.await(10, SECONDS)); // Keep the transaction open until the main thread is done.
+
+                    tx.commit();
+                }
+
+                return null;
+            }
+        }, "lock-thread");
+
+        assertTrue(latch1.await(10, SECONDS));
+
+        try {
+            try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE_TRY_LOCK)) {
+                cache.put(key, 2);
+
+                log.info("Commit");
+
+                tx.commit();
+            }
+
+            fail(); // The optimistic transaction must not succeed while the pessimistic one is open.
+        }
+        catch (TransactionOptimisticException e) {
+            log.info("Expected exception: " + e);
+        }
+
+        latch2.countDown();
+
+        fut.get();
+
+        assertEquals(1, (Object)cache.get(key)); // Only the pessimistic update is visible.
+
+        try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE_TRY_LOCK)) {
+            cache.put(key, 2);
+
+            tx.commit();
+        }
+
+        assertEquals(2, (Object)cache.get(key)); // Without a concurrent transaction the update goes through.
+    }
+
+    /**
+     * Runs {@link #rollbackIfLockedPartialLock(boolean)} with {@code locKey = false}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxRollbackIfLocked2() throws Exception {
+        rollbackIfLockedPartialLock(false);
+    }
+
+    /**
+     * Runs {@link #rollbackIfLockedPartialLock(boolean)} with {@code locKey = true}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxRollbackIfLocked3() throws Exception {
+        rollbackIfLockedPartialLock(true);
+    }
+
+    /**
+     * Checks that an {@code OPTIMISTIC}/{@code SERIALIZABLE_TRY_LOCK} transaction updating two keys fails with
+     * {@link TransactionOptimisticException} while the first key is updated by a still-open {@code PESSIMISTIC}
+     * transaction, that the second key stays unset after the failure, and that the same two-key update succeeds
+     * once the pessimistic transaction has committed.
+     *
+     * @param locKey If {@code true} the second key is obtained via {@code primaryKey} from node 0's cache,
+     *      otherwise from node 2's cache.
+     * @throws Exception If failed.
+ */ + public void rollbackIfLockedPartialLock(boolean locKey) throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + final IgniteCache<Integer, Integer> cache = + ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, true)); + + final Integer key1 = primaryKey(ignite(1).cache(cache.getName())); + final Integer key2 = locKey ? primaryKey(cache) : primaryKey(ignite(2).cache(cache.getName())); + + final CountDownLatch latch1 = new CountDownLatch(1); + final CountDownLatch latch2 = new CountDownLatch(1); + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(key1, 1); + + log.info("Locked key: " + key1); + + latch1.countDown(); + + assertTrue(latch2.await(10, SECONDS)); + + log.info("Commit1"); + + tx.commit(); + } + + return null; + } + }, "lock-thread"); + + assertTrue(latch1.await(10, SECONDS)); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE_TRY_LOCK)) { + cache.put(key1, 2); + cache.put(key2, 2); + + log.info("Commit2"); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + latch2.countDown(); + + fut.get(); + + assertEquals(1, (Object) cache.get(key1)); + assertNull(cache.get(key2)); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE_TRY_LOCK)) { + cache.put(key1, 2); + cache.put(key2, 2); + + log.info("Commit3"); + + tx.commit(); + } + + assertEquals(2, (Object) cache.get(key2)); + assertEquals(2, (Object) cache.get(key2)); + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentUpdateNoDeadlock() throws Exception { + concurrentUpdateNoDeadlock(Collections.singletonList(ignite(0)), 10, false); + } + + /** + * @throws Exception If failed. 
+     */
+    public void testConcurrentUpdateNoDeadlockNodeRestart() throws Exception {
+        concurrentUpdateNoDeadlock(Collections.singletonList(ignite(1)), 10, true);
+    }
+
+    /**
+     * Runs {@link #concurrentUpdateNoDeadlock(List, int, boolean)} with all client nodes as update nodes,
+     * 20 threads and no node restart.
+     *
+     * @throws Exception If failed.
+     */
+    public void testConcurrentUpdateNoDeadlockClients() throws Exception {
+        concurrentUpdateNoDeadlock(clients(), 20, false);
+    }
+
+    /**
+     * Runs {@link #concurrentUpdateNoDeadlock(List, int, boolean)} with all client nodes as update nodes,
+     * 20 threads and a node restart.
+     *
+     * @throws Exception If failed.
+     */
+    public void testConcurrentUpdateNoDeadlockClientsNodeRestart() throws Exception {
+        concurrentUpdateNoDeadlock(clients(), 20, true);
+    }
+
+    /**
+     * Collects the nodes with indexes {@code SRVS} to {@code SRVS + CLIENTS - 1}, asserting that each of them
+     * is configured in client mode.
+     *
+     * @return Client nodes.
+     */
+    private List<Ignite> clients() {
+        List<Ignite> clients = new ArrayList<>();
+
+        for (int i = 0; i < CLIENTS; i++) {
+            Ignite ignite = ignite(SRVS + i);
+
+            assertTrue(ignite.configuration().isClientMode());
+
+            clients.add(ignite);
+        }
+
+        return clients;
+    }
+
+    /**
+     * Creates a cache and, for 10 iterations of 5 seconds each, updates random batches of keys from several
+     * threads in optimistic transactions, ignoring {@link TransactionOptimisticException}. After every iteration
+     * it checks that the nodes with indexes 1 and above return the same value for every key.
+     *
+     * @param updateNodes Nodes executing updates.
+     * @param threads Number of threads executing updates.
+     * @param restart If {@code true} restarts one node.
+     * @throws Exception If failed.
+     */
+    private void concurrentUpdateNoDeadlock(final List<Ignite> updateNodes,
+        int threads,
+        final boolean restart) throws Exception {
+        assert updateNodes.size() > 0;
+
+        final Ignite ignite0 = ignite(0);
+
+        final String cacheName =
+            ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)).getName(); // 1 backup, no store, no near cache.
+
+        final int KEYS = 100;
+
+        final AtomicBoolean finished = new AtomicBoolean(); // Tells the restart thread to stop.
+
+        IgniteInternalFuture<Object> fut = null;
+
+        try {
+            if (restart) {
+                fut = GridTestUtils.runAsync(new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        while (!finished.get()) {
+                            stopGrid(0);
+
+                            U.sleep(300);
+
+                            Ignite ignite = startGrid(0);
+
+                            assertFalse(ignite.configuration().isClientMode());
+                        }
+
+                        return null;
+                    }
+                });
+            }
+
+            for (int i = 0; i < 10; i++) {
+                log.info("Iteration: " + i);
+
+                final long stopTime = U.currentTimeMillis() + 5_000; // Update threads run for 5 seconds per iteration.
+
+                final AtomicInteger idx = new AtomicInteger();
+
+                IgniteInternalFuture<?> updateFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        int nodeIdx = idx.getAndIncrement() % updateNodes.size(); // Spread threads over the update nodes.
+
+                        Ignite node = updateNodes.get(nodeIdx);
+
+                        log.info("Tx thread: " + node.name());
+
+                        final IgniteTransactions txs = node.transactions();
+
+                        final IgniteCache<Integer, Integer> cache = node.cache(cacheName);
+
+                        assertNotNull(cache);
+
+                        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                        while (U.currentTimeMillis() < stopTime) {
+                            final Map<Integer, Integer> keys = new LinkedHashMap<>(); // Random key order within each batch.
+
+                            for (int i = 0; i < KEYS / 2; i++)
+                                keys.put(rnd.nextInt(KEYS), rnd.nextInt());
+
+                            try {
+                                if (restart) { // NOTE(review): this path uses REPEATABLE_READ, not SERIALIZABLE_TRY_LOCK - confirm intended.
+                                    doInTransaction(node, OPTIMISTIC, REPEATABLE_READ, new Callable<Void>() {
+                                        @Override public Void call() throws Exception {
+                                            cache.putAll(keys);
+
+                                            return null;
+                                        }
+                                    });
+                                } else {
+                                    try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE_TRY_LOCK)) {
+                                        cache.putAll(keys);
+
+                                        tx.commit();
+                                    }
+                                }
+                            } catch (TransactionOptimisticException ignore) {
+                                // No-op: a failed optimistic transaction is an accepted outcome here.
+                            } catch (Throwable e) {
+                                log.error("Unexpected error: " + e, e);
+
+                                throw e;
+                            }
+                        }
+
+                        return null;
+                    }
+                }, threads, "tx-thread");
+
+                updateFut.get(60, SECONDS); // Bounded wait for the update threads of this iteration.
+
+                IgniteCache<Integer, Integer> cache = ignite(1).cache(cacheName);
+
+                for (int key = 0; key < KEYS; key++) {
+                    Integer val = cache.get(key);
+
+                    for (int node = 1; node < SRVS + CLIENTS; node++) // NOTE(review): node 0 is skipped, presumably because it may be restarting - confirm.
+                        assertEquals(val, ignite(node).cache(cache.getName()).get(key));
+                }
+            }
+
+            finished.set(true);
+
+            if (fut != null)
+                fut.get();
+        }
+        finally {
+            finished.set(true); // Also stops the restart thread on failure; its future is not awaited on that path.
+        }
+    }
+
+    /**
+     * Builds a {@code TRANSACTIONAL} cache configuration for {@code Integer} keys and values.
+     *
+     * @param cacheMode Cache mode.
+     * @param syncMode Write synchronization mode.
+     * @param backups Number of backups.
+     * @param storeEnabled If {@code true} adds cache store (a {@link TestStoreFactory} with write-through enabled).
+     * @param nearCache If {@code true} near cache is enabled.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Integer, Integer> cacheConfiguration(
+        CacheMode cacheMode,
+        CacheWriteSynchronizationMode syncMode,
+        int backups,
+        boolean storeEnabled,
+        boolean nearCache) {
+        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
+
+        ccfg.setCacheMode(cacheMode);
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setBackups(backups);
+        ccfg.setWriteSynchronizationMode(syncMode);
+
+        if (storeEnabled) {
+            ccfg.setCacheStoreFactory(new TestStoreFactory());
+            ccfg.setWriteThrough(true);
+        }
+
+        if (nearCache)
+            ccfg.setNearConfiguration(new NearCacheConfiguration<Integer, Integer>());
+
+        return ccfg;
+    }
+
+    /**
+     * Factory of a stub cache store: {@code load} always returns {@code null}, {@code write} and {@code delete}
+     * do nothing.
+     */
+    private static class TestStoreFactory implements Factory<CacheStore<Integer, Integer>> {
+        /** {@inheritDoc} */
+        @Override public CacheStore<Integer, Integer> create() {
+            return new CacheStoreAdapter<Integer, Integer>() {
+                @Override public Integer load(Integer key) throws CacheLoaderException {
+                    return null;
+                }
+
+                @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) {
+                    // No-op.
+                }
+
+                @Override public void delete(Object key) {
+                    // No-op.
+                }
+            };
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3ed37b26/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java index 4bcf51e..8c9302f 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java @@ -77,6 +77,8 @@ import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.testframework.junits.GridAbstractTest; import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; import org.apache.ignite.transactions.TransactionRollbackException; import org.jetbrains.annotations.Nullable; @@ -1021,8 +1023,23 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { * @throws Exception If failed. */ protected <T> T doInTransaction(Ignite ignite, Callable<T> clo) throws Exception { + return doInTransaction(ignite, PESSIMISTIC, REPEATABLE_READ, clo); + } + + /** + * @param ignite Ignite instance. + * @param concurrency Transaction concurrency. + * @param isolation Transaction isolation. + * @param clo Closure. + * @return Result of closure execution. + * @throws Exception If failed. 
+ */ + protected <T> T doInTransaction(Ignite ignite, + TransactionConcurrency concurrency, + TransactionIsolation isolation, + Callable<T> clo) throws Exception { while (true) { - try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) { T res = clo.call(); tx.commit();
