Repository: ignite Updated Branches: refs/heads/ignite-3478 fce2e31f0 -> d3c049952
ignite-3478 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d3c04995 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d3c04995 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d3c04995 Branch: refs/heads/ignite-3478 Commit: d3c049952384750c5543a9f88b383c033ef74096 Parents: fce2e31 Author: sboikov <[email protected]> Authored: Mon Sep 4 11:52:11 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon Sep 4 12:49:57 2017 +0300 ---------------------------------------------------------------------- .../cache/GridCacheSharedContext.java | 14 +++--- .../distributed/dht/GridDhtTxPrepareFuture.java | 10 ++-- ...arOptimisticSerializableTxPrepareFuture.java | 8 +-- .../near/GridNearOptimisticTxPrepareFuture.java | 16 +++--- ...ridNearOptimisticTxPrepareFutureAdapter.java | 4 +- .../GridNearPessimisticTxPrepareFuture.java | 52 ++++++++++++++------ .../near/GridNearTxFinishFuture.java | 1 + .../near/GridNearTxFinishRequest.java | 30 ++++++++++- .../near/GridNearTxPrepareFutureAdapter.java | 8 +-- .../near/GridNearTxPrepareResponse.java | 32 ++++++++---- .../mvcc/CacheCoordinatorsSharedManager.java | 8 ++- .../cache/transactions/IgniteTxHandler.java | 5 +- 12 files changed, 131 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d3c04995/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 09c8b1a..3919718 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -124,7 +124,7 @@ public class GridCacheSharedContext<K, V> { private GridCacheSharedTtlCleanupManager ttlMgr; /** Cache mvcc coordinator. */ - private CacheCoordinatorsSharedManager coord; + private CacheCoordinatorsSharedManager crd; /** Cache contexts map. */ private ConcurrentHashMap8<Integer, GridCacheContext<K, V>> ctxMap; @@ -167,7 +167,7 @@ public class GridCacheSharedContext<K, V> { /** * @param kernalCtx Context. - * @param coord Cache mvcc coordinator manager. + * @param crd Cache mvcc coordinator manager. * @param txMgr Transaction manager. * @param verMgr Version manager. * @param mvccMgr MVCC manager. @@ -181,7 +181,7 @@ public class GridCacheSharedContext<K, V> { */ public GridCacheSharedContext( GridKernalContext kernalCtx, - CacheCoordinatorsSharedManager coord, + CacheCoordinatorsSharedManager crd, IgniteTxManager txMgr, GridCacheVersionManager verMgr, GridCacheMvccManager mvccMgr, @@ -200,7 +200,7 @@ public class GridCacheSharedContext<K, V> { this.kernalCtx = kernalCtx; setManagers(mgrs, - coord, + crd, txMgr, jtaMgr, verMgr, @@ -356,7 +356,7 @@ public class GridCacheSharedContext<K, V> { List<GridCacheSharedManager<K, V>> mgrs = new LinkedList<>(); setManagers(mgrs, - coord, + crd, txMgr, jtaMgr, verMgr, @@ -422,7 +422,7 @@ public class GridCacheSharedContext<K, V> { CacheAffinitySharedManager affMgr, GridCacheIoManager ioMgr, GridCacheSharedTtlCleanupManager ttlMgr) { - this.coord = add(mgrs, coord); + this.crd = add(mgrs, coord); this.mvccMgr = add(mgrs, mvccMgr); this.verMgr = add(mgrs, verMgr); this.txMgr = add(mgrs, txMgr); @@ -766,7 +766,7 @@ public class GridCacheSharedContext<K, V> { * @return Cache mvcc coordinator manager. */ public CacheCoordinatorsSharedManager coordinators() { - return coord; + return crd; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/d3c04995/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index a3d67d2..812b576 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -1223,7 +1223,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite } } - IgniteInternalFuture<Long> waitCoordCntrFut = null; + IgniteInternalFuture<Long> waitCrdCntrFut = null; if (req.requestMvccCounter()) { assert tx.txState().mvccEnabled(cctx); @@ -1235,10 +1235,10 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite if (crd.isLocal()) tx.mvccCoordinatorCounter(cctx.coordinators().requestTxCounterOnCoordinator(tx.nearXidVersion())); else { - IgniteInternalFuture<Long> coordCntrFut = cctx.coordinators().requestTxCounter(crd, tx); + IgniteInternalFuture<Long> crdCntrFut = cctx.coordinators().requestTxCounter(crd, tx); if (tx.onePhaseCommit()) - waitCoordCntrFut = coordCntrFut; + waitCrdCntrFut = crdCntrFut; } } @@ -1263,10 +1263,10 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite return; if (last) { - if (waitCoordCntrFut != null) { + if (waitCrdCntrFut != null) { skipInit = true; - waitCoordCntrFut.listen(new IgniteInClosure<IgniteInternalFuture<Long>>() { + waitCrdCntrFut.listen(new IgniteInClosure<IgniteInternalFuture<Long>>() { @Override public void apply(IgniteInternalFuture<Long> fut) { try { fut.get(); http://git-wip-us.apache.org/repos/asf/ignite/blob/d3c04995/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index 69d0940..16535ad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -234,7 +234,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim // Avoid iterator creation. for (int i = 0; i < size; i++) { - IgniteInternalFuture<GridNearTxPrepareResponse> fut = future(i); + IgniteInternalFuture fut = future(i); if (!isMini(fut)) continue; @@ -382,14 +382,14 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim MiniFuture fut = new MiniFuture(this, m, ++miniId); - add(fut); + add((IgniteInternalFuture)fut); if (m.primary().isLocal() && m.hasNearCacheEntries() && m.hasColocatedCacheEntries()) { assert locNearEntriesFut == null; locNearEntriesFut = fut; - add(new MiniFuture(this, m, ++miniId)); + add((IgniteInternalFuture)new MiniFuture(this, m, ++miniId)); } } @@ -646,7 +646,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim if (keyLockFut == null) { keyLockFut = new KeyLockFuture(); - add(keyLockFut); + add((IgniteInternalFuture)keyLockFut); } keyLockFut.addLockKey(entry.txKey()); http://git-wip-us.apache.org/repos/asf/ignite/blob/d3c04995/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index 2c23a7a..093ab66 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -220,7 +220,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa int size = futuresCountNoLock(); for (int i = 0; i < size; i++) { - IgniteInternalFuture<GridNearTxPrepareResponse> fut = future(i); + IgniteInternalFuture fut = future(i); if (isMini(fut) && !fut.isDone()) { MiniFuture miniFut = (MiniFuture)fut; @@ -254,7 +254,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa // Avoid iterator creation. for (int i = size - 1; i >= 0; i--) { - IgniteInternalFuture<GridNearTxPrepareResponse> fut = future(i); + IgniteInternalFuture fut = future(i); if (!isMini(fut)) continue; @@ -564,7 +564,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa req.miniId(fut.futureId()); - add(fut); // Append new future. + add((IgniteInternalFuture)fut); // Append new future. if (n.isLocal()) { assert !(m.hasColocatedCacheEntries() && m.hasNearCacheEntries()) : m; @@ -681,7 +681,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa if (keyLockFut == null) { keyLockFut = new KeyLockFuture(); - add(keyLockFut); + add((IgniteInternalFuture)keyLockFut); } keyLockFut.addLockKey(entry.txKey()); @@ -740,7 +740,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa int size = futuresCountNoLock(); for (int i = 0; i < size; i++) { - IgniteInternalFuture<GridNearTxPrepareResponse> fut = future(i); + IgniteInternalFuture fut = future(i); if (isMini(fut) && !fut.isDone()) { MiniFuture miniFut = (MiniFuture)fut; @@ -758,7 +758,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa } } - add(new GridEmbeddedFuture<>(new IgniteBiClosure<TxDeadlock, Exception, GridNearTxPrepareResponse>() { + add(new GridEmbeddedFuture<>(new IgniteBiClosure<TxDeadlock, Exception, Object>() { @Override public GridNearTxPrepareResponse apply(TxDeadlock deadlock, Exception e) { if (e != null) U.warn(log, "Failed to detect deadlock.", e); @@ -790,8 +790,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa ", loc=" + ((MiniFuture)f).node().isLocal() + ", done=" + f.isDone() + "]"; } - }, new P1<IgniteInternalFuture<GridNearTxPrepareResponse>>() { - @Override public boolean apply(IgniteInternalFuture<GridNearTxPrepareResponse> fut) { + }, new P1<IgniteInternalFuture<Object>>() { + @Override public boolean apply(IgniteInternalFuture<Object> fut) { return isMini(fut); } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/d3c04995/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java index f09b6c8..2e33889 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java @@ -171,7 +171,7 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT /** * Keys lock future. */ - protected static class KeyLockFuture extends GridFutureAdapter<GridNearTxPrepareResponse> { + protected static class KeyLockFuture extends GridFutureAdapter<Void> { /** */ @GridToStringInclude protected Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>(); @@ -216,7 +216,7 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT if (log.isDebugEnabled()) log.debug("All locks are acquired for near prepare future: " + this); - onDone((GridNearTxPrepareResponse)null); + onDone((Void)null); } else { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/d3c04995/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index 0cccce3..0559ccd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -36,6 +36,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping; +import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager; +import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; @@ -46,7 +48,6 @@ import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteInClosure; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; @@ -139,13 +140,17 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA // Avoid iterator creation. for (int i = 0; i < size; i++) { - MiniFuture mini = (MiniFuture)future(i); + IgniteInternalFuture fut = future(i); - if (mini.futureId() == miniId) { - if (!mini.isDone()) - return mini; - else - return null; + if (fut instanceof MiniFuture) { + MiniFuture mini = (MiniFuture)fut; + + if (mini.futureId() == miniId) { + if (!mini.isDone()) + return mini; + else + return null; + } } } } @@ -237,7 +242,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA req.miniId(fut.futureId()); - add(fut); + add((IgniteInternalFuture)fut); IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = nearEntries ? cctx.tm().txHandler().prepareNearTxLocal(req) : @@ -390,7 +395,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA req.miniId(fut.futureId()); - add(fut); + add((IgniteInternalFuture)fut); try { cctx.io().send(primary, req, tx.ioPolicy()); @@ -421,9 +426,13 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA if (mvccCrd != null) { assert !tx.onePhaseCommit(); - IgniteInternalFuture<Long> cntrFut = cctx.coordinators().requestTxCounter(mvccCrd, tx); + if (mvccCrd.isLocal()) + tx.mvccCoordinatorCounter(cctx.coordinators().requestTxCounterOnCoordinator(tx.nearXidVersion())); + else { + IgniteInternalFuture<Long> cntrFut = cctx.coordinators().requestTxCounter(mvccCrd, tx); - add((IgniteInternalFuture)cntrFut); + add((IgniteInternalFuture)cntrFut); + } } markInitialized(); @@ -457,9 +466,21 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA @Override public String toString() { Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() { @Override public String apply(IgniteInternalFuture<?> f) { - return "[node=" + ((MiniFuture)f).primary().id() + - ", loc=" + ((MiniFuture)f).primary().isLocal() + - ", done=" + f.isDone() + "]"; + if (f instanceof MiniFuture) { + return "[node=" + ((MiniFuture)f).primary().id() + + ", loc=" + ((MiniFuture)f).primary().isLocal() + + ", done=" + f.isDone() + "]"; + } + else if (f instanceof CacheCoordinatorsSharedManager.MvccCounterFuture) { + CacheCoordinatorsSharedManager.MvccCounterFuture crdFut = + (CacheCoordinatorsSharedManager.MvccCounterFuture)f; + + return "[crdNode=" + crdFut.crd.id() + + ", loc=" + crdFut.crd.isLocal() + + ", done=" + f.isDone() + "]"; + } + else + return f.toString(); } }); @@ -509,6 +530,9 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA if (res.error() != null) onError(res.error()); else { + if (res.mvccCoordinatorCounter() != TxMvccVersion.COUNTER_NA) + tx.mvccCoordinatorCounter(res.mvccCoordinatorCounter()); + onPrepareResponse(m, res, updateMapping); onDone(res); http://git-wip-us.apache.org/repos/asf/ignite/blob/d3c04995/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index e093eeb..69598d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -714,6 +714,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit tx.size(), tx.subjectId(), tx.taskNameHash(), + tx.mvccCoordinatorCounter(), tx.activeCachesDeploymentEnabled() ); http://git-wip-us.apache.org/repos/asf/ignite/blob/d3c04995/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java index dc32263..eb6d580 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java @@ -24,6 +24,7 @@ import java.util.UUID; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest; +import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringBuilder; import org.apache.ignite.lang.IgniteUuid; @@ -42,6 +43,9 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest { /** Mini future ID. */ private int miniId; + /** */ + private long mvccCrdCntr = TxMvccVersion.COUNTER_NA; + /** * Empty constructor required for {@link Externalizable}. */ @@ -87,6 +91,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest { int txSize, @Nullable UUID subjId, int taskNameHash, + long mvccCrdCntr, boolean addDepInfo) { super( xidVer, @@ -110,6 +115,15 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest { explicitLock(explicitLock); storeEnabled(storeEnabled); + + this.mvccCrdCntr = mvccCrdCntr; + } + + /** + * @return Counter. + */ + public long mvccCoordinatorCounter() { + return mvccCrdCntr; } /** @@ -177,6 +191,12 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest { writer.incrementState(); + case 22: + if (!writer.writeLong("mvccCrdCntr", mvccCrdCntr)) + return false; + + writer.incrementState(); + } return true; @@ -201,6 +221,14 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); + case 22: + mvccCrdCntr = reader.readLong("mvccCrdCntr"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } return reader.afterMessageRead(GridNearTxFinishRequest.class); @@ -213,7 +241,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 22; + return 23; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/d3c04995/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java index a94d6fc..e8893af 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java @@ -49,7 +49,7 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOO * Common code for tx prepare in optimistic and pessimistic modes. */ public abstract class GridNearTxPrepareFutureAdapter extends - GridCacheCompoundFuture<GridNearTxPrepareResponse, IgniteInternalTx> implements GridCacheMvccFuture<IgniteInternalTx> { + GridCacheCompoundFuture<Object, IgniteInternalTx> implements GridCacheMvccFuture<IgniteInternalTx> { /** Logger reference. */ protected static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); @@ -58,9 +58,9 @@ public abstract class GridNearTxPrepareFutureAdapter extends AtomicReferenceFieldUpdater.newUpdater(GridNearTxPrepareFutureAdapter.class, Throwable.class, "err"); /** */ - private static final IgniteReducer<GridNearTxPrepareResponse, IgniteInternalTx> REDUCER = - new IgniteReducer<GridNearTxPrepareResponse, IgniteInternalTx>() { - @Override public boolean collect(GridNearTxPrepareResponse e) { + private static final IgniteReducer<Object, IgniteInternalTx> REDUCER = + new IgniteReducer<Object, IgniteInternalTx>() { + @Override public boolean collect(Object e) { return true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/d3c04995/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java index 4233371..a23ae4b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java @@ -407,30 +407,36 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse writer.incrementState(); case 15: - if (!writer.writeCollection("ownedValKeys", ownedValKeys, MessageCollectionItemType.MSG)) + if (!writer.writeLong("mvccCrdCntr", mvccCrdCntr)) return false; writer.incrementState(); case 16: - if (!writer.writeCollection("ownedValVals", ownedValVals, MessageCollectionItemType.MSG)) + if (!writer.writeCollection("ownedValKeys", ownedValKeys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 17: - if (!writer.writeCollection("pending", pending, MessageCollectionItemType.MSG)) + if (!writer.writeCollection("ownedValVals", ownedValVals, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 18: - if (!writer.writeMessage("retVal", retVal)) + if (!writer.writeCollection("pending", pending, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 19: + if (!writer.writeMessage("retVal", retVal)) + return false; + + writer.incrementState(); + + case 20: if (!writer.writeMessage("writeVer", writeVer)) return false; @@ -493,7 +499,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse reader.incrementState(); case 15: - ownedValKeys = reader.readCollection("ownedValKeys", MessageCollectionItemType.MSG); + mvccCrdCntr = reader.readLong("mvccCrdCntr"); if (!reader.isLastRead()) return false; @@ -501,7 +507,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse reader.incrementState(); case 16: - ownedValVals = reader.readCollection("ownedValVals", MessageCollectionItemType.MSG); + ownedValKeys = reader.readCollection("ownedValKeys", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -509,7 +515,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse reader.incrementState(); case 17: - pending = reader.readCollection("pending", MessageCollectionItemType.MSG); + ownedValVals = reader.readCollection("ownedValVals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -517,7 +523,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse reader.incrementState(); case 18: - retVal = reader.readMessage("retVal"); + pending = reader.readCollection("pending", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -525,6 +531,14 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse reader.incrementState(); case 19: + retVal = reader.readMessage("retVal"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 20: writeVer = reader.readMessage("writeVer"); if (!reader.isLastRead()) @@ -544,7 +558,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 20; + return 21; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/d3c04995/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java index ec29002..f3287af 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java @@ -375,14 +375,18 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager log.info("Assigned mvcc coordinator [topVer=" + discoCache.version() + ", crd=" + newCrd + ']'); + + return; } } + + assignHist.addAssignment(discoCache.version(), curCrd); } /** * */ - private class MvccCounterFuture extends GridFutureAdapter<Long> { + public class MvccCounterFuture extends GridFutureAdapter<Long> { /** */ private final Long id; @@ -390,7 +394,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager private IgniteInternalTx tx; /** */ - private final ClusterNode crd; + public final ClusterNode crd; /** * @param id Future ID. http://git-wip-us.apache.org/repos/asf/ignite/blob/d3c04995/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index cac1069..1b31d76 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -859,8 +859,11 @@ public class IgniteTxHandler { else tx = ctx.tm().tx(dhtVer); - if (tx != null) + if (tx != null) { + tx.mvccCoordinatorCounter(req.mvccCoordinatorCounter()); + req.txState(tx.txState()); + } if (tx == null && locTx != null && !req.commit()) { U.warn(log, "DHT local tx not found for near local tx rollback " +
