pending
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7301ada3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7301ada3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7301ada3 Branch: refs/heads/ignite-10537 Commit: 7301ada36bb64ad6c2050687ed14d1e740fbb677 Parents: 24a50e1 Author: Igor Seliverstov <[email protected]> Authored: Fri Dec 7 12:21:01 2018 +0300 Committer: Igor Seliverstov <[email protected]> Committed: Fri Dec 7 12:21:01 2018 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 10 +- .../processors/cache/GridCacheAdapter.java | 10 +- .../processors/cache/GridCacheEntryEx.java | 13 +- .../processors/cache/GridCacheMapEntry.java | 101 +++-------- .../cache/GridCacheUpdateTxResult.java | 26 --- .../GridDistributedTxRemoteAdapter.java | 9 +- .../dht/GridDhtTransactionalCacheAdapter.java | 3 +- .../distributed/dht/GridDhtTxFinishFuture.java | 18 -- .../distributed/dht/GridDhtTxPrepareFuture.java | 6 +- .../dht/GridPartitionedGetFuture.java | 6 +- .../dht/GridPartitionedSingleGetFuture.java | 6 +- .../dht/atomic/GridDhtAtomicCache.java | 13 +- .../dht/colocated/GridDhtColocatedCache.java | 8 +- .../near/AckCoordinatorOnRollback.java | 6 +- .../distributed/near/GridNearGetFuture.java | 12 +- .../near/GridNearTxFinishAndAckFuture.java | 7 +- .../near/GridNearTxFinishFuture.java | 24 +-- .../cache/distributed/near/GridNearTxLocal.java | 50 +----- .../local/atomic/GridLocalAtomicCache.java | 13 +- .../processors/cache/mvcc/MvccProcessor.java | 22 --- .../cache/mvcc/MvccProcessorImpl.java | 180 +------------------ .../processors/cache/mvcc/MvccQueryTracker.java | 9 - .../cache/mvcc/MvccQueryTrackerImpl.java | 78 ++++---- .../processors/cache/mvcc/MvccUtils.java | 2 +- .../cache/mvcc/StaticMvccQueryTracker.java | 5 - .../cache/mvcc/msg/MvccWaitTxsRequest.java | 159 ---------------- .../cache/transactions/IgniteTxAdapter.java | 7 +- .../cache/transactions/IgniteTxHandler.java | 3 +- .../transactions/IgniteTxLocalAdapter.java | 51 +----- .../processors/cache/GridCacheTestEntryEx.java | 13 +- .../cache/mvcc/CacheMvccAbstractTest.java | 3 +- 31 files changed, 135 insertions(+), 738 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7301ada3/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 3f4eb18..7d8f314 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -34,6 +34,8 @@ import org.apache.ignite.internal.managers.checkpoint.GridCheckpointRequest; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; import org.apache.ignite.internal.managers.deployment.GridDeploymentRequest; import org.apache.ignite.internal.managers.deployment.GridDeploymentResponse; +import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyRequest; +import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyResponse; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.authentication.UserAuthenticateRequestMessage; @@ -46,8 +48,6 @@ import org.apache.ignite.internal.processors.cache.CacheEvictionEntry; import org.apache.ignite.internal.processors.cache.CacheInvokeDirectResult; import org.apache.ignite.internal.processors.cache.CacheObjectByteArrayImpl; import org.apache.ignite.internal.processors.cache.CacheObjectImpl; -import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyRequest; -import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyResponse; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheMvccEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheReturn; @@ -137,7 +137,6 @@ import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccQuerySnapshotReq import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccRecoveryFinishedMessage; import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccSnapshotResponse; import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccTxSnapshotRequest; -import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccWaitTxsRequest; import org.apache.ignite.internal.processors.cache.mvcc.msg.PartitionCountersNeighborcastRequest; import org.apache.ignite.internal.processors.cache.mvcc.msg.PartitionCountersNeighborcastResponse; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest; @@ -989,11 +988,6 @@ public class GridIoMessageFactory implements MessageFactory { break; - case 142: - msg = new MvccWaitTxsRequest(); - - break; - case 143: msg = new GridCacheMvccEntryInfo(); http://git-wip-us.apache.org/repos/asf/ignite/blob/7301ada3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 465f1f1..1cd94d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -87,9 +87,9 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityImpl; import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; -import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; @@ -2005,7 +2005,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V final boolean storeEnabled = !skipVals && readThrough && ctx.readThrough(); - boolean readNoEntry = ctx.readNoEntry(expiry, readerArgs != null); + boolean readNoEntry = ctx.mvccEnabled() || ctx.readNoEntry(expiry, readerArgs != null); for (KeyCacheObject key : keys) { while (true) { @@ -2086,7 +2086,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V taskName, expiry, !deserializeBinary, - mvccSnapshot, readerArgs); assert res != null; @@ -2111,7 +2110,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V taskName, expiry, !deserializeBinary, - mvccSnapshot, readerArgs); if (res == null) @@ -4996,8 +4994,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /*transformClo*/null, /*taskName*/null, /*expiryPlc*/null, - !deserializeBinary, - null); // TODO IGNITE-7371 + !deserializeBinary + ); if (val == null) return null; http://git-wip-us.apache.org/repos/asf/ignite/blob/7301ada3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index 319c134..62360d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -272,7 +272,7 @@ public interface GridCacheEntryEx { * @throws IgniteCheckedException If loading value failed. * @throws GridCacheEntryRemovedException If entry was removed. */ - @Nullable public CacheObject innerGet(@Nullable GridCacheVersion ver, + public CacheObject innerGet(@Nullable GridCacheVersion ver, @Nullable IgniteInternalTx tx, boolean readThrough, boolean updateMetrics, @@ -281,8 +281,7 @@ public interface GridCacheEntryEx { Object transformClo, String taskName, @Nullable IgniteCacheExpiryPolicy expiryPlc, - boolean keepBinary, - @Nullable MvccSnapshot mvccVer) + boolean keepBinary) throws IgniteCheckedException, GridCacheEntryRemovedException; /** @@ -310,7 +309,6 @@ public interface GridCacheEntryEx { String taskName, @Nullable IgniteCacheExpiryPolicy expiryPlc, boolean keepBinary, - @Nullable MvccSnapshot mvccVer, @Nullable ReaderArguments readerArgs) throws IgniteCheckedException, GridCacheEntryRemovedException; @@ -332,7 +330,6 @@ public interface GridCacheEntryEx { String taskName, @Nullable IgniteCacheExpiryPolicy expiryPlc, boolean keepBinary, - @Nullable MvccSnapshot mvccVer, @Nullable ReaderArguments readerArgs) throws IgniteCheckedException, GridCacheEntryRemovedException; /** @@ -467,8 +464,7 @@ public interface GridCacheEntryEx { @Nullable UUID subjId, String taskName, @Nullable GridCacheVersion dhtVer, - @Nullable Long updateCntr, - @Nullable MvccSnapshot mvccVer + @Nullable Long updateCntr ) throws IgniteCheckedException, GridCacheEntryRemovedException; /** @@ -510,8 +506,7 @@ public interface GridCacheEntryEx { @Nullable UUID subjId, String taskName, @Nullable GridCacheVersion dhtVer, - @Nullable Long updateCntr, - MvccSnapshot mvccVer + @Nullable Long updateCntr ) throws IgniteCheckedException, GridCacheEntryRemovedException; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/7301ada3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index bbdff35..da3f61a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -80,7 +80,6 @@ import org.apache.ignite.internal.processors.dr.GridDrType; import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheFilter; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure; -import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.IgniteTree; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridClosureException; @@ -622,7 +621,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } /** {@inheritDoc} */ - @Nullable @Override public final CacheObject innerGet( + @Override public final CacheObject innerGet( @Nullable GridCacheVersion ver, @Nullable IgniteInternalTx tx, boolean readThrough, @@ -632,8 +631,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme Object transformClo, String taskName, @Nullable IgniteCacheExpiryPolicy expirePlc, - boolean keepBinary, - MvccSnapshot mvccVer) + boolean keepBinary) throws IgniteCheckedException, GridCacheEntryRemovedException { return (CacheObject)innerGet0( ver, @@ -648,7 +646,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme false, keepBinary, false, - mvccVer, null); } @@ -659,7 +656,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme String taskName, @Nullable IgniteCacheExpiryPolicy expiryPlc, boolean keepBinary, - MvccSnapshot mvccVer, @Nullable ReaderArguments readerArgs) throws IgniteCheckedException, GridCacheEntryRemovedException { return (EntryGetResult)innerGet0( /*ver*/null, @@ -674,7 +670,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme true, keepBinary, /*reserve*/true, - mvccVer, readerArgs); } @@ -689,7 +684,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme String taskName, @Nullable IgniteCacheExpiryPolicy expiryPlc, boolean keepBinary, - MvccSnapshot mvccVer, @Nullable ReaderArguments readerArgs) throws IgniteCheckedException, GridCacheEntryRemovedException { return (EntryGetResult)innerGet0( @@ -705,7 +699,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme true, keepBinary, false, - mvccVer, readerArgs); } @@ -724,7 +717,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme boolean retVer, boolean keepBinary, boolean reserveForLoad, - @Nullable MvccSnapshot mvccVer, @Nullable ReaderArguments readerArgs ) throws IgniteCheckedException, GridCacheEntryRemovedException { assert !(retVer && readThrough); @@ -750,51 +742,39 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CacheObject val; - if (mvccVer != null) { - CacheDataRow row = cctx.offheap().mvccRead(cctx, key, mvccVer); - - if (row != null) { - val = row.value(); - resVer = row.version(); - } - else - val = null; - } - else { - boolean valid = valid(tx != null ? tx.topologyVersion() : cctx.affinity().affinityTopologyVersion()); + boolean valid = valid(tx != null ? tx.topologyVersion() : cctx.affinity().affinityTopologyVersion()); - if (valid) { - val = this.val; + if (valid) { + val = this.val; - if (val == null) { - if (isStartVersion()) { - unswap(null, false); + if (val == null) { + if (isStartVersion()) { + unswap(null, false); - val = this.val; - } + val = this.val; } + } - if (val != null) { - long expireTime = expireTimeExtras(); + if (val != null) { + long expireTime = expireTimeExtras(); - if (expireTime > 0 && (expireTime < U.currentTimeMillis())) { - if (onExpired((CacheObject)cctx.unwrapTemporary(val), null)) { - val = null; - evt = false; + if (expireTime > 0 && (expireTime < U.currentTimeMillis())) { + if (onExpired((CacheObject)cctx.unwrapTemporary(val), null)) { + val = null; + evt = false; - if (cctx.deferredDelete()) { - deferred = true; - ver0 = ver; - } - else - obsolete = true; + if (cctx.deferredDelete()) { + deferred = true; + ver0 = ver; } + else + obsolete = true; } } } - else - val = null; } + else + val = null; CacheObject ret = val; @@ -1460,8 +1440,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme @Nullable UUID subjId, String taskName, @Nullable GridCacheVersion dhtVer, - @Nullable Long updateCntr, - @Nullable MvccSnapshot mvccVer + @Nullable Long updateCntr ) throws IgniteCheckedException, GridCacheEntryRemovedException { CacheObject old; @@ -1483,8 +1462,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme ensureFreeSpace(); - GridLongList mvccWaitTxs = null; - lockListenerReadLock(); lockEntry(); @@ -1565,18 +1542,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme assert val != null; - if (cctx.mvccEnabled()) { - assert mvccVer != null; - - mvccWaitTxs = cctx.offheap().mvccUpdateNative(tx.local(), - this, - val, - newVer, - expireTime, - mvccVer); - } - else - storeValue(val, expireTime, newVer); + storeValue(val, expireTime, newVer); if (cctx.deferredDelete() && deletedUnlocked() && !isInternal() && !detached()) deletedUnlocked(false); @@ -1655,7 +1621,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (intercept) cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, val, val0, keepBinary, updateCntr0)); - return valid ? new GridCacheUpdateTxResult(true, updateCntr0, logPtr, mvccWaitTxs) : + return valid ? new GridCacheUpdateTxResult(true, updateCntr0, logPtr) : new GridCacheUpdateTxResult(false, logPtr); } @@ -1685,8 +1651,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme @Nullable UUID subjId, String taskName, @Nullable GridCacheVersion dhtVer, - @Nullable Long updateCntr, - @Nullable MvccSnapshot mvccVer + @Nullable Long updateCntr ) throws IgniteCheckedException, GridCacheEntryRemovedException { assert cctx.transactional(); @@ -1716,8 +1681,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme boolean marked = false; - GridLongList mvccWaitTxs = null; - lockListenerReadLock(); lockEntry(); @@ -1762,13 +1725,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } } - if (cctx.mvccEnabled()) { - assert mvccVer != null; - - mvccWaitTxs = cctx.offheap().mvccRemoveNative(tx.local(), this, mvccVer); - } - else - removeValue(); + removeValue(); update(null, 0, 0, newVer, true); @@ -1889,7 +1846,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme cctx.config().getInterceptor().onAfterRemove(entry0); if (valid) - return new GridCacheUpdateTxResult(true, updateCntr0, logPtr, mvccWaitTxs); + return new GridCacheUpdateTxResult(true, updateCntr0, logPtr); else return new GridCacheUpdateTxResult(false, logPtr); } http://git-wip-us.apache.org/repos/asf/ignite/blob/7301ada3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java index 8a68100..7df27d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java @@ -21,7 +21,6 @@ import java.util.List; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.processors.cache.tree.mvcc.search.MvccLinkAwareSearchRow; -import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.Nullable; @@ -37,9 +36,6 @@ public class GridCacheUpdateTxResult { private long updateCntr; /** */ - private GridLongList mvccWaitTxs; - - /** */ private GridFutureAdapter<GridCacheUpdateTxResult> fut; /** */ @@ -108,21 +104,6 @@ public class GridCacheUpdateTxResult { } /** - * Constructor. - * - * @param success Success flag. - * @param updateCntr Update counter. - * @param logPtr Logger WAL pointer for the update. - * @param mvccWaitTxs List of transactions to wait for completion. - */ - GridCacheUpdateTxResult(boolean success, long updateCntr, WALPointer logPtr, GridLongList mvccWaitTxs) { - this.success = success; - this.updateCntr = updateCntr; - this.logPtr = logPtr; - this.mvccWaitTxs = mvccWaitTxs; - } - - /** * @return Partition update counter. */ public long updateCounter() { @@ -151,13 +132,6 @@ public class GridCacheUpdateTxResult { } /** - * @return List of transactions to wait for completion. - */ - @Nullable public GridLongList mvccWaitTransactions() { - return mvccWaitTxs; - } - - /** * @return Mvcc history rows. */ @Nullable public List<MvccLinkAwareSearchRow> mvccHistory() { http://git-wip-us.apache.org/repos/asf/ignite/blob/7301ada3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index a915478..94ab814 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -628,8 +628,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter CU.subjectId(this, cctx), resolveTaskName(), dhtVer, - txEntry.updateCounter(), - mvccSnapshot()); + txEntry.updateCounter()); else { assert val != null : txEntry; @@ -653,8 +652,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter CU.subjectId(this, cctx), resolveTaskName(), dhtVer, - txEntry.updateCounter(), - mvccSnapshot()); + txEntry.updateCounter()); txEntry.updateCounter(updRes.updateCounter()); @@ -691,8 +689,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter CU.subjectId(this, cctx), resolveTaskName(), dhtVer, - txEntry.updateCounter(), - mvccSnapshot()); + txEntry.updateCounter()); txEntry.updateCounter(updRes.updateCounter()); http://git-wip-us.apache.org/repos/asf/ignite/blob/7301ada3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index dcd7be5..cb66e74 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -1437,8 +1437,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach null, tx != null ? tx.resolveTaskName() : null, null, - req.keepBinary(), - null); // TODO IGNITE-7371 + req.keepBinary()); } assert e.lockedBy(mappedVer) || ctx.mvcc().isRemoved(e.context(), mappedVer) : http://git-wip-us.apache.org/repos/asf/ignite/blob/7301ada3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index a9c133a..a5cbbdf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -35,13 +35,11 @@ import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFutu import org.apache.ignite.internal.processors.cache.GridCacheFuture; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; -import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; import org.apache.ignite.internal.processors.cache.mvcc.MvccFuture; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -299,22 +297,6 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity // No backup or near nodes to send commit message to (just complete then). sync = false; - GridLongList waitTxs = tx.mvccWaitTransactions(); - - if (waitTxs != null) { - MvccSnapshot snapshot = tx.mvccSnapshot(); - - assert snapshot != null; - - MvccCoordinator crd = cctx.coordinators().currentCoordinator(); - - if (crd != null && crd.coordinatorVersion() == snapshot.coordinatorVersion()) { - add((IgniteInternalFuture)cctx.coordinators().waitTxsFuture(crd.nodeId(), waitTxs)); - - sync = true; - } - } - markInitialized(); if (!sync) http://git-wip-us.apache.org/repos/asf/ignite/blob/7301ada3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 58edc9d..1933efa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -410,8 +410,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite entryProc, tx.resolveTaskName(), null, - keepBinary, - null); // TODO IGNITE-7371 + keepBinary); if (retVal || txEntry.op() == TRANSFORM) { if (!F.isEmpty(txEntry.entryProcessors())) { @@ -524,8 +523,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite /*transformClo*/null, /*taskName*/null, /*expiryPlc*/null, - /*keepBinary*/true, - null); // TODO IGNITE-7371 + /*keepBinary*/true); } if (oldVal != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/7301ada3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 0629754..d38903d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -470,7 +470,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda GridDhtCacheAdapter<K, V> cache = cache(); - boolean readNoEntry = cctx.readNoEntry(expiryPlc, false); + boolean readNoEntry = cctx.mvccEnabled() || cctx.readNoEntry(expiryPlc, false); boolean evt = !skipVals; while (true) { @@ -528,7 +528,6 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda taskName, expiryPlc, !deserializeBinary, - mvccSnapshot(), null); if (getRes != null) { @@ -547,8 +546,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda null, taskName, expiryPlc, - !deserializeBinary, - mvccSnapshot()); + !deserializeBinary); } entry.touch(topVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/7301ada3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index 0d69485..ab7e911 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -429,7 +429,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec GridDhtCacheAdapter colocated = cctx.dht(); - boolean readNoEntry = cctx.readNoEntry(expiryPlc, false); + boolean readNoEntry = cctx.mvccEnabled() || cctx.readNoEntry(expiryPlc, false); boolean evt = !skipVals; while (true) { @@ -486,7 +486,6 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec taskName, expiryPlc, true, - mvccSnapshot, null); if (res != null) { @@ -505,8 +504,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec null, taskName, expiryPlc, - true, - mvccSnapshot); + true); } entry.touch(topVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/7301ada3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 6118fbb..516b9cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1534,7 +1534,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { taskName, expiry, true, - null, null); if (getRes != null) { @@ -1553,8 +1552,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, taskName, expiry, - !deserializeBinary, - null); + !deserializeBinary); } // Entry was not in memory or in swap, so we remove it from cache. @@ -2125,8 +2123,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { entryProcessor, taskName, null, - req.keepBinary(), - null); + req.keepBinary()); Object oldVal = null; Object updatedVal = null; @@ -2306,8 +2303,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, taskName, null, - req.keepBinary(), - null); + req.keepBinary()); Object val = ctx.config().getInterceptor().onBeforePut( new CacheLazyEntry( @@ -2352,8 +2348,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, taskName, null, - req.keepBinary(), - null); + req.keepBinary()); IgniteBiTuple<Boolean, ?> interceptorRes = ctx.config().getInterceptor() .onBeforeRemove(new CacheLazyEntry(ctx, entry.key(), old, req.keepBinary())); http://git-wip-us.apache.org/repos/asf/ignite/blob/7301ada3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 8fc353c..7546144 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -48,11 +48,11 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedUn import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtEmbeddedFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFinishedFuture; -import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse; @@ -524,7 +524,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte Map<K, V> locVals = null; boolean success = true; - boolean readNoEntry = ctx.readNoEntry(expiryPlc, false); + boolean readNoEntry = ctx.mvccEnabled() || ctx.readNoEntry(expiryPlc, false); boolean evt = !skipVals; for (KeyCacheObject key : keys) { @@ -595,7 +595,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte taskName, expiryPlc, !deserializeBinary, - mvccSnapshot, null); if (getRes != null) { @@ -614,8 +613,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte null, taskName, expiryPlc, - !deserializeBinary, - mvccSnapshot); + !deserializeBinary); } // Entry was not in memory or in swap, so we remove it from cache. http://git-wip-us.apache.org/repos/asf/ignite/blob/7301ada3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/AckCoordinatorOnRollback.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/AckCoordinatorOnRollback.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/AckCoordinatorOnRollback.java index 1648da9..07ee5d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/AckCoordinatorOnRollback.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/AckCoordinatorOnRollback.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.util.typedef.CIX1; @@ -43,12 +42,9 @@ public class AckCoordinatorOnRollback extends CIX1<IgniteInternalFuture<IgniteIn @Override public void applyx(IgniteInternalFuture<IgniteInternalTx> fut) throws IgniteCheckedException { assert fut.isDone(); - MvccQueryTracker tracker = tx.mvccQueryTracker(); MvccSnapshot mvccSnapshot = tx.mvccSnapshot(); - if (tracker != null) // Optimistic tx. - tracker.onDone(tx, false); - else if (mvccSnapshot != null)// Pessimistic tx. + if (mvccSnapshot != null)// Pessimistic tx. tx.context().coordinators().ackTxRollback(mvccSnapshot); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7301ada3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index 09445f7..adacc81 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -361,8 +361,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap taskName, expiryPlc, !deserializeBinary, - null, - null); // TODO IGNITE-7371 + null); if (res != null) { v = res.value(); @@ -380,8 +379,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap null, taskName, expiryPlc, - !deserializeBinary, - null); // TODO IGNITE-7371 + !deserializeBinary); } } @@ -502,8 +500,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap taskName, expiryPlc, !deserializeBinary, - null, - null); // TODO IGNITE-7371 + null); if (res != null) { v = res.value(); @@ -521,8 +518,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap null, taskName, expiryPlc, - !deserializeBinary, - null); // TODO IGNITE-7371 + !deserializeBinary); } // Entry was not in memory or in swap, so we remove it from cache. http://git-wip-us.apache.org/repos/asf/ignite/blob/7301ada3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java index a3a5cdb..22e9aee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -62,13 +61,9 @@ public class GridNearTxFinishAndAckFuture extends GridFutureAdapter<IgniteIntern IgniteInternalFuture<Void> ackFut = null; - MvccQueryTracker tracker = tx.mvccQueryTracker(); - MvccSnapshot mvccSnapshot = tx.mvccSnapshot(); - if (tracker != null) - ackFut = tracker.onDone(tx, commit); - else if (mvccSnapshot != null) { + if (mvccSnapshot != null) { if (commit) ackFut = tx.context().coordinators().ackTxCommit(mvccSnapshot); else http://git-wip-us.apache.org/repos/asf/ignite/blob/7301ada3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index 20d9e50..8484503 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -40,13 +40,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheVersionedFuture; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse; -import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; import org.apache.ignite.internal.processors.cache.mvcc.MvccFuture; -import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; -import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C1; @@ -442,23 +439,6 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit private void doFinish(boolean commit, boolean clearThreadMap) { try { if (tx.localFinish(commit, clearThreadMap) || (!commit && tx.state() == UNKNOWN)) { - GridLongList waitTxs = tx.mvccWaitTransactions(); - - if (waitTxs != null) { - MvccSnapshot snapshot = tx.mvccSnapshot(); - - MvccCoordinator crd = cctx.coordinators().currentCoordinator(); - - assert snapshot != null; - - if (snapshot.coordinatorVersion() == crd.coordinatorVersion()) { - IgniteInternalFuture fut = cctx.coordinators() - .waitTxsFuture(cctx.coordinators().currentCoordinatorId(), waitTxs); - - add(fut); - } - } - // Cleanup transaction if heuristic failure. if (tx.state() == UNKNOWN) cctx.tm().rollbackTx(tx, clearThreadMap, false); @@ -468,13 +448,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit GridDistributedTxMapping mapping = mappings.singleMapping(); if (mapping != null) { - assert !hasFutures() || isDone() || waitTxs != null : futures(); + assert !hasFutures() || isDone() : futures(); finish(1, mapping, commit, !clearThreadMap); } } else { - assert !hasFutures() || isDone() || waitTxs != null : futures(); + assert !hasFutures() || isDone() : futures(); finish(mappings.mappings(), commit, !clearThreadMap); } http://git-wip-us.apache.org/repos/asf/ignite/blob/7301ada3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 9f1f86d..01c4970 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -62,8 +62,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrep import org.apache.ignite.internal.processors.cache.distributed.dht.GridInvokeValue; import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; -import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker; -import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; @@ -190,9 +188,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou /** Tx label. */ @Nullable private String lb; - /** */ - private MvccQueryTracker mvccTracker; - /** Whether this is Mvcc transaction or not.<p> * {@code null} means there haven't been any calls made on this transaction, and first operation will give this * field actual value. @@ -266,13 +261,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou trackTimeout = timeout() > 0 && !implicit() && cctx.time().addTimeoutObject(this); } - /** - * @return Mvcc query version tracker. - */ - public MvccQueryTracker mvccQueryTracker() { - return mvccTracker; - } - /** {@inheritDoc} */ @Override public boolean near() { return true; @@ -1396,7 +1384,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou resolveTaskName(), null, keepBinary, - null, // TODO IGNITE-7371 null) : null; if (res != null) { @@ -1415,8 +1402,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou entryProcessor, resolveTaskName(), null, - keepBinary, - null); // TODO IGNITE-7371 + keepBinary); } } catch (ClusterTopologyCheckedException e) { @@ -1969,17 +1955,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou } /** - * @param cctx Cache context. - * @return Mvcc snapshot for read inside tx (initialized once for OPTIMISTIC SERIALIZABLE and REPEATABLE_READ txs). - */ - private MvccSnapshot mvccReadSnapshot(GridCacheContext cctx) { - if (!cctx.mvccEnabled() || mvccTracker == null) - return null; - - return mvccTracker.snapshot(); - } - - /** * @param cacheCtx Cache context. * @param cacheIds Involved cache ids. * @param parts Partitions. @@ -2292,7 +2267,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou resolveTaskName(), null, txEntry.keepBinary(), - null, // TODO IGNITE-7371 null); if (getRes != null) { @@ -2311,8 +2285,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou transformClo, resolveTaskName(), null, - txEntry.keepBinary(), - null); // TODO IGNITE-7371 + txEntry.keepBinary()); } // If value is in cache and passed the filter. @@ -2592,8 +2565,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou resolveTaskName(), null, txEntry.keepBinary(), - null, - null); // TODO IGNITE-7371 + null); if (getRes != null) { val = getRes.value(); @@ -2611,8 +2583,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou transformClo, resolveTaskName(), null, - txEntry.keepBinary(), - null); // TODO IGNITE-7371 + txEntry.keepBinary()); } if (val != null) { @@ -2680,7 +2651,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou resolveTaskName(), accessPlc, !deserializeBinary, - mvccReadSnapshot(cacheCtx), // TODO IGNITE-7371 null) : null; if (getRes != null) { @@ -2699,8 +2669,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou null, resolveTaskName(), accessPlc, - !deserializeBinary, - mvccReadSnapshot(cacheCtx)); // TODO IGNITE-7371 + !deserializeBinary); } if (val != null) { @@ -3039,7 +3008,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou needVer, /*keepCacheObject*/true, recovery, - mvccReadSnapshot(cacheCtx), + null, label() ).chain(new C1<IgniteInternalFuture<Object>, Void>() { @Override public Void apply(IgniteInternalFuture<Object> f) { @@ -3073,7 +3042,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou needVer, /*keepCacheObject*/true, label(), - mvccReadSnapshot(cacheCtx) + null ).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Void>() { @Override public Void apply(IgniteInternalFuture<Map<Object, Object>> f) { try { @@ -3170,8 +3139,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou resolveTaskName(), expiryPlc0, txEntry == null ? keepBinary : txEntry.keepBinary(), - null, - null); // TODO IGNITE-7371 + null); if (res == null) { if (misses == null) @@ -3883,7 +3851,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou NearTxFinishFuture fut = fast ? new GridNearTxFastFinishFuture(this, commit) : new GridNearTxFinishFuture<>(cctx, this, commit); - if (mvccQueryTracker() != null || mvccSnapshot != null || txState.mvccEnabled()) { + if (mvccSnapshot != null || txState.mvccEnabled()) { if (commit) fut = new GridNearTxFinishAndAckFuture(fut); else http://git-wip-us.apache.org/repos/asf/ignite/blob/7301ada3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index 919f6d2..617a24d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -462,7 +462,6 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { taskName, expiry, !deserializeBinary, - null, null); if (res != null) { @@ -490,8 +489,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { null, taskName, expiry, - !deserializeBinary, - null); + !deserializeBinary); if (v != null) { ctx.addResult(vals, @@ -1099,8 +1097,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { entryProcessor, taskName, null, - keepBinary, - null); + keepBinary); Object oldVal = null; @@ -1242,8 +1239,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { null, taskName, null, - keepBinary, - null); + keepBinary); Object interceptorVal = ctx.config().getInterceptor().onBeforePut(new CacheLazyEntry( ctx, entry.key(), old, keepBinary), val); @@ -1278,8 +1274,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { null, taskName, null, - keepBinary, - null); + keepBinary); IgniteBiTuple<Boolean, ?> interceptorRes = ctx.config().getInterceptor() .onBeforeRemove(new CacheLazyEntry(ctx, entry.key(), old, keepBinary)); http://git-wip-us.apache.org/repos/asf/ignite/blob/7301ada3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java index fd45c7a..161342f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java @@ -173,38 +173,16 @@ public interface MvccProcessor extends GridProcessor { /** * @param updateVer Transaction update version. - * @param readSnapshot Transaction read version. - * @param qryId Query tracker id. - * @return Acknowledge future. - */ - IgniteInternalFuture<Void> ackTxCommit(MvccVersion updateVer, MvccSnapshot readSnapshot, long qryId); - - /** - * @param updateVer Transaction update version. */ void ackTxRollback(MvccVersion updateVer); /** - * @param updateVer Transaction update version. - * @param readSnapshot Transaction read version. - * @param qryTrackerId Query tracker id. - */ - void ackTxRollback(MvccVersion updateVer, MvccSnapshot readSnapshot, long qryTrackerId); - - /** * @param snapshot Query version. * @param qryId Query tracker ID. */ void ackQueryDone(MvccSnapshot snapshot, long qryId); /** - * @param crdId Coordinator ID. - * @param txs Transaction IDs. - * @return Future. - */ - IgniteInternalFuture<Void> waitTxsFuture(UUID crdId, GridLongList txs); - - /** * @param log Logger. * @param diagCtx Diagnostic request. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7301ada3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java index 7f386d8..d026607 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java @@ -65,8 +65,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLo import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestQueryCntr; import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestQueryId; import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestTx; -import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestTxAndQueryCntr; -import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestTxAndQueryId; import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccActiveQueriesMessage; import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccFutureResponse; import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccMessage; @@ -74,7 +72,6 @@ import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccQuerySnapshotReq import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccRecoveryFinishedMessage; import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccSnapshotResponse; import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccTxSnapshotRequest; -import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccWaitTxsRequest; import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxKey; import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxLog; import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState; @@ -87,7 +84,6 @@ import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataRow; import org.apache.ignite.internal.processors.cache.tree.mvcc.search.MvccLinkAwareSearchRow; import org.apache.ignite.internal.util.GridAtomicLong; import org.apache.ignite.internal.util.GridLongList; -import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -134,7 +130,6 @@ import static org.apache.ignite.internal.processors.cache.persistence.CacheDataR /** * MVCC processor. */ -@SuppressWarnings("serial") public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, DatabaseLifecycleListener { /** */ private static final boolean FORCE_MVCC = @@ -206,9 +201,6 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce private final Map<Long, WaitAckFuture> ackFuts = new ConcurrentHashMap<>(); /** */ - private final Map<Long, GridFutureAdapter> waitTxFuts = new ConcurrentHashMap<>(); - - /** */ private final Map<TxKey, Waiter> waitMap = new ConcurrentHashMap<>(); /** */ @@ -271,6 +263,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce @Override public void preProcessCacheConfiguration(CacheConfiguration ccfg) { if (FORCE_MVCC && ccfg.getAtomicityMode() == TRANSACTIONAL && !CU.isSystemCache(ccfg.getName())) { ccfg.setAtomicityMode(TRANSACTIONAL_SNAPSHOT); + //noinspection unchecked ccfg.setNearConfiguration(null); } @@ -371,6 +364,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce private void txLogPageStoreInit(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException { assert CU.isPersistenceEnabled(ctx.config()); + //noinspection ConstantConditions ctx.cache().context().pageStore().initialize(TX_LOG_CACHE_ID, 1, TX_LOG_CACHE_NAME, mgr.dataRegion(TX_LOG_CACHE_NAME).memoryMetrics()); } @@ -650,9 +644,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce @Override public void addQueryTracker(MvccQueryTracker tracker) { assert tracker.id() != MVCC_TRACKER_ID_NA; - MvccQueryTracker tr = activeTrackers.put(tracker.id(), tracker); - - assert tr == null; + activeTrackers.putIfAbsent(tracker.id(), tracker); } /** {@inheritDoc} */ @@ -763,20 +755,12 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce /** {@inheritDoc} */ @Override public IgniteInternalFuture<Void> ackTxCommit(MvccSnapshot updateVer) { - return ackTxCommit(updateVer, null, 0L); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Void> ackTxCommit(MvccVersion updateVer, MvccSnapshot readSnapshot, - long qryId) { assert updateVer != null; MvccCoordinator crd = curCrd; if (updateVer.coordinatorVersion() == crd.coordinatorVersion()) - return sendTxCommit(crd, createTxAckMessage(futIdCntr.incrementAndGet(), updateVer, readSnapshot, qryId)); - else if (readSnapshot != null) - ackQueryDone(readSnapshot, qryId); + return sendTxCommit(crd, new MvccAckRequestTx(futIdCntr.incrementAndGet(), updateVer.counter())); return new GridFinishedFuture<>(); } @@ -790,36 +774,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce if (crd.coordinatorVersion() != updateVer.coordinatorVersion()) return; - MvccAckRequestTx msg = createTxAckMessage(-1, updateVer, null, 0L); - - msg.skipResponse(true); - - try { - sendMessage(crd.nodeId(), msg); - } - catch (ClusterTopologyCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send tx rollback ack, node left [msg=" + msg + ", node=" + crd.nodeId() + ']'); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send tx rollback ack [msg=" + msg + ", node=" + crd.nodeId() + ']', e); - } - } - - /** {@inheritDoc} */ - @Override public void ackTxRollback(MvccVersion updateVer, MvccSnapshot readSnapshot, long qryTrackerId) { - assert updateVer != null; - - MvccCoordinator crd = curCrd; - - if (crd.coordinatorVersion() != updateVer.coordinatorVersion()) { - if (readSnapshot != null) - ackQueryDone(readSnapshot, qryTrackerId); - - return; - } - - MvccAckRequestTx msg = createTxAckMessage(-1, updateVer, readSnapshot, qryTrackerId); + MvccAckRequestTx msg = new MvccAckRequestTx((long)-1, updateVer.counter()); msg.skipResponse(true); @@ -837,11 +792,10 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce /** {@inheritDoc} */ @Override public void ackQueryDone(MvccSnapshot snapshot, long qryId) { - assert snapshot != null; - MvccCoordinator crd = currentCoordinator(); - if (crd == null || crd.coordinatorVersion() == snapshot.coordinatorVersion() + if (crd == null || snapshot != null + && crd.coordinatorVersion() == snapshot.coordinatorVersion() && sendQueryDone(crd, new MvccAckRequestQueryCntr(queryTrackCounter(snapshot)))) return; @@ -854,30 +808,6 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Void> waitTxsFuture(UUID crdId, GridLongList txs) { - assert crdId != null; - assert txs != null && !txs.isEmpty(); - - WaitAckFuture fut = new WaitAckFuture(futIdCntr.incrementAndGet(), crdId, false); - - ackFuts.put(fut.id, fut); - - try { - sendMessage(crdId, new MvccWaitTxsRequest(fut.id, txs)); - } - catch (IgniteCheckedException e) { - if (ackFuts.remove(fut.id) != null) { - if (e instanceof ClusterTopologyCheckedException) - fut.onDone(); // No need to wait, new coordinator will be assigned, finish without error. - else - fut.onDone(e); - } - } - - return fut; - } - - /** {@inheritDoc} */ // TODO: Proper use of diagnostic context. @Override public void dumpDebugInfo(IgniteLogger log, @Nullable IgniteDiagnosticPrepareContext diagCtx) { boolean first = true; @@ -1077,19 +1007,12 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce private void onTxDone(Long txCntr, boolean increaseCommittedCntr) { assert initFut.isDone(); - GridFutureAdapter fut; - synchronized (this) { activeTxs.remove(txCntr); if (increaseCommittedCntr) committedCntr.setIfGreater(txCntr); } - - fut = waitTxFuts.remove(txCntr); - - if (fut != null) - fut.onDone(); } /** @@ -1100,23 +1023,6 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce } /** - * @param futId Future ID. - * @param updateVer Update version. - * @param readSnapshot Optional read version. - * @param qryTrackerId Query tracker id. - * @return Message. - */ - private MvccAckRequestTx createTxAckMessage(long futId, MvccVersion updateVer, MvccSnapshot readSnapshot, - long qryTrackerId) { - if (readSnapshot == null) - return new MvccAckRequestTx(futId, updateVer.counter()); - else if (readSnapshot.coordinatorVersion() == updateVer.coordinatorVersion()) - return new MvccAckRequestTxAndQueryCntr(futId, updateVer.counter(), queryTrackCounter(readSnapshot)); - else - return new MvccAckRequestTxAndQueryId(futId, updateVer.counter(), qryTrackerId); - } - - /** * @param mvccVer Read version. * @return Tracker counter. */ @@ -1384,23 +1290,6 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce return new NodeStoppingException("Operation has been cancelled (node is stopping)."); } - /** - * @param nodeId Node ID. - * @param msg Message. - */ - private void sendFutureResponse(UUID nodeId, MvccWaitTxsRequest msg) { - try { - sendMessage(nodeId, new MvccFutureResponse(msg.futureId())); - } - catch (ClusterTopologyCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send tx ack response, node left [msg=" + msg + ", node=" + nodeId + ']'); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send tx ack response [msg=" + msg + ", node=" + nodeId + ']', e); - } - } - /** */ @NotNull private IgniteInternalFuture<Void> sendTxCommit(MvccCoordinator crd, MvccAckRequestTx msg) { WaitAckFuture fut = new WaitAckFuture(msg.futureId(), crd.nodeId(), true); @@ -1613,59 +1502,6 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce * @param nodeId Node ID. * @param msg Message. */ - @SuppressWarnings("unchecked") - private void processCoordinatorWaitTxsRequest(final UUID nodeId, final MvccWaitTxsRequest msg) { - GridLongList txs = msg.transactions(); - - GridCompoundFuture resFut = null; - - for (int i = 0; i < txs.size(); i++) { - Long txId = txs.get(i); - - GridFutureAdapter fut = waitTxFuts.get(txId); - - if (fut == null) { - GridFutureAdapter old = waitTxFuts.putIfAbsent(txId, fut = new GridFutureAdapter()); - - if (old != null) - fut = old; - } - - boolean isDone; - - synchronized (this) { - isDone = !activeTxs.containsKey(txId); - } - - if (isDone) - fut.onDone(); - - if (!fut.isDone()) { - if (resFut == null) - resFut = new GridCompoundFuture(); - - resFut.add(fut); - } - } - - if (resFut != null) - resFut.markInitialized(); - - if (resFut == null || resFut.isDone()) - sendFutureResponse(nodeId, msg); - else { - resFut.listen(new IgniteInClosure<IgniteInternalFuture>() { - @Override public void apply(IgniteInternalFuture fut) { - sendFutureResponse(nodeId, msg); - } - }); - } - } - - /** - * @param nodeId Node ID. - * @param msg Message. - */ private void processCoordinatorActiveQueriesMessage(UUID nodeId, MvccActiveQueriesMessage msg) { prevCrdQueries.addNodeActiveQueries(nodeId, msg.activeQueries()); } @@ -1856,8 +1692,6 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce processCoordinatorQuerySnapshotRequest(nodeId, (MvccQuerySnapshotRequest)msg); else if (msg instanceof MvccSnapshotResponse) processCoordinatorSnapshotResponse(nodeId, (MvccSnapshotResponse)msg); - else if (msg instanceof MvccWaitTxsRequest) - processCoordinatorWaitTxsRequest(nodeId, (MvccWaitTxsRequest)msg); else if (msg instanceof MvccAckRequestQueryId) processNewCoordinatorQueryAckRequest(nodeId, (MvccAckRequestQueryId)msg); else if (msg instanceof MvccActiveQueriesMessage) http://git-wip-us.apache.org/repos/asf/ignite/blob/7301ada3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java index f143a43..d2e3b3f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java @@ -84,15 +84,6 @@ public interface MvccQueryTracker { public void onDone(); /** - * Marks tracker as done. - * - * @param tx Transaction. - * @param commit Commit flag. - * @return Acknowledge future. - */ - @Nullable public IgniteInternalFuture<Void> onDone(@NotNull GridNearTxLocal tx, boolean commit); - - /** * Mvcc coordinator change callback. * * @param newCrd New mvcc coordinator. http://git-wip-us.apache.org/repos/asf/ignite/blob/7301ada3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java index d86f5ec..d93a2e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java @@ -24,7 +24,6 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; @@ -139,40 +138,8 @@ public class MvccQueryTrackerImpl implements MvccQueryTracker { /** {@inheritDoc} */ @Override public void onDone() { - if (!checkDone()) - return; - - MvccProcessor prc = cctx.shared().coordinators(); - - MvccSnapshot snapshot = snapshot(); - - if (snapshot != null) { - prc.removeQueryTracker(id); - - prc.ackQueryDone(snapshot, id); - } - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Void> onDone(@NotNull GridNearTxLocal tx, boolean commit) { - MvccSnapshot snapshot = snapshot(), txSnapshot = tx.mvccSnapshot(); - - if (!checkDone() || snapshot == null && txSnapshot == null) - return commit ? new GridFinishedFuture<>() : null; - - MvccProcessor prc = cctx.shared().coordinators(); - - if (snapshot != null) - prc.removeQueryTracker(id); - - if (txSnapshot == null) - prc.ackQueryDone(snapshot, id); - else if (commit) - return prc.ackTxCommit(txSnapshot, snapshot, id); - else - prc.ackTxRollback(txSnapshot, snapshot, id); - - return null; + if (checkDone()) + ackQueryDone(snapshot()); } /** {@inheritDoc} */ @@ -195,8 +162,21 @@ public class MvccQueryTrackerImpl implements MvccQueryTracker { } /** */ + private void ackQueryDone(MvccSnapshot snapshot) { + MvccProcessor prc = cctx.shared().coordinators(); + + if (snapshot != null) { + prc.removeQueryTracker(id); + + prc.ackQueryDone(snapshot, id); + } + } + + /** */ private void requestSnapshot0(AffinityTopologyVersion topVer, MvccSnapshotResponseListener lsnr) { if (checkTopology(topVer, lsnr = decorate(lsnr))) { + cctx.shared().coordinators().addQueryTracker(this); + try { MvccSnapshot snapshot = cctx.shared().coordinators().tryRequestSnapshotLocal(); @@ -209,6 +189,8 @@ public class MvccQueryTrackerImpl implements MvccQueryTracker { lsnr.onError(e); } } + else + cctx.shared().coordinators().removeQueryTracker(id); } /** */ @@ -238,6 +220,9 @@ public class MvccQueryTrackerImpl implements MvccQueryTracker { this.topVer = topVer; synchronized (this) { + if (done) + return false; + crdVer = crd.coordinatorVersion(); } @@ -280,27 +265,30 @@ public class MvccQueryTrackerImpl implements MvccQueryTracker { * @return {@code false} if need to remap. */ private boolean onResponse0(@NotNull MvccSnapshot res, MvccSnapshotResponseListener lsnr) { - boolean needRemap = false; + boolean ackQueryDone = false, needRemap = false; synchronized (this) { assert snapshot() == null : "[this=" + this + ", rcvdVer=" + res + "]"; - if (crdVer != 0) { + if (!done && crdVer != 0) { this.snapshot = res; + + return true; } - else - needRemap = true; - } - if (needRemap) { // Coordinator failed or reassigned, need remap. - tryRemap(lsnr); - return false; + if (crdVer != 0) + ackQueryDone = true; + else if (!done) + needRemap = true; } - cctx.shared().coordinators().addQueryTracker(this); + if (needRemap) + tryRemap(lsnr); // Coordinator is failed or reassigned, need remap. + else if (ackQueryDone) + ackQueryDone(res); // Coordinator is not failed, but the tracker is already closed. - return true; + return false; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/7301ada3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java index 0ceed09..43f87e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java @@ -809,7 +809,7 @@ public class MvccUtils { if (tx == null) tracker = new MvccQueryTrackerImpl(cctx); - else if ((tracker = tx.mvccQueryTracker()) == null) + else tracker = new StaticMvccQueryTracker(cctx, requestSnapshot(cctx, tx)); if (tracker.snapshot() == null)
