http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index 6db00ab..af43113 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; @@ -36,17 +35,14 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; -import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.CI1; @@ -57,7 +53,6 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteUuid; -import org.apache.ignite.transactions.TransactionOptimisticException; import org.apache.ignite.transactions.TransactionTimeoutException; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentLinkedDeque8; @@ -69,7 +64,7 @@ import static org.apache.ignite.transactions.TransactionState.PREPARING; /** * */ -public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAdapter +public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter implements GridCacheMvccFuture<IgniteInternalTx> { /** */ @GridToStringInclude @@ -82,7 +77,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd public GridNearOptimisticTxPrepareFuture(GridCacheSharedContext cctx, GridNearTxLocal tx) { super(cctx, tx); - assert tx.optimistic() : tx; + assert tx.optimistic() && !tx.serializable() : tx; } /** {@inheritDoc} */ @@ -139,11 +134,9 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd } /** - * @param nodeId Failed node ID. - * @param mappings Remaining mappings. * @param e Error. */ - void onError(@Nullable UUID nodeId, @Nullable Iterable<GridDistributedTxMapping> mappings, Throwable e) { + void onError(Throwable e) { if (X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, ClusterTopologyException.class)) { if (tx.onePhaseCommit()) { tx.markForBackupCheck(); @@ -157,12 +150,6 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd if (err.compareAndSet(null, e)) { boolean marked = tx.setRollbackOnly(); - if (e instanceof IgniteTxOptimisticCheckedException) { - assert nodeId != null : "Missing node ID for optimistic failure exception: " + e; - - tx.removeKeysMapping(nodeId, mappings); - } - if (e instanceof IgniteTxRollbackCheckedException) { if (marked) { try { @@ -199,7 +186,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd /** {@inheritDoc} */ @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) { if (!isDone()) { - for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) { + for (IgniteInternalFuture<GridNearTxPrepareResponse> fut : pending()) { if (isMini(fut)) { MiniFuture f = (MiniFuture)fut; @@ -253,212 +240,38 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd return false; } - /** {@inheritDoc} */ - @Override public void prepare() { - // Obtain the topology version to use. - AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId()); - - // If there is another system transaction in progress, use it's topology version to prevent deadlock. - if (topVer == null && tx != null && tx.system()) { - IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(tx); - - if (tx0 != null) - topVer = tx0.topologyVersionSnapshot(); - } - - if (topVer != null) { - tx.topologyVersion(topVer); - - cctx.mvcc().addFuture(this); - - prepare0(false, true); - - return; - } - - prepareOnTopology(false, null); - } - - /** - * @param remap Remap flag. - * @param c Optional closure to run after map. - */ - private void prepareOnTopology(final boolean remap, @Nullable final Runnable c) { - GridDhtTopologyFuture topFut = topologyReadLock(); - - AffinityTopologyVersion topVer = null; - - try { - if (topFut == null) { - assert isDone(); - - return; - } - - if (topFut.isDone()) { - topVer = topFut.topologyVersion(); - - if (remap) - tx.onRemap(topVer); - else - tx.topologyVersion(topVer); - - if (!remap) - cctx.mvcc().addFuture(this); - } - } - finally { - topologyReadUnlock(); - } - - if (topVer != null) { - StringBuilder invalidCaches = null; - - for (Integer cacheId : tx.activeCacheIds()) { - GridCacheContext ctx = cctx.cacheContext(cacheId); - - assert ctx != null : cacheId; - - Throwable err = topFut.validateCache(ctx); - - if (err != null) { - if (invalidCaches != null) - invalidCaches.append(", "); - else - invalidCaches = new StringBuilder(); - - invalidCaches.append(U.maskName(ctx.name())); - } - } - - if (invalidCaches != null) { - onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " + - invalidCaches.toString())); - - return; - } - - prepare0(remap, false); - - if (c != null) - c.run(); - } - else { - topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) { - cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { - @Override public void run() { - try { - fut.get(); - - prepareOnTopology(remap, c); - } - catch (IgniteCheckedException e) { - onDone(e); - } - finally { - cctx.txContextReset(); - } - } - }); - } - }); - } - } - - /** - * Acquires topology read lock. - * - * @return Topology ready future. - */ - private GridDhtTopologyFuture topologyReadLock() { - if (tx.activeCacheIds().isEmpty()) - return cctx.exchange().lastTopologyFuture(); - - GridCacheContext<?, ?> nonLocCtx = null; - - for (int cacheId : tx.activeCacheIds()) { - GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); - - if (!cacheCtx.isLocal()) { - nonLocCtx = cacheCtx; - - break; - } - } - - if (nonLocCtx == null) - return cctx.exchange().lastTopologyFuture(); - - nonLocCtx.topology().readLock(); - - if (nonLocCtx.topology().stopping()) { - onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + - nonLocCtx.name())); - - return null; - } - - return nonLocCtx.topology().topologyVersionFuture(); - } - - /** - * Releases topology read lock. - */ - private void topologyReadUnlock() { - if (!tx.activeCacheIds().isEmpty()) { - GridCacheContext<?, ?> nonLocCtx = null; - - for (int cacheId : tx.activeCacheIds()) { - GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); - - if (!cacheCtx.isLocal()) { - nonLocCtx = cacheCtx; - - break; - } - } - - if (nonLocCtx != null) - nonLocCtx.topology().readUnlock(); - } - } - /** * Initializes future. * * @param remap Remap flag. * @param topLocked {@code True} if thread already acquired lock preventing topology change. */ - private void prepare0(boolean remap, boolean topLocked) { + @Override protected void prepare0(boolean remap, boolean topLocked) { try { boolean txStateCheck = remap ? tx.state() == PREPARING : tx.state(PREPARING); if (!txStateCheck) { if (tx.setRollbackOnly()) { if (tx.timedOut()) - onError(null, null, new IgniteTxTimeoutCheckedException("Transaction timed out and " + + onError(new IgniteTxTimeoutCheckedException("Transaction timed out and " + "was rolled back: " + this)); else - onError(null, null, new IgniteCheckedException("Invalid transaction state for prepare " + + onError(new IgniteCheckedException("Invalid transaction state for prepare " + "[state=" + tx.state() + ", tx=" + this + ']')); } else - onError(null, null, new IgniteTxRollbackCheckedException("Invalid transaction state for " + + onError(new IgniteTxRollbackCheckedException("Invalid transaction state for " + "prepare [state=" + tx.state() + ", tx=" + this + ']')); return; } - prepare( - tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry>emptyList(), - tx.writeEntries(), - topLocked); + prepare(tx.writeEntries(), topLocked); markInitialized(); } - catch (TransactionTimeoutException | TransactionOptimisticException e) { - onError(cctx.localNodeId(), null, e); + catch (TransactionTimeoutException e) { + onError( e); } catch (IgniteCheckedException e) { onDone(e); @@ -466,13 +279,11 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd } /** - * @param reads Read entries. * @param writes Write entries. * @param topLocked {@code True} if thread already acquired lock preventing topology change. * @throws IgniteCheckedException If failed. */ private void prepare( - Iterable<IgniteTxEntry> reads, Iterable<IgniteTxEntry> writes, boolean topLocked ) throws IgniteCheckedException { @@ -484,7 +295,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings = new ConcurrentLinkedDeque8<>(); - if (!F.isEmpty(reads) || !F.isEmpty(writes)) { + if (!F.isEmpty(writes)) { for (int cacheId : tx.activeCacheIds()) { GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); @@ -500,25 +311,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd // Assign keys to primary nodes. GridDistributedTxMapping cur = null; - for (IgniteTxEntry read : reads) { - GridDistributedTxMapping updated = map(read, topVer, cur, false, topLocked); - - if (cur != updated) { - mappings.offer(updated); - - if (updated.node().isLocal()) { - if (read.context().isNear()) - tx.nearLocallyMapped(true); - else if (read.context().isColocated()) - tx.colocatedLocallyMapped(true); - } - - cur = updated; - } - } - for (IgniteTxEntry write : writes) { - GridDistributedTxMapping updated = map(write, topVer, cur, true, topLocked); + GridDistributedTxMapping updated = map(write, topVer, cur, topLocked); if (cur != updated) { mappings.offer(updated); @@ -576,7 +370,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd futId, tx.topologyVersion(), tx, - tx.optimistic() && tx.serializable() ? m.reads() : null, + null, m.writes(), m.near(), txMapping.transactionNodes(), @@ -604,7 +398,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd tx.userPrepare(); } catch (IgniteCheckedException e) { - onError(null, null, e); + onError(e); } } @@ -651,7 +445,6 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd * @param entry Transaction entry. * @param topVer Topology version. * @param cur Current mapping. - * @param waitLock Wait lock flag. * @param topLocked {@code True} if thread already acquired lock preventing topology change. * @return Mapping. */ @@ -659,7 +452,6 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd IgniteTxEntry entry, AffinityTopologyVersion topVer, @Nullable GridDistributedTxMapping cur, - boolean waitLock, boolean topLocked ) { GridCacheContext cacheCtx = entry.context(); @@ -687,7 +479,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd entry.cached(cacheCtx.local().entryEx(entry.key(), topVer)); if (cacheCtx.isNear() || cacheCtx.isLocal()) { - if (waitLock && entry.explicitVersion() == null) + if (entry.explicitVersion() == null) lockKeys.add(entry.txKey()); } @@ -749,7 +541,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd /** * */ - private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> { + private class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> { /** */ private static final long serialVersionUID = 0L; @@ -828,7 +620,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd // Fail the whole future (make sure not to remap on different primary node // to prevent multiple lock coordinators). - onError(null, null, e); + onError(e); } } @@ -836,14 +628,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd * @param nodeId Failed node ID. * @param res Result callback. */ - void onResult(UUID nodeId, GridNearTxPrepareResponse res) { + void onResult(UUID nodeId, final GridNearTxPrepareResponse res) { if (isDone()) return; if (rcvRes.compareAndSet(false, true)) { if (res.error() != null) { // Fail the whole compound future. - onError(nodeId, mappings, res.error()); + onError(res.error()); } else { if (res.clientRemapVersion() != null) { @@ -877,7 +669,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd proceedPrepare(mappings); // Finish this mini future. - onDone(tx); + onDone((GridNearTxPrepareResponse)null); } } } @@ -889,7 +681,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd private void remap() { prepareOnTopology(true, new Runnable() { @Override public void run() { - onDone(tx); + onDone((GridNearTxPrepareResponse)null); } }); }
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java new file mode 100644 index 0000000..fd9183e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; +import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearTxPrepareFutureAdapter { + /** + * @param cctx Context. + * @param tx Transaction. + */ + public GridNearOptimisticTxPrepareFutureAdapter(GridCacheSharedContext cctx, GridNearTxLocal tx) { + super(cctx, tx); + + assert tx.optimistic() : tx; + } + + /** {@inheritDoc} */ + @Override public final void prepare() { + // Obtain the topology version to use. + AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId()); + + // If there is another system transaction in progress, use it's topology version to prevent deadlock. + if (topVer == null && tx != null && tx.system()) { + IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(tx); + + if (tx0 != null) + topVer = tx0.topologyVersionSnapshot(); + } + + if (topVer != null) { + tx.topologyVersion(topVer); + + cctx.mvcc().addFuture(this); + + prepare0(false, true); + + return; + } + + prepareOnTopology(false, null); + } + + /** + * Acquires topology read lock. + * + * @return Topology ready future. + */ + protected final GridDhtTopologyFuture topologyReadLock() { + if (tx.activeCacheIds().isEmpty()) + return cctx.exchange().lastTopologyFuture(); + + GridCacheContext<?, ?> nonLocCtx = null; + + for (int cacheId : tx.activeCacheIds()) { + GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); + + if (!cacheCtx.isLocal()) { + nonLocCtx = cacheCtx; + + break; + } + } + + if (nonLocCtx == null) + return cctx.exchange().lastTopologyFuture(); + + nonLocCtx.topology().readLock(); + + if (nonLocCtx.topology().stopping()) { + onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + + nonLocCtx.name())); + + return null; + } + + return nonLocCtx.topology().topologyVersionFuture(); + } + + /** + * Releases topology read lock. + */ + protected final void topologyReadUnlock() { + if (!tx.activeCacheIds().isEmpty()) { + GridCacheContext<?, ?> nonLocCtx = null; + + for (int cacheId : tx.activeCacheIds()) { + GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); + + if (!cacheCtx.isLocal()) { + nonLocCtx = cacheCtx; + + break; + } + } + + if (nonLocCtx != null) + nonLocCtx.topology().readUnlock(); + } + } + + /** + * @param remap Remap flag. + * @param c Optional closure to run after map. + */ + protected final void prepareOnTopology(final boolean remap, @Nullable final Runnable c) { + GridDhtTopologyFuture topFut = topologyReadLock(); + + AffinityTopologyVersion topVer = null; + + try { + if (topFut == null) { + assert isDone(); + + return; + } + + if (topFut.isDone()) { + topVer = topFut.topologyVersion(); + + if (remap) + tx.onRemap(topVer); + else + tx.topologyVersion(topVer); + + if (!remap) + cctx.mvcc().addFuture(this); + } + } + finally { + topologyReadUnlock(); + } + + if (topVer != null) { + StringBuilder invalidCaches = null; + + for (Integer cacheId : tx.activeCacheIds()) { + GridCacheContext ctx = cctx.cacheContext(cacheId); + + assert ctx != null : cacheId; + + Throwable err = topFut.validateCache(ctx); + + if (err != null) { + if (invalidCaches != null) + invalidCaches.append(", "); + else + invalidCaches = new StringBuilder(); + + invalidCaches.append(U.maskName(ctx.name())); + } + } + + if (invalidCaches != null) { + onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " + + invalidCaches.toString())); + + return; + } + + prepare0(remap, false); + + if (c != null) + c.run(); + } + else { + topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) { + cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { + @Override public void run() { + try { + fut.get(); + + prepareOnTopology(remap, c); + } + catch (IgniteCheckedException e) { + onDone(e); + } + finally { + cctx.txContextReset(); + } + } + }); + } + }); + } + } + + /** + * @param remap Remap flag. + * @param topLocked {@code True} if thread already acquired lock preventing topology change. + */ + protected abstract void prepare0(boolean remap, boolean topLocked); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index 62f9bb3..11d31b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -103,7 +103,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA if (!isDone()) { assert res.clientRemapVersion() == null : res; - for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) { + for (IgniteInternalFuture<GridNearTxPrepareResponse> fut : pending()) { MiniFuture f = (MiniFuture)fut; if (f.futureId().equals(res.miniId())) { @@ -292,7 +292,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA /** * */ - private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> { + private class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> { /** */ private static final long serialVersionUID = 0L; @@ -332,7 +332,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA else { onPrepareResponse(m, res); - onDone(tx); + onDone(res); } } @@ -344,7 +344,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA tx.markForBackupCheck(); // Do not fail future for one-phase transaction right away. - onDone(tx); + onDone((GridNearTxPrepareResponse)null); } onError(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index c3bb324..0e8aa0d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -117,7 +117,6 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> @Nullable final Collection<? extends K> keys, boolean forcePrimary, boolean skipTx, - @Nullable final GridCacheEntryEx entry, @Nullable UUID subjId, String taskName, final boolean deserializePortable, @@ -143,7 +142,6 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> @Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter tx) { return tx.getAllAsync(ctx, ctx.cacheKeysView(keys), - entry, deserializePortable, skipVals, false, @@ -156,7 +154,6 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> return loadAsync(null, ctx.cacheKeysView(keys), - false, forcePrimary, subjId, taskName, @@ -174,6 +171,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> * @param deserializePortable Deserialize portable flag. * @param expiryPlc Expiry policy. * @param skipVals Skip values flag. + * @param needVer If {@code true} returns values as tuples containing value and version. * @return Future. */ IgniteInternalFuture<Map<K, V>> txLoadAsync(GridNearTxLocal tx, @@ -181,21 +179,23 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> boolean readThrough, boolean deserializePortable, @Nullable IgniteCacheExpiryPolicy expiryPlc, - boolean skipVals) { + boolean skipVals, + boolean needVer) { assert tx != null; GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx, keys, readThrough, - false, - false, + /*force primary*/needVer, tx, CU.subjectId(tx, ctx.shared()), tx.resolveTaskName(), deserializePortable, expiryPlc, skipVals, - /*can remap*/true); + /*can remap*/true, + needVer, + /*keepCacheObjects*/true); // init() will register future for responses if it has remote mappings. fut.init(); http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index 1a4f130..46c9f3e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -266,9 +266,11 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu } if (tx.onePhaseCommit()) { - finishOnePhase(); + boolean commit = this.commit && err == null; - tx.tmFinish(commit && err == null); + finishOnePhase(commit); + + tx.tmFinish(commit); } Throwable th = this.err.get(); @@ -510,9 +512,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu } /** - * + * @param commit Commit flag. */ - private void finishOnePhase() { + private void finishOnePhase(boolean commit) { // No need to send messages as transaction was already committed on remote node. // Finish local mapping only as we need send commit message to backups. for (GridDistributedTxMapping m : mappings.values()) { @@ -522,6 +524,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu // Add new future. if (fut != null) add(fut); + + break; } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index ea96649..883c285 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -55,14 +55,15 @@ import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.lang.GridClosureException; +import org.apache.ignite.internal.util.lang.GridInClosure3; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; @@ -342,32 +343,30 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Boolean> loadMissing( + @Override public IgniteInternalFuture<Void> loadMissing( final GridCacheContext cacheCtx, boolean readThrough, boolean async, final Collection<KeyCacheObject> keys, - boolean deserializePortable, boolean skipVals, - final IgniteBiInClosure<KeyCacheObject, Object> c + final boolean needVer, + final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c ) { if (cacheCtx.isNear()) { return cacheCtx.nearTx().txLoadAsync(this, keys, readThrough, - deserializePortable, + /*deserializePortable*/false, accessPolicy(cacheCtx, keys), - skipVals).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Boolean>() { - @Override public Boolean apply(IgniteInternalFuture<Map<Object, Object>> f) { + skipVals, + needVer).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Void>() { + @Override public Void apply(IgniteInternalFuture<Map<Object, Object>> f) { try { Map<Object, Object> map = f.get(); - // Must loop through keys, not map entries, - // as map entries may not have all the keys. - for (KeyCacheObject key : keys) - c.apply(key, map.get(key.value(cacheCtx.cacheObjectContext(), false))); + processLoaded(map, keys, needVer, c); - return true; + return null; } catch (Exception e) { setRollbackOnly(); @@ -381,39 +380,73 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { return cacheCtx.colocated().loadAsync( keys, readThrough, - /*reload*/false, - /*force primary*/false, + /*force primary*/needVer, topologyVersion(), CU.subjectId(this, cctx), resolveTaskName(), - deserializePortable, + /*deserializePortable*/false, accessPolicy(cacheCtx, keys), skipVals, - /*can remap*/true - ).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Boolean>() { - @Override public Boolean apply(IgniteInternalFuture<Map<Object, Object>> f) { - try { - Map<Object, Object> map = f.get(); - - // Must loop through keys, not map entries, - // as map entries may not have all the keys. - for (KeyCacheObject key : keys) - c.apply(key, map.get(key.value(cacheCtx.cacheObjectContext(), false))); - - return true; - } - catch (Exception e) { - setRollbackOnly(); - - throw new GridClosureException(e); - } + /*can remap*/true, + needVer, + /*keepCacheObject*/true + ).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Void>() { + @Override public Void apply(IgniteInternalFuture<Map<Object, Object>> f) { + try { + Map<Object, Object> map = f.get(); + + processLoaded(map, keys, needVer, c); + + return null; } - }); + catch (Exception e) { + setRollbackOnly(); + + throw new GridClosureException(e); + } + } + }); } else { assert cacheCtx.isLocal(); - return super.loadMissing(cacheCtx, readThrough, async, keys, deserializePortable, skipVals, c); + return super.loadMissing(cacheCtx, readThrough, async, keys, skipVals, needVer, c); + } + } + + /** + * @param map Loaded values. + * @param keys Keys. + * @param needVer If {@code true} version is required for loaded values. + * @param c Closure. + */ + private void processLoaded( + Map<Object, Object> map, + final Collection<KeyCacheObject> keys, + boolean needVer, + GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c) { + for (KeyCacheObject key : keys) { + Object val = map.get(key); + + if (val != null) { + Object v; + GridCacheVersion ver; + + if (needVer) { + T2<Object, GridCacheVersion> t = (T2)val; + + v = t.get1(); + ver = t.get2(); + } + else { + v = val; + ver = null; + } + + c.apply(key, v, ver); + } + else + c.apply(key, null, IgniteTxEntry.SER_READ_EMPTY_ENTRY_VER); } } @@ -555,36 +588,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { } } - - /** - * Removes mapping in case of optimistic tx failure on primary node. - * - * @param failedNodeId Failed node ID. - * @param mapQueue Mappings queue. - */ - void removeKeysMapping(UUID failedNodeId, Iterable<GridDistributedTxMapping> mapQueue) { - assert failedNodeId != null; - assert mapQueue != null; - - mappings.remove(failedNodeId); - - if (!F.isEmpty(mapQueue)) { - for (GridDistributedTxMapping m : mapQueue) { - UUID nodeId = m.node().id(); - - GridDistributedTxMapping mapping = mappings.get(nodeId); - - if (mapping != null) { - for (IgniteTxEntry entry : m.entries()) - mapping.removeEntry(entry); - - if (mapping.entries().isEmpty()) - mappings.remove(nodeId); - } - } - } - } - /** * @param nodeId Node ID to mark with explicit lock. * @return {@code True} if mapping was found. @@ -621,7 +624,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers) { - Collection<IgniteTxEntry> entries = F.concat(false, mapping.reads(), mapping.writes()); + Collection<IgniteTxEntry> entries = F.concat(false, mapping.writes(), mapping.reads()); for (IgniteTxEntry txEntry : entries) { while (true) { @@ -743,8 +746,13 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { if (fut == null) { // Future must be created before any exception can be thrown. - fut = optimistic() ? new GridNearOptimisticTxPrepareFuture(cctx, this) : - new GridNearPessimisticTxPrepareFuture(cctx, this); + if (optimistic()) { + fut = serializable() ? + new GridNearOptimisticSerializableTxPrepareFuture(cctx, this) : + new GridNearOptimisticTxPrepareFuture(cctx, this); + } + else + fut = new GridNearPessimisticTxPrepareFuture(cctx, this); if (!prepFut.compareAndSet(null, fut)) return prepFut.get(); @@ -871,7 +879,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsyncLocal( @Nullable Collection<IgniteTxEntry> reads, @Nullable Collection<IgniteTxEntry> writes, - Map<UUID, Collection<UUID>> txNodes, boolean last, + Map<UUID, Collection<UUID>> txNodes, + boolean last, Collection<UUID> lastBackups ) { if (state() != PREPARING) { @@ -899,7 +908,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { try { // At this point all the entries passed in must be enlisted in transaction because this is an // optimistic transaction. - optimisticLockEntries = writes; + optimisticLockEntries = (serializable() && optimistic()) ? F.concat(false, writes, reads) : writes; userPrepare(); @@ -1192,12 +1201,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { return plc; } - /** - * @param cacheCtx Cache context. - * @param keys Keys. - * @return Expiry policy. - */ - private IgniteCacheExpiryPolicy accessPolicy(GridCacheContext cacheCtx, Collection<KeyCacheObject> keys) { + /** {@inheritDoc} */ + @Override protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext cacheCtx, Collection<KeyCacheObject> keys) { if (accessMap != null) { for (Map.Entry<IgniteTxKey, IgniteCacheExpiryPolicy> e : accessMap.entrySet()) { if (e.getKey().cacheId() == cacheCtx.cacheId() && keys.contains(e.getKey().key())) http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java index fac7a12..45477a0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java @@ -34,7 +34,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture; +import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; @@ -48,15 +48,15 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOO /** * Common code for tx prepare in optimistic and pessimistic modes. */ -public abstract class GridNearTxPrepareFutureAdapter extends GridCompoundIdentityFuture<IgniteInternalTx> +public abstract class GridNearTxPrepareFutureAdapter extends GridCompoundFuture<GridNearTxPrepareResponse, IgniteInternalTx> implements GridCacheFuture<IgniteInternalTx> { /** Logger reference. */ protected static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); /** */ - private static final IgniteReducer<IgniteInternalTx, IgniteInternalTx> REDUCER = - new IgniteReducer<IgniteInternalTx, IgniteInternalTx>() { - @Override public boolean collect(IgniteInternalTx e) { + private static final IgniteReducer<GridNearTxPrepareResponse, IgniteInternalTx> REDUCER = + new IgniteReducer<GridNearTxPrepareResponse, IgniteInternalTx>() { + @Override public boolean collect(GridNearTxPrepareResponse e) { return true; } @@ -94,7 +94,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends GridCompoundIdentit * @param tx Transaction. */ public GridNearTxPrepareFutureAdapter(GridCacheSharedContext cctx, final GridNearTxLocal tx) { - super(cctx.kernalContext(), REDUCER); + super(REDUCER); assert cctx != null; assert tx != null; @@ -201,6 +201,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends GridCompoundIdentit } catch (GridCacheEntryRemovedException ignored) { // Retry. + txEntry.cached(cacheCtx.cache().entryEx(txEntry.key())); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java index cacac13..85ed881 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java @@ -67,6 +67,8 @@ public class GridLocalCacheEntry extends GridCacheMapEntry { * * @param threadId Owning thread ID. * @param ver Lock version. + * @param serOrder Version for serializable transactions ordering. + * @param serReadVer Optional read entry version for optimistic serializable transaction. * @param timeout Timeout to acquire lock. * @param reenter Reentry flag. * @param tx Transaction flag. @@ -77,6 +79,8 @@ public class GridLocalCacheEntry extends GridCacheMapEntry { @Nullable public GridCacheMvccCandidate addLocal( long threadId, GridCacheVersion ver, + @Nullable GridCacheVersion serOrder, + @Nullable GridCacheVersion serReadVer, long timeout, boolean reenter, boolean tx, @@ -91,6 +95,11 @@ public class GridLocalCacheEntry extends GridCacheMapEntry { synchronized (this) { checkObsolete(); + if (serReadVer != null) { + if (!checkSerializableReadVersion(serReadVer)) + return null; + } + GridCacheMvcc mvcc = mvccExtras(); if (mvcc == null) { @@ -103,12 +112,16 @@ public class GridLocalCacheEntry extends GridCacheMapEntry { cand = mvcc.addLocal( this, + /*nearNodeId*/null, + /*nearVer*/null, threadId, ver, timeout, + serOrder, reenter, tx, - implicitSingle + implicitSingle, + /*dht-local*/false ); owner = mvcc.localOwner(); @@ -191,10 +204,16 @@ public class GridLocalCacheEntry extends GridCacheMapEntry { } /** {@inheritDoc} */ - @Override public boolean tmLock(IgniteInternalTx tx, long timeout) throws GridCacheEntryRemovedException { + @Override public boolean tmLock(IgniteInternalTx tx, + long timeout, + @Nullable GridCacheVersion serOrder, + GridCacheVersion serReadVer) + throws GridCacheEntryRemovedException { GridCacheMvccCandidate cand = addLocal( tx.threadId(), tx.xidVersion(), + serOrder, + serReadVer, timeout, /*reenter*/false, /*tx*/true, http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java index 7018c4e..cb14b4c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java @@ -225,6 +225,8 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean> GridCacheMvccCandidate c = entry.addLocal( threadId, lockVer, + null, + null, timeout, !inTx(), inTx(), http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index 0bf6ea2..8446665 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -495,7 +495,6 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { @Nullable final Collection<? extends K> keys, final boolean forcePrimary, boolean skipTx, - @Nullable final GridCacheEntryEx entry, @Nullable UUID subjId, final String taskName, final boolean deserializePortable, @@ -595,10 +594,6 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { catch (GridCacheEntryRemovedException ignored) { // No-op, retry. } - catch (GridCacheFilterFailedException ignored) { - // No-op, skip the key. - break; - } finally { if (entry != null) ctx.evicts().touch(entry, ctx.affinity().affinityTopologyVersion()); @@ -615,7 +610,6 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { return getAllAsync( keys, opCtx == null || !opCtx.skipStore(), - null, false, subjId, taskName, @@ -1284,9 +1278,6 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { catch (GridCacheEntryRemovedException ignore) { assert false : "Entry cannot become obsolete while holding lock."; } - catch (GridCacheFilterFailedException ignore) { - assert false : "Filter should never fail with failFast=false and empty filter."; - } } // Store final batch. http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java index c0c2284..716676f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java @@ -147,12 +147,6 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx { cctx.kernalContext().gateway().readLock(); try { - TransactionConfiguration cfg = cctx.gridConfig().getTransactionConfiguration(); - - if (!cfg.isTxSerializableEnabled() && isolation == SERIALIZABLE) - throw new IllegalArgumentException("SERIALIZABLE isolation level is disabled (to enable change " + - "'txSerializableEnabled' configuration property)"); - IgniteInternalTx tx = cctx.tm().userTx(sysCacheCtx); if (tx != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 7d7e3e8..1c82636 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -325,7 +325,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter threadId = Thread.currentThread().getId(); - log = U.logger(cctx.kernalContext(), logRef, this); + if (log == null) + log = U.logger(cctx.kernalContext(), logRef, this); } /** @@ -374,7 +375,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implicitSingle = false; loc = false; - log = U.logger(cctx.kernalContext(), logRef, this); + if (log == null) + log = U.logger(cctx.kernalContext(), logRef, this); } /** {@inheritDoc} */ @@ -430,6 +432,9 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter /** {@inheritDoc} */ @Override public Collection<IgniteTxEntry> optimisticLockEntries() { + if (serializable() && optimistic()) + return F.concat(false, writeEntries(), readEntries()); + return writeEntries(); } @@ -1267,88 +1272,81 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter if (F.isEmpty(txEntry.entryProcessors())) return F.t(txEntry.op(), txEntry.value()); else { - try { - boolean recordEvt = cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ); + boolean recordEvt = cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ); - CacheObject cacheVal = txEntry.hasValue() ? txEntry.value() : - txEntry.cached().innerGet(this, - /*swap*/false, - /*read through*/false, - /*fail fast*/true, - /*unmarshal*/true, - /*metrics*/metrics, - /*event*/recordEvt, - /*temporary*/true, - /*subjId*/subjId, - /**closure name */recordEvt ? F.first(txEntry.entryProcessors()).get1() : null, - resolveTaskName(), - null); + CacheObject cacheVal = txEntry.hasValue() ? txEntry.value() : + txEntry.cached().innerGet(this, + /*swap*/false, + /*read through*/false, + /*fail fast*/true, + /*unmarshal*/true, + /*metrics*/metrics, + /*event*/recordEvt, + /*temporary*/true, + /*subjId*/subjId, + /**closure name */recordEvt ? F.first(txEntry.entryProcessors()).get1() : null, + resolveTaskName(), + null); - boolean modified = false; + boolean modified = false; - Object val = null; + Object val = null; - Object key = null; + Object key = null; - GridCacheVersion ver; - - try { - ver = txEntry.cached().version(); - } - catch (GridCacheEntryRemovedException e) { - assert optimistic() : txEntry; + GridCacheVersion ver; - if (log.isDebugEnabled()) - log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']'); + try { + ver = txEntry.cached().version(); + } + catch (GridCacheEntryRemovedException e) { + assert optimistic() : txEntry; - ver = null; - } + if (log.isDebugEnabled()) + log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']'); - for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) { - CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry(txEntry.context(), - txEntry.key(), key, cacheVal, val, ver); + ver = null; + } - try { - EntryProcessor<Object, Object, Object> processor = t.get1(); + for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) { + CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry(txEntry.context(), + txEntry.key(), key, cacheVal, val, ver); - processor.process(invokeEntry, t.get2()); + try { + EntryProcessor<Object, Object, Object> processor = t.get1(); - val = invokeEntry.getValue(); + processor.process(invokeEntry, t.get2()); - key = invokeEntry.key(); - } - catch (Exception ignore) { - // No-op. - } + val = invokeEntry.getValue(); - modified |= invokeEntry.modified(); + key = invokeEntry.key(); + } + catch (Exception ignore) { + // No-op. } - if (modified) - cacheVal = cacheCtx.toCacheObject(cacheCtx.unwrapTemporary(val)); + modified |= invokeEntry.modified(); + } - GridCacheOperation op = modified ? (val == null ? DELETE : UPDATE) : NOOP; + if (modified) + cacheVal = cacheCtx.toCacheObject(cacheCtx.unwrapTemporary(val)); - if (op == NOOP) { - ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry); + GridCacheOperation op = modified ? (val == null ? DELETE : UPDATE) : NOOP; - if (expiry != null) { - long ttl = CU.toTtl(expiry.getExpiryForAccess()); + if (op == NOOP) { + ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry); - txEntry.ttl(ttl); + if (expiry != null) { + long ttl = CU.toTtl(expiry.getExpiryForAccess()); - if (ttl == CU.TTL_ZERO) - op = DELETE; - } - } + txEntry.ttl(ttl); - return F.t(op, cacheVal); + if (ttl == CU.TTL_ZERO) + op = DELETE; + } } - catch (GridCacheFilterFailedException e) { - assert false : "Empty filter failed for innerGet: " + e; - return null; - } + return F.t(op, cacheVal); } } @@ -1498,9 +1496,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter * @param e Entry to evict if it qualifies for eviction. * @param primaryOnly Flag to try to evict only on primary node. * @return {@code True} if attempt was made to evict the entry. - * @throws IgniteCheckedException If failed. */ - protected boolean evictNearEntry(IgniteTxEntry e, boolean primaryOnly) throws IgniteCheckedException { + protected boolean evictNearEntry(IgniteTxEntry e, boolean primaryOnly) { assert e != null; if (isNearLocallyMapped(e, primaryOnly)) { http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 2462dda..9eb2808 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -66,6 +66,12 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { /** */ private static final long serialVersionUID = 0L; + /** Dummy version for non-existing entry read in SERIALIZABLE transaction. */ + public static final GridCacheVersion SER_READ_EMPTY_ENTRY_VER = new GridCacheVersion(0, 0, 0, 0); + + /** Dummy version for any existing entry read in SERIALIZABLE transaction. */ + public static final GridCacheVersion SER_READ_NOT_EMPTY_VER = new GridCacheVersion(0, 0, 0, 1); + /** Owning transaction. */ @GridToStringExclude @GridDirectTransient @@ -175,6 +181,9 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { */ private byte flags; + /** */ + private GridCacheVersion serReadVer; + /** * Required by {@link Externalizable} */ @@ -316,6 +325,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { cp.conflictVer = conflictVer; cp.expiryPlc = expiryPlc; cp.flags = flags; + cp.serReadVer = serReadVer; return cp; } @@ -822,6 +832,23 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { this.entryProcessorCalcVal = entryProcessorCalcVal; } + /** + * @return Read version for serializable transaction. + */ + @Nullable public GridCacheVersion serializableReadVersion() { + return serReadVer; + } + + /** + * @param serReadVer Read version for serializable transaction. + */ + public void serializableReadVersion(GridCacheVersion serReadVer) { + assert this.serReadVer == null; + assert serReadVer != null; + + this.serReadVer = serReadVer; + } + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); @@ -884,18 +911,24 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { writer.incrementState(); case 8: - if (!writer.writeByteArray("transformClosBytes", transformClosBytes)) + if (!writer.writeMessage("serReadVer", serReadVer)) return false; writer.incrementState(); case 9: - if (!writer.writeLong("ttl", ttl)) + if (!writer.writeByteArray("transformClosBytes", transformClosBytes)) return false; writer.incrementState(); case 10: + if (!writer.writeLong("ttl", ttl)) + return false; + + writer.incrementState(); + + case 11: if (!writer.writeMessage("val", val)) return false; @@ -979,7 +1012,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { reader.incrementState(); case 8: - transformClosBytes = reader.readByteArray("transformClosBytes"); + serReadVer = reader.readMessage("serReadVer"); if (!reader.isLastRead()) return false; @@ -987,7 +1020,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { reader.incrementState(); case 9: - ttl = reader.readLong("ttl"); + transformClosBytes = reader.readByteArray("transformClosBytes"); if (!reader.isLastRead()) return false; @@ -995,6 +1028,14 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { reader.incrementState(); case 10: + ttl = reader.readLong("ttl"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 11: val = reader.readMessage("val"); if (!reader.isLastRead()) @@ -1014,7 +1055,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 11; + return 12; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 530fbdf..d9786a8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -68,6 +68,7 @@ import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFutureCancelledException; +import org.apache.ignite.transactions.TransactionState; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; @@ -417,7 +418,8 @@ public class IgniteTxHandler { if (tx.isRollbackOnly()) { try { - tx.rollback(); + if (tx.state() != TransactionState.ROLLED_BACK && tx.state() != TransactionState.ROLLING_BACK) + tx.rollback(); } catch (IgniteCheckedException e) { U.error(log, "Failed to rollback transaction: " + tx, e);