Repository: ignite Updated Branches: refs/heads/ignite-6149 085a32190 -> c6f894817
ignite-6149 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c6f89481 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c6f89481 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c6f89481 Branch: refs/heads/ignite-6149 Commit: c6f894817ef063984cee1ea886313eecc8da3be0 Parents: 085a321 Author: sboikov <[email protected]> Authored: Mon Sep 11 16:51:11 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon Sep 11 17:31:51 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 39 +++++++++----------- .../cache/GridCacheUpdateTxResult.java | 22 +++++------ .../distributed/dht/GridDhtTxFinishFuture.java | 14 ++++++- .../near/GridNearTxFinishFuture.java | 13 +++++++ .../mvcc/CacheCoordinatorsSharedManager.java | 4 +- .../transactions/IgniteTxLocalAdapter.java | 22 ++++++++++- .../processors/cache/GridCacheTestEntryEx.java | 11 +++--- 7 files changed, 80 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c6f89481/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 22754d7..db4b88b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -63,6 +63,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; import org.apache.ignite.internal.processors.dr.GridDrType; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure; +import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.IgniteTree; import org.apache.ignite.internal.util.lang.GridClosureException; import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter; @@ -914,11 +915,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme ) throws IgniteCheckedException, GridCacheEntryRemovedException { CacheObject old; - boolean valid = valid(tx != null ? tx.topologyVersion() : topVer); + final boolean valid = valid(tx != null ? tx.topologyVersion() : topVer); // Lock should be held by now. if (!cctx.isAll(this, filter)) - return new GridCacheUpdateTxResult(false, null); + return new GridCacheUpdateTxResult(false); final GridCacheVersion newVer; @@ -931,6 +932,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme ensureFreeSpace(); + GridLongList mvccWaitTxs = null; + synchronized (this) { checkObsolete(); @@ -939,7 +942,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme // It is possible that 'get' could load more recent value. if (!((GridNearCacheEntry)this).recordDhtVersion(dhtVer)) - return new GridCacheUpdateTxResult(false, null); + return new GridCacheUpdateTxResult(false); } assert tx == null || (!tx.local() && tx.onePhaseCommit()) || tx.ownsLock(this) : @@ -975,7 +978,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme key0 = e.key(); if (interceptorVal == null) - return new GridCacheUpdateTxResult(false, (CacheObject)cctx.unwrapTemporary(old)); + return new GridCacheUpdateTxResult(false); else if (interceptorVal != val0) val0 = cctx.unwrapTemporary(interceptorVal); @@ -1010,7 +1013,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (cctx.mvccEnabled()) { assert mvccVer != null; - cctx.offheap().mvccUpdate(this, val, newVer, mvccVer); + mvccWaitTxs = cctx.offheap().mvccUpdate(this, val, newVer, mvccVer); } else storeValue(val, expireTime, newVer, null); @@ -1080,8 +1083,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (intercept) cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, val, val0, keepBinary, updateCntr0)); - return valid ? new GridCacheUpdateTxResult(true, retval ? old : null, updateCntr0) : - new GridCacheUpdateTxResult(false, null); + return valid ? new GridCacheUpdateTxResult(true, updateCntr0, mvccWaitTxs) : + new GridCacheUpdateTxResult(false); } /** @@ -1119,11 +1122,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme GridCacheVersion newVer; - boolean valid = valid(tx != null ? tx.topologyVersion() : topVer); + final boolean valid = valid(tx != null ? tx.topologyVersion() : topVer); // Lock should be held by now. if (!cctx.isAll(this, filter)) - return new GridCacheUpdateTxResult(false, null); + return new GridCacheUpdateTxResult(false); GridCacheVersion obsoleteVer = null; @@ -1147,7 +1150,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme // It is possible that 'get' could load more recent value. if (!((GridNearCacheEntry)this).recordDhtVersion(dhtVer)) - return new GridCacheUpdateTxResult(false, null); + return new GridCacheUpdateTxResult(false); } assert tx == null || (!tx.local() && tx.onePhaseCommit()) || tx.ownsLock(this) : @@ -1175,7 +1178,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (cctx.cancelRemove(interceptRes)) { CacheObject ret = cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2())); - return new GridCacheUpdateTxResult(false, ret); + return new GridCacheUpdateTxResult(false); } } @@ -1289,18 +1292,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (intercept) cctx.config().getInterceptor().onAfterRemove(entry0); - if (valid) { - CacheObject ret; - - if (interceptRes != null) - ret = cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2())); - else - ret = old; - - return new GridCacheUpdateTxResult(true, ret, updateCntr0); - } + if (valid) + return new GridCacheUpdateTxResult(true, updateCntr0, null); else - return new GridCacheUpdateTxResult(false, null); + return new GridCacheUpdateTxResult(false); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/c6f89481/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java index 461baa7..951d02c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java @@ -17,7 +17,7 @@ package org.apache.ignite.internal.processors.cache; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.Nullable; @@ -28,34 +28,30 @@ public class GridCacheUpdateTxResult { /** Success flag.*/ private final boolean success; - /** Old value. */ - @GridToStringInclude - private final CacheObject oldVal; - /** Partition idx. */ private long updateCntr; + /** */ + private GridLongList mvccWaitTxs; + /** * Constructor. * * @param success Success flag. - * @param oldVal Old value (if any), */ - GridCacheUpdateTxResult(boolean success, @Nullable CacheObject oldVal) { + GridCacheUpdateTxResult(boolean success) { this.success = success; - this.oldVal = oldVal; } /** * Constructor. * * @param success Success flag. - * @param oldVal Old value (if any), */ - GridCacheUpdateTxResult(boolean success, @Nullable CacheObject oldVal, long updateCntr) { + GridCacheUpdateTxResult(boolean success, long updateCntr, @Nullable GridLongList mvccWaitTxs) { this.success = success; - this.oldVal = oldVal; this.updateCntr = updateCntr; + this.mvccWaitTxs = mvccWaitTxs; } /** @@ -75,8 +71,8 @@ public class GridCacheUpdateTxResult { /** * @return Old value. */ - @Nullable public CacheObject oldValue() { - return oldVal; + @Nullable public GridLongList mvccWaitTransactions() { + return mvccWaitTxs; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/c6f89481/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index 55078cd..6858c82 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -35,10 +35,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFutu import org.apache.ignite.internal.processors.cache.GridCacheFuture; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; -import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -292,6 +292,18 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity // No backup or near nodes to send commit message to (just complete then). sync = false; + GridLongList waitTxs = tx.mvccWaitTransactions(); + + if (waitTxs != null) { + ClusterNode crd = cctx.coordinators().coordinator(tx.topologyVersion()); + + assert crd != null; + + IgniteInternalFuture fut = cctx.coordinators().waitTxsFuture(crd, waitTxs); + + add(fut); + } + markInitialized(); if (!sync) http://git-wip-us.apache.org/repos/asf/ignite/blob/c6f89481/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index 7a90ec4..e57976b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -47,6 +47,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; +import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C1; @@ -414,6 +415,18 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit try { if (tx.localFinish(commit) || (!commit && tx.state() == UNKNOWN)) { + GridLongList waitTxs = tx.mvccWaitTransactions(); + + if (waitTxs != null) { + ClusterNode crd = cctx.coordinators().coordinator(tx.topologyVersion()); + + assert crd != null; + + IgniteInternalFuture fut = cctx.coordinators().waitTxsFuture(crd, waitTxs); + + add(fut); + } + if ((tx.onePhaseCommit() && needFinishOnePhase(commit)) || (!tx.onePhaseCommit() && mappings != null)) { if (mappings.single()) { GridDistributedTxMapping mapping = mappings.singleMapping(); http://git-wip-us.apache.org/repos/asf/ignite/blob/c6f89481/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java index 7034aca..d19af59 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java @@ -388,7 +388,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager * @param nodeId Sender node ID. * @param msg Message. */ - private void processCoordinatorTxAckResponse(UUID nodeId, CoordinatorFutureResponse msg) { + private void processCoordinatorAckResponse(UUID nodeId, CoordinatorFutureResponse msg) { WaitAckFuture fut = ackFuts.remove(msg.futureId()); if (fut != null) @@ -706,7 +706,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager else if (msg instanceof CoordinatorTxAckRequest) processCoordinatorTxAckRequest(nodeId, (CoordinatorTxAckRequest)msg); else if (msg instanceof CoordinatorFutureResponse) - processCoordinatorTxAckResponse(nodeId, (CoordinatorFutureResponse)msg); + processCoordinatorAckResponse(nodeId, (CoordinatorFutureResponse)msg); else if (msg instanceof CoordinatorQueryAckRequest) processCoordinatorQueryAckRequest((CoordinatorQueryAckRequest)msg); else if (msg instanceof CoordinatorQueryVersionRequest) http://git-wip-us.apache.org/repos/asf/ignite/blob/c6f89481/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index f785e2b..1b386d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -61,6 +61,7 @@ import org.apache.ignite.internal.processors.dr.GridDrType; import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; +import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.lang.GridClosureException; import org.apache.ignite.internal.util.lang.GridTuple; @@ -148,6 +149,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig /** */ protected CacheWriteSynchronizationMode syncMode; + /** */ + private GridLongList mvccWaitTxs; + /** * Empty constructor required for {@link Externalizable}. */ @@ -208,6 +212,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig txState = implicitSingle ? new IgniteTxImplicitSingleStateImpl() : new IgniteTxStateImpl(); } + public GridLongList mvccWaitTransactions() { + return mvccWaitTxs; + } + /** * @return Transaction write synchronization mode. */ @@ -472,7 +480,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig /** {@inheritDoc} */ @SuppressWarnings({"CatchGenericClass"}) - @Override public void userCommit() throws IgniteCheckedException { + @Override public final void userCommit() throws IgniteCheckedException { TransactionState state = state(); if (state != COMMITTING) { @@ -689,9 +697,19 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig null, mvccVer); - if (updRes.success()) + if (updRes.success()) { txEntry.updateCounter(updRes.updatePartitionCounter()); + GridLongList waitTxs = updRes.mvccWaitTransactions(); + + if (waitTxs != null) { + if (this.mvccWaitTxs == null) + this.mvccWaitTxs = waitTxs; + else + this.mvccWaitTxs.addAll(waitTxs); + } + } + if (nearCached != null && updRes.success()) { nearCached.innerSet( null, http://git-wip-us.apache.org/repos/asf/ignite/blob/c6f89481/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index 3afbb35..f5309e5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -463,8 +463,11 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr @Nullable Long updateCntr, MvccCoordinatorVersion mvccVer ) - throws IgniteCheckedException, GridCacheEntryRemovedException { - return new GridCacheUpdateTxResult(true, rawPut(val, ttl)); + throws IgniteCheckedException, GridCacheEntryRemovedException + { + rawPut(val, ttl); + + return new GridCacheUpdateTxResult(true); } /** {@inheritDoc} */ @@ -547,11 +550,9 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr ) throws IgniteCheckedException, GridCacheEntryRemovedException { obsoleteVer = ver; - CacheObject old = val; - val = null; - return new GridCacheUpdateTxResult(true, old); + return new GridCacheUpdateTxResult(true); } /** @inheritDoc */
