ignite-3478 Support for optimistic transactions
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3f33d6a5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3f33d6a5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3f33d6a5 Branch: refs/heads/ignite-3478 Commit: 3f33d6a5d2e5be3b52f32a6746129decd736f0af Parents: 4c06131 Author: sboikov <sboi...@gridgain.com> Authored: Mon Oct 16 14:50:32 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon Oct 16 14:50:32 2017 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 34 +- .../processors/cache/GridCacheEntryInfo.java | 10 +- .../processors/cache/GridCacheMapEntry.java | 35 +- .../cache/GridCacheMvccEntryInfo.java | 3 + .../cache/GridCacheSharedContext.java | 3 + .../cache/IgniteCacheOffheapManager.java | 6 + .../cache/IgniteCacheOffheapManagerImpl.java | 94 +- .../GridDistributedTxRemoteAdapter.java | 8 +- .../distributed/dht/GridDhtCacheAdapter.java | 12 +- .../distributed/dht/GridDhtGetSingleFuture.java | 13 +- .../distributed/dht/GridDhtTxFinishFuture.java | 33 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 10 +- .../dht/GridPartitionedGetFuture.java | 34 +- .../dht/GridPartitionedSingleGetFuture.java | 26 +- .../dht/atomic/GridDhtAtomicCache.java | 6 +- .../dht/colocated/GridDhtColocatedCache.java | 30 +- .../GridDhtPartitionsExchangeFuture.java | 59 +- .../distributed/near/GridNearGetRequest.java | 8 +- ...arOptimisticSerializableTxPrepareFuture.java | 71 +- .../near/GridNearOptimisticTxPrepareFuture.java | 60 +- ...ridNearOptimisticTxPrepareFutureAdapter.java | 126 ++ .../GridNearPessimisticTxPrepareFuture.java | 18 +- .../near/GridNearSingleGetRequest.java | 41 +- .../near/GridNearTxFinishAndAckFuture.java | 16 +- .../near/GridNearTxFinishFuture.java | 33 +- .../cache/distributed/near/GridNearTxLocal.java | 101 +- .../near/GridNearTxPrepareRequest.java | 2 +- .../mvcc/CacheCoordinatorsDiscoveryData.java | 3 + .../cache/mvcc/CacheCoordinatorsProcessor.java | 203 ++- .../cache/mvcc/CoordinatorAckRequestQuery.java | 130 ++ .../cache/mvcc/CoordinatorAckRequestTx.java | 203 +++ .../mvcc/CoordinatorAckRequestTxAndQuery.java | 123 ++ .../mvcc/CoordinatorAckRequestTxAndQueryEx.java | 147 ++ .../mvcc/CoordinatorActiveQueriesMessage.java | 136 ++ .../cache/mvcc/CoordinatorFutureResponse.java | 3 + .../cache/mvcc/CoordinatorQueryAckRequest.java | 130 -- .../cache/mvcc/CoordinatorTxAckRequest.java | 194 -- .../cache/mvcc/CoordinatorWaitTxsRequest.java | 3 + .../cache/mvcc/MvccCoordinatorChangeAware.java | 31 + .../cache/mvcc/MvccCoordinatorFuture.java | 30 + .../cache/mvcc/MvccCoordinatorVersion.java | 5 + .../mvcc/MvccCoordinatorVersionResponse.java | 12 +- .../mvcc/MvccCoordinatorVersionWithoutTxs.java | 173 ++ .../processors/cache/mvcc/MvccCounter.java | 5 +- .../cache/mvcc/MvccEmptyLongList.java | 53 + .../processors/cache/mvcc/MvccQueryAware.java | 43 - .../processors/cache/mvcc/MvccQueryTracker.java | 64 +- .../cache/mvcc/PreviousCoordinatorQueries.java | 30 +- .../processors/cache/mvcc/TxMvccInfo.java | 27 +- .../persistence/GridCacheOffheapManager.java | 6 +- .../cache/transactions/IgniteTxAdapter.java | 10 + .../transactions/IgniteTxLocalAdapter.java | 10 +- .../cache/tree/AbstractDataInnerIO.java | 11 +- .../cache/tree/AbstractDataLeafIO.java | 2 +- .../processors/cache/tree/CacheDataTree.java | 2 +- .../processors/cache/tree/MvccRemoveRow.java | 3 +- .../processors/cache/tree/MvccUpdateRow.java | 35 +- .../datastreamer/DataStreamerImpl.java | 4 +- .../internal/TestRecordingCommunicationSpi.java | 10 +- .../cache/mvcc/CacheMvccTransactionsTest.java | 1701 ++++++++++++++---- .../testsuites/IgniteCacheMvccTestSuite.java | 42 + 61 files changed, 3536 insertions(+), 940 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 99bc8af..35e9af3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -103,13 +103,17 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFi import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlockRequest; -import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorQueryAckRequest; -import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorQueryVersionRequest; -import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorTxAckRequest; +import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorAckRequestTxAndQueryEx; +import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorActiveQueriesMessage; import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorFutureResponse; +import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorAckRequestQuery; +import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorQueryVersionRequest; +import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorAckRequestTx; +import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorAckRequestTxAndQuery; import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorTxCounterRequest; import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorWaitTxsRequest; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionResponse; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionWithoutTxs; import org.apache.ignite.internal.processors.cache.mvcc.MvccCounter; import org.apache.ignite.internal.processors.cache.mvcc.NewCoordinatorQueryAckRequest; import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo; @@ -892,7 +896,7 @@ public class GridIoMessageFactory implements MessageFactory { break; case 131: // TODO IGNITE-3478 fix constants. - msg = new CoordinatorTxAckRequest(); + msg = new CoordinatorAckRequestTx(); break; @@ -907,7 +911,7 @@ public class GridIoMessageFactory implements MessageFactory { break; case 134: - msg = new CoordinatorQueryAckRequest(); + msg = new CoordinatorAckRequestQuery(); break; @@ -937,10 +941,30 @@ public class GridIoMessageFactory implements MessageFactory { return msg; case 141: + msg = new CoordinatorAckRequestTxAndQuery(); + + return msg; + + case 142: + msg = new CoordinatorAckRequestTxAndQueryEx(); + + return msg; + + case 143: msg = new MvccCounter(); return msg; + case 144: + msg = new CoordinatorActiveQueriesMessage(); + + return msg; + + case 145: + msg = new MvccCoordinatorVersionWithoutTxs(); + + return msg; + // [-3..119] [124..128] [-23..-27] [-36..-55]- this // [120..123] - DR http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java index e09d33c..86a2253 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; +import org.apache.ignite.internal.processors.cache.mvcc.MvccEmptyLongList; import org.apache.ignite.internal.processors.cache.mvcc.MvccLongList; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -82,8 +83,13 @@ public class GridCacheEntryInfo implements Message, MvccCoordinatorVersion { } /** {@inheritDoc} */ - @Override public MvccLongList activeTransactions() { - return null; + @Override public final MvccLongList activeTransactions() { + return MvccEmptyLongList.INSTANCE; + } + + /** {@inheritDoc} */ + @Override public final MvccCoordinatorVersion withoutActiveTransactions() { + return this; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index a1535e9..391a56e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -559,7 +559,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme boolean retVer, boolean keepBinary, boolean reserveForLoad, - MvccCoordinatorVersion mvccVer, + @Nullable MvccCoordinatorVersion mvccVer, @Nullable ReaderArguments readerArgs ) throws IgniteCheckedException, GridCacheEntryRemovedException { assert !(retVer && readThrough); @@ -748,7 +748,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme long expTime = CU.toExpireTime(ttl); // Update indexes before actual write to entry. - storeValue(ret, expTime, nextVer, null); + if (cctx.mvccEnabled()) + cctx.offheap().mvccInitialValue(this, ret, nextVer, expTime, null); // TODO IGNITE-3478. + else + storeValue(ret, expTime, nextVer, null); update(ret, expTime, ttl, nextVer, true); @@ -1016,6 +1019,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme this, val, newVer, + expireTime, mvccVer); } else @@ -2091,7 +2095,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme ", val=" + val + ']'); } - removeValue(); + if (cctx.mvccEnabled()) + cctx.offheap().mvccRemoveAll(this); + else + removeValue(); } onMarkedObsolete(); @@ -2285,7 +2292,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme ver = newVer; flags &= ~IS_EVICT_DISABLED; - removeValue(); + if (cctx.mvccEnabled()) + cctx.offheap().mvccRemoveAll(this); + else + removeValue(); onInvalidate(); @@ -2520,7 +2530,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme long delta = expireTime - U.currentTimeMillis(); if (delta <= 0) { - removeValue(); + if (cctx.mvccEnabled()) + cctx.offheap().mvccRemoveAll(this); + else + removeValue(); return true; } @@ -2605,7 +2618,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx); if (cctx.mvccEnabled()) { - cctx.offheap().mvccInitialValue(this, val, ver, mvccVer); + cctx.offheap().mvccInitialValue(this, val, ver, expTime, mvccVer); if (val != null) update(val, expTime, ttl, ver, true); @@ -2769,7 +2782,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx); if (val != null) { - storeValue(val, expTime, newVer, null); + if (cctx.mvccEnabled()) // TODO IGNITE-3478 + cctx.offheap().mvccInitialValue(this, val, newVer, expTime, null); + else + storeValue(val, expTime, newVer, null); if (deletedUnlocked()) deletedUnlocked(false); @@ -3086,7 +3102,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (log.isTraceEnabled()) log.trace("onExpired clear [key=" + key + ", entry=" + System.identityHashCode(this) + ']'); - removeValue(); + if (cctx.mvccEnabled()) + cctx.offheap().mvccRemoveAll(this); + else + removeValue(); if (cctx.events().isRecordable(EVT_CACHE_OBJECT_EXPIRED)) { cctx.events().addEvent(partition(), http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccEntryInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccEntryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccEntryInfo.java index c914f58..0e72d98 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccEntryInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccEntryInfo.java @@ -27,6 +27,9 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; */ public class GridCacheMvccEntryInfo extends GridCacheEntryInfo { /** */ + private static final long serialVersionUID = 0L; + + /** */ private long mvccCrdVer; /** */ http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index f4e4d48..41e5175 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -897,6 +897,9 @@ public class GridCacheSharedContext<K, V> { GridCacheContext<K, V> activeCacheCtx = cacheContext(cacheId); + if (cacheCtx.mvccEnabled() != activeCacheCtx.mvccEnabled()) + return "caches with different mvcc settings can't be enlisted in one transaction"; + if (cacheCtx.systemTx()) { if (activeCacheCtx.cacheId() != cacheCtx.cacheId()) return "system transaction can include only one cache"; http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index 2c070fc..975d349 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -200,6 +200,7 @@ public interface IgniteCacheOffheapManager { GridCacheMapEntry entry, @Nullable CacheObject val, GridCacheVersion ver, + long expireTime, MvccCoordinatorVersion mvccVer ) throws IgniteCheckedException; @@ -208,6 +209,7 @@ public interface IgniteCacheOffheapManager { * @param entry Entry. * @param val Value. * @param ver Cache version. + * @param expireTime Expire time. * @param mvccVer Mvcc update version. * @return Transactions to wait for before finishing current transaction. * @throws IgniteCheckedException If failed. @@ -217,6 +219,7 @@ public interface IgniteCacheOffheapManager { GridCacheMapEntry entry, CacheObject val, GridCacheVersion ver, + long expireTime, MvccCoordinatorVersion mvccVer ) throws IgniteCheckedException; @@ -545,6 +548,7 @@ public interface IgniteCacheOffheapManager { KeyCacheObject key, @Nullable CacheObject val, GridCacheVersion ver, + long expireTime, MvccCoordinatorVersion mvccVer) throws IgniteCheckedException; /** @@ -553,6 +557,7 @@ public interface IgniteCacheOffheapManager { * @param key Key. * @param val Value. * @param ver Version. + * @param expireTime Expire time. * @param mvccVer Mvcc version. * @return List of transactions to wait for. * @throws IgniteCheckedException If failed. @@ -563,6 +568,7 @@ public interface IgniteCacheOffheapManager { KeyCacheObject key, CacheObject val, GridCacheVersion ver, + long expireTime, MvccCoordinatorVersion mvccVer) throws IgniteCheckedException; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 2bff203..c07e2a8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -41,6 +41,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvali import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionResponse; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionWithoutTxs; import org.apache.ignite.internal.processors.cache.mvcc.MvccCounter; import org.apache.ignite.internal.processors.cache.mvcc.MvccLongList; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; @@ -387,12 +389,14 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager GridCacheMapEntry entry, CacheObject val, GridCacheVersion ver, + long expireTime, MvccCoordinatorVersion mvccVer) throws IgniteCheckedException { return dataStore(entry.localPartition()).mvccInitialValue( entry.context(), entry.key(), val, ver, + expireTime, mvccVer); } @@ -402,12 +406,17 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager GridCacheMapEntry entry, CacheObject val, GridCacheVersion ver, + long expireTime, MvccCoordinatorVersion mvccVer) throws IgniteCheckedException { + if (entry.detached() || entry.isNear()) + return null; + return dataStore(entry.localPartition()).mvccUpdate(entry.context(), primary, entry.key(), val, ver, + expireTime, mvccVer); } @@ -417,6 +426,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager GridCacheMapEntry entry, MvccCoordinatorVersion mvccVer ) throws IgniteCheckedException { + if (entry.detached() || entry.isNear()) + return null; + return dataStore(entry.localPartition()).mvccRemove(entry.context(), primary, entry.key(), @@ -425,6 +437,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager /** {@inheritDoc} */ @Override public void mvccRemoveAll(GridCacheMapEntry entry) throws IgniteCheckedException { + if (entry.detached() || entry.isNear()) + return; + dataStore(entry.localPartition()).mvccRemoveAll(entry.context(), entry.key()); } @@ -749,7 +764,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager curPart = ds.partId(); // TODO IGNITE-3478, mvcc with cache groups. - if (mvccVer != null) + if (grp.mvccEnabled()) cur = ds.mvccCursor(mvccVer); else cur = cacheId == CU.UNDEFINED_CACHE_ID ? ds.cursor() : ds.cursor(cacheId); @@ -1383,17 +1398,14 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager KeyCacheObject key, @Nullable CacheObject val, GridCacheVersion ver, + long expireTime, MvccCoordinatorVersion mvccVer) throws IgniteCheckedException { - assert mvccVer != null; - if (!busyLock.enterBusy()) throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); try { - assert val != null || CacheCoordinatorsProcessor.versionForRemovedValue(mvccVer.coordinatorVersion()); - int cacheId = grp.storeCacheIdInDataPage() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; CacheObjectContext coCtx = cctx.cacheObjectContext(); @@ -1403,6 +1415,17 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager MvccUpdateRow updateRow; + boolean newVal = false; + + // TODO IGNITE-3478: null is passed for loaded from store, need handle better. + if (mvccVer == null) { + mvccVer = new MvccCoordinatorVersionWithoutTxs(1L, CacheCoordinatorsProcessor.START_VER, 0L); + + newVal = true; + } + else + assert val != null || CacheCoordinatorsProcessor.versionForRemovedValue(mvccVer.coordinatorVersion()); + if (val != null) { val.valueBytes(coCtx); @@ -1410,7 +1433,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager key, val, ver, + expireTime, mvccVer, + false, partId, cacheId); } @@ -1418,6 +1443,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager updateRow = new MvccRemoveRow( key, mvccVer, + false, partId, cacheId); } @@ -1427,6 +1453,25 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager rowStore.addRow(updateRow); + if (newVal) { + GridCursor<CacheDataRow> cur = dataTree.find( + new MvccSearchRow(cacheId, key, Long.MAX_VALUE, Long.MAX_VALUE), + new MvccSearchRow(cacheId, key, 1L, 1L), + CacheDataRowAdapter.RowData.KEY_ONLY); + + while (cur.next()) { + CacheDataRow row = cur.get(); + + assert row.link() != 0; + + boolean rmvd = dataTree.removex(row); + + assert rmvd; + + rowStore.removeRow(row.link()); + } + } + boolean old = dataTree.putx(updateRow); assert !old; @@ -1448,8 +1493,10 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager KeyCacheObject key, CacheObject val, GridCacheVersion ver, + long expireTime, MvccCoordinatorVersion mvccVer) throws IgniteCheckedException { assert mvccVer != null; + assert primary || mvccVer.activeTransactions().size() == 0 : mvccVer; if (!busyLock.enterBusy()) throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); @@ -1463,11 +1510,15 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager key.valueBytes(coCtx); val.valueBytes(coCtx); + boolean needOld = hasPendingEntries || cctx.isQueryEnabled(); + MvccUpdateRow updateRow = new MvccUpdateRow( key, val, ver, + expireTime, mvccVer, + needOld, partId, cacheId); @@ -1507,6 +1558,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager KeyCacheObject key, MvccCoordinatorVersion mvccVer) throws IgniteCheckedException { assert mvccVer != null; + assert primary || mvccVer.activeTransactions().size() == 0 : mvccVer; if (!busyLock.enterBusy()) throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); @@ -1519,9 +1571,12 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager // Make sure value bytes initialized. key.valueBytes(coCtx); + boolean needOld = hasPendingEntries || cctx.isQueryEnabled(); + MvccRemoveRow updateRow = new MvccRemoveRow( key, mvccVer, + needOld, partId, cacheId); @@ -1573,16 +1628,25 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager new MvccSearchRow(cacheId, key, 1, 1), CacheDataRowAdapter.RowData.KEY_ONLY); + boolean first = true; + while (cur.next()) { CacheDataRow row = cur.get(); - assert row.link() != 0; + assert row.link() != 0 : row; boolean rmvd = dataTree.removex(row); assert rmvd; rowStore.removeRow(row.link()); + + if (first) { + if (!versionForRemovedValue(row.mvccCoordinatorVersion())) + decrementSize(cctx.cacheId()); + + first = false; + } } } @@ -1937,18 +2001,20 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager long rowCrdVerMasked = row.mvccCoordinatorVersion(); - long rowCrdVer = unmaskCoordinatorVersion(rowCrdVerMasked); + if (ver != null) { + long rowCrdVer = unmaskCoordinatorVersion(rowCrdVerMasked); - if (rowCrdVer > ver.coordinatorVersion()) - continue; + if (rowCrdVer > ver.coordinatorVersion()) + continue; - if (rowCrdVer == ver.coordinatorVersion() && row.mvccCounter() > ver.counter()) - continue; + if (rowCrdVer == ver.coordinatorVersion() && row.mvccCounter() > ver.counter()) + continue; - MvccLongList txs = ver.activeTransactions(); + MvccLongList txs = ver.activeTransactions(); - if (txs != null && rowCrdVer == ver.coordinatorVersion() && txs.contains(row.mvccCounter())) - continue; + if (txs != null && rowCrdVer == ver.coordinatorVersion() && txs.contains(row.mvccCounter())) + continue; + } if (curKey != null && row.key().equals(curKey)) continue; http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 77039cc..839f3d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -474,7 +474,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter cctx.database().checkpointReadLock(); try { - assert !txState.mvccEnabled(cctx) || mvccInfo != null; + assert !txState.mvccEnabled(cctx) || mvccInfo != null : "Mvcc is not initialized: " + this; Collection<IgniteTxEntry> entries = near() ? allEntries() : writeEntries(); @@ -597,7 +597,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter resolveTaskName(), dhtVer, txEntry.updateCounter(), - mvccInfo != null ? mvccInfo.version() : null); + mvccVersionForUpdate()); else { assert val != null : txEntry; @@ -622,7 +622,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter resolveTaskName(), dhtVer, txEntry.updateCounter(), - mvccInfo != null ? mvccInfo.version() : null); + mvccVersionForUpdate()); // Keep near entry up to date. if (nearCached != null) { @@ -655,7 +655,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter resolveTaskName(), dhtVer, txEntry.updateCounter(), - mvccInfo != null ? mvccInfo.version() : null); + mvccVersionForUpdate()); // Keep near entry up to date. if (nearCached != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 5dbb3a8..e1c5379 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -848,9 +848,10 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param taskNameHash Task name hash. * @param expiry Expiry. * @param skipVals Skip vals flag. + * @param mvccVer Mvcc version. * @return Future for the operation. */ - public GridDhtGetSingleFuture getDhtSingleAsync( + GridDhtGetSingleFuture getDhtSingleAsync( UUID nodeId, long msgId, KeyCacheObject key, @@ -861,7 +862,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap int taskNameHash, @Nullable IgniteCacheExpiryPolicy expiry, boolean skipVals, - boolean recovery + boolean recovery, + MvccCoordinatorVersion mvccVer ) { GridDhtGetSingleFuture fut = new GridDhtGetSingleFuture<>( ctx, @@ -875,7 +877,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap taskNameHash, expiry, skipVals, - recovery); + recovery, + mvccVer); fut.init(); @@ -903,7 +906,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap req.taskNameHash(), expiryPlc, req.skipValues(), - req.recovery()); + req.recovery(), + req.mvccVersion()); fut.listen(new CI1<IgniteInternalFuture<GridCacheEntryInfo>>() { @Override public void apply(IgniteInternalFuture<GridCacheEntryInfo> f) { http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java index 9fb4b0a..7462406 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java @@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.ReaderArguments; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.F; @@ -103,6 +104,9 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa /** Recovery context flag. */ private final boolean recovery; + /** */ + private final MvccCoordinatorVersion mvccVer; + /** * @param cctx Context. * @param msgId Message ID. @@ -115,6 +119,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa * @param taskNameHash Task name hash code. * @param expiryPlc Expiry policy. * @param skipVals Skip values flag. + * @param mvccVer Mvcc version. */ public GridDhtGetSingleFuture( GridCacheContext<K, V> cctx, @@ -128,7 +133,8 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa int taskNameHash, @Nullable IgniteCacheExpiryPolicy expiryPlc, boolean skipVals, - boolean recovery + boolean recovery, + @Nullable MvccCoordinatorVersion mvccVer ) { assert reader != null; assert key != null; @@ -145,6 +151,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa this.expiryPlc = expiryPlc; this.skipVals = skipVals; this.recovery = recovery; + this.mvccVer = mvccVer; futId = IgniteUuid.randomUuid(); @@ -366,7 +373,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa expiryPlc, skipVals, recovery, - null); // TODO IGNITE-3478 + mvccVer); } else { final ReaderArguments args = readerArgs; @@ -392,7 +399,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa expiryPlc, skipVals, recovery, - null); // TODO IGNITE-3478 + mvccVer); fut0.listen(createGetFutureListener()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index d624e2c..5e8428d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFutu import org.apache.ignite.internal.processors.cache.GridCacheFuture; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorFuture; import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; @@ -300,7 +301,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity assert mvccInfo != null; - IgniteInternalFuture fut = cctx.coordinators().waitTxsFuture(mvccInfo.coordinator(), waitTxs); + IgniteInternalFuture fut = cctx.coordinators().waitTxsFuture(mvccInfo.coordinatorNodeId(), waitTxs); add(fut); @@ -412,7 +413,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity if (tx.onePhaseCommit()) return false; - assert !commit || !tx.txState().mvccEnabled(cctx) || tx.mvccInfo() != null; + assert !commit || !tx.txState().mvccEnabled(cctx) || tx.mvccInfo() != null || F.isEmpty(tx.writeEntries()); boolean sync = tx.syncMode() == FULL_SYNC; @@ -423,6 +424,12 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity int miniId = 0; + // Do not need process active transactions on backups. + TxMvccInfo mvccInfo = tx.mvccInfo(); + + if (mvccInfo != null) + mvccInfo = mvccInfo.withoutActiveTransactions(); + // Create mini futures. for (GridDistributedTxMapping dhtMapping : dhtMap.values()) { ClusterNode n = dhtMapping.primary(); @@ -470,7 +477,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity updCntrs, false, false, - tx.mvccInfo()); + mvccInfo); req.writeVersion(tx.writeVersion() != null ? tx.writeVersion() : tx.xidVersion()); @@ -540,7 +547,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity tx.activeCachesDeploymentEnabled(), false, false, - tx.mvccInfo()); + mvccInfo); req.writeVersion(tx.writeVersion()); @@ -610,13 +617,23 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity /** {@inheritDoc} */ @Override public String toString() { - // TODO IGNITE-3478 (mvcc wait txs fut) Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() { @SuppressWarnings("unchecked") @Override public String apply(IgniteInternalFuture<?> f) { - return "[node=" + ((MiniFuture)f).node().id() + - ", loc=" + ((MiniFuture)f).node().isLocal() + - ", done=" + f.isDone() + "]"; + if (f.getClass() == MiniFuture.class) { + return "[node=" + ((MiniFuture)f).node().id() + + ", loc=" + ((MiniFuture)f).node().isLocal() + + ", done=" + f.isDone() + "]"; + } + else if (f instanceof MvccCoordinatorFuture) { + MvccCoordinatorFuture crdFut = (MvccCoordinatorFuture)f; + + return "[mvccCrdNode=" + crdFut.coordinatorNodeId() + + ", loc=" + crdFut.coordinatorNodeId().equals(cctx.localNodeId()) + + ", done=" + f.isDone() + "]"; + } + else + return f.toString(); } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 3143c4f..623dea8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -1344,6 +1344,12 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite final long timeout = timeoutObj != null ? timeoutObj.timeout : 0; + // Do not need process active transactions on backups. + TxMvccInfo mvccInfo = tx.mvccInfo(); + + if (mvccInfo != null) + mvccInfo = mvccInfo.withoutActiveTransactions(); + // Create mini futures. for (GridDistributedTxMapping dhtMapping : tx.dhtMap().values()) { assert !dhtMapping.empty(); @@ -1387,7 +1393,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite tx.activeCachesDeploymentEnabled(), tx.storeWriteThrough(), retVal, - tx.mvccInfo()); + mvccInfo); int idx = 0; @@ -1501,7 +1507,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite tx.activeCachesDeploymentEnabled(), tx.storeWriteThrough(), retVal, - tx.mvccInfo()); + mvccInfo); for (IgniteTxEntry entry : nearMapping.entries()) { if (CU.writes().apply(entry)) { http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 7993d05..e424a18 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -45,7 +45,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetR import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; -import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryAware; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorChangeAware; import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -61,13 +61,15 @@ import org.apache.ignite.internal.util.typedef.P1; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; /** * Colocated get future. */ -public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAdapter<K, V> implements MvccQueryAware { +public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAdapter<K, V> + implements MvccCoordinatorChangeAware, IgniteBiInClosure<AffinityTopologyVersion, IgniteCheckedException> { /** */ private static final long serialVersionUID = 0L; @@ -78,6 +80,9 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda private static IgniteLogger log; /** */ + protected final MvccCoordinatorVersion mvccVer; + + /** */ private MvccQueryTracker mvccTracker; /** @@ -94,6 +99,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda * @param skipVals Skip values flag. * @param needVer If {@code true} returns values as tuples containing value and version. * @param keepCacheObjects Keep cache objects flag. + * @param mvccVer Mvcc version. */ public GridPartitionedGetFuture( GridCacheContext<K, V> cctx, @@ -107,7 +113,8 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda @Nullable IgniteCacheExpiryPolicy expiryPlc, boolean skipVals, boolean needVer, - boolean keepCacheObjects + boolean keepCacheObjects, + @Nullable MvccCoordinatorVersion mvccVer ) { super(cctx, keys, @@ -121,6 +128,9 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda needVer, keepCacheObjects, recovery); + assert mvccVer == null || cctx.mvccEnabled(); + + this.mvccVer = mvccVer; if (log == null) log = U.logger(cctx.kernalContext(), logRef, GridPartitionedGetFuture.class); @@ -133,6 +143,9 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda if (!cctx.mvccEnabled()) return null; + if (mvccVer != null) + return mvccVer; + MvccCoordinatorVersion ver = mvccTracker.mvccVersion(); assert ver != null : "[fut=" + this + ", mvccTracker=" + mvccTracker + "]"; @@ -158,7 +171,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion(); } - if (cctx.mvccEnabled()) { + if (cctx.mvccEnabled() && mvccVer == null) { mvccTracker = new MvccQueryTracker(cctx, canRemap, this); trackable = true; @@ -174,13 +187,14 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda } /** {@inheritDoc} */ - @Override public void onMvccVersionReceived(AffinityTopologyVersion topVer) { - initialMap(topVer); - } + @Override public void apply(AffinityTopologyVersion topVer, IgniteCheckedException e) { + if (e != null) + onDone(e); + else { + assert topVer != null; - /** {@inheritDoc} */ - @Override public void onMvccVersionError(IgniteCheckedException e) { - onDone(e); + initialMap(topVer); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index b34687f..c31b8b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -41,11 +41,12 @@ import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter; import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; @@ -122,6 +123,9 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec @GridToStringInclude private ClusterNode node; + /** */ + protected final MvccCoordinatorVersion mvccVer; + /** * @param cctx Context. * @param key Key. @@ -149,9 +153,11 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec boolean skipVals, boolean needVer, boolean keepCacheObjects, - boolean recovery + boolean recovery, + @Nullable MvccCoordinatorVersion mvccVer ) { assert key != null; + assert mvccVer == null || cctx.mvccEnabled(); AffinityTopologyVersion lockedTopVer = cctx.shared().lockedTopologyVersion(null); @@ -176,6 +182,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec this.keepCacheObjects = keepCacheObjects; this.recovery = recovery; this.topVer = topVer; + this.mvccVer = mvccVer; futId = IgniteUuid.randomUuid(); @@ -230,7 +237,8 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec taskName == null ? 0 : taskName.hashCode(), expiryPlc, skipVals, - recovery); + recovery, + mvccVer); final Collection<Integer> invalidParts = fut.invalidPartitions(); @@ -288,7 +296,8 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec /*add reader*/false, needVer, cctx.deploymentEnabled(), - recovery); + recovery, + mvccVer); try { cctx.io().send(node, req, cctx.ioPolicy()); @@ -355,7 +364,8 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec boolean skipEntry = readNoEntry; if (readNoEntry) { - CacheDataRow row = cctx.offheap().read(cctx, key); // TODO IGNITE-3478 + CacheDataRow row = mvccVer != null ? cctx.offheap().mvccRead(cctx, key, mvccVer) : + cctx.offheap().read(cctx, key); if (row != null) { long expireTime = row.expireTime(); @@ -398,8 +408,8 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec taskName, expiryPlc, true, - null, - null); // TODO IGNITE-3478 + mvccVer, + null); if (res != null) { v = res.value(); @@ -418,7 +428,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec taskName, expiryPlc, true, - null); // TODO IGNITE-3478 + mvccVer); } colocated.context().evicts().touch(entry, topVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 16416cc..d6862fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1385,7 +1385,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { skipVals, needVer, false, - recovery); + recovery, + null); fut.init(); @@ -1591,7 +1592,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { expiry, skipVals, needVer, - false); + false, + null); fut.init(topVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 7364cb3..c975edb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException; @@ -241,7 +242,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte skipVals, needVer, /*keepCacheObjects*/false, - opCtx != null && opCtx.recovery()); + opCtx != null && opCtx.recovery(), + null); fut.init(); @@ -319,7 +321,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @param needVer Need version. * @return Loaded values. */ - public IgniteInternalFuture<Map<K, V>> loadAsync( + private IgniteInternalFuture<Map<K, V>> loadAsync( @Nullable Collection<KeyCacheObject> keys, boolean readThrough, boolean forcePrimary, @@ -341,7 +343,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte expiryPlc, skipVals, needVer, - false); + false, + null); } /** @@ -370,7 +373,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte boolean skipVals, boolean needVer, boolean keepCacheObj, - boolean recovery + boolean recovery, + @Nullable MvccCoordinatorVersion mvccVer ) { GridPartitionedSingleGetFuture fut = new GridPartitionedSingleGetFuture(ctx, ctx.toCacheKeyObject(key), @@ -384,7 +388,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte skipVals, needVer, keepCacheObj, - recovery); + recovery, + mvccVer); fut.init(); @@ -403,6 +408,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @param skipVals Skip values flag. * @param needVer If {@code true} returns values as tuples containing value and version. * @param keepCacheObj Keep cache objects flag. + * @param mvccVer Mvcc version. * @return Load future. */ public final IgniteInternalFuture<Map<K, V>> loadAsync( @@ -417,8 +423,11 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte @Nullable IgniteCacheExpiryPolicy expiryPlc, boolean skipVals, boolean needVer, - boolean keepCacheObj + boolean keepCacheObj, + @Nullable MvccCoordinatorVersion mvccVer ) { + assert mvccVer == null || ctx.mvccEnabled(); + if (keys == null || keys.isEmpty()) return new GridFinishedFuture<>(Collections.<K, V>emptyMap()); @@ -426,7 +435,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte expiryPlc = expiryPolicy(null); // Optimisation: try to resolve value locally and escape 'get future' creation. - if (!forcePrimary && ctx.affinityNode() && !ctx.mvccEnabled()) { + if (!forcePrimary && ctx.affinityNode() && (!ctx.mvccEnabled() || mvccVer != null)) { try { Map<K, V> locVals = null; @@ -499,7 +508,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte taskName, expiryPlc, !deserializeBinary, - null, + mvccVer, null); if (getRes != null) { @@ -519,7 +528,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte taskName, expiryPlc, !deserializeBinary, - null); + mvccVer); } // Entry was not in memory or in swap, so we remove it from cache. @@ -602,7 +611,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte expiryPlc, skipVals, needVer, - keepCacheObj); + keepCacheObj, + mvccVer); fut.init(topVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 88095ab..c14d6e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -79,8 +79,9 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopolo import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.cache.mvcc.MvccCounter; -import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryAware; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorChangeAware; import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; @@ -657,7 +658,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } - updateTopologies(crdNode, cctx.coordinators().currentCoordinator()); + updateTopologies(crd, crdNode, cctx.coordinators().currentCoordinator()); switch (exchange) { case ALL: { @@ -760,11 +761,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** + * @param exchCrd Exchange coordinator node. * @param crd Coordinator flag. * @param mvccCrd Mvcc coordinator. * @throws IgniteCheckedException If failed. */ - private void updateTopologies(boolean crd, MvccCoordinator mvccCrd) throws IgniteCheckedException { + private void updateTopologies(ClusterNode exchCrd, boolean crd, MvccCoordinator mvccCrd) throws IgniteCheckedException { for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (grp.isLocal()) continue; @@ -808,24 +810,41 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte Map<MvccCounter, Integer> activeQrys = new HashMap<>(); - for (GridCacheFuture<?> fut : cctx.mvcc().activeFutures()) { - if (fut instanceof MvccQueryAware) { - MvccCoordinatorVersion ver = ((MvccQueryAware)fut).onMvccCoordinatorChange(mvccCrd); + for (GridCacheFuture<?> fut : cctx.mvcc().activeFutures()) + processMvccCoordinatorChange(mvccCrd, fut, activeQrys); - if (ver != null ) { - MvccCounter cntr = new MvccCounter(ver.coordinatorVersion(), ver.counter()); + for (IgniteInternalTx tx : cctx.tm().activeTransactions()) + processMvccCoordinatorChange(mvccCrd, tx, activeQrys); - Integer cnt = activeQrys.get(cntr); + exchCtx.addActiveQueries(cctx.localNodeId(), activeQrys); - if (cnt == null) - activeQrys.put(cntr, 1); - else - activeQrys.put(cntr, cnt + 1); - } - } - } + if (exchCrd == null || !mvccCrd.nodeId().equals(exchCrd.id())) + cctx.coordinators().sendActiveQueries(mvccCrd.nodeId(), activeQrys); + } + } - exchCtx.addActiveQueries(cctx.localNodeId(), activeQrys); + /** + * @param mvccCrd New coordinator. + * @param nodeObj Node object. + * @param activeQrys Active queries map to update. + */ + private void processMvccCoordinatorChange(MvccCoordinator mvccCrd, + Object nodeObj, + Map<MvccCounter, Integer> activeQrys) + { + if (nodeObj instanceof MvccCoordinatorChangeAware) { + MvccCoordinatorVersion ver = ((MvccCoordinatorChangeAware)nodeObj).onMvccCoordinatorChange(mvccCrd); + + if (ver != null ) { + MvccCounter cntr = new MvccCounter(ver.coordinatorVersion(), ver.counter()); + + Integer cnt = activeQrys.get(cntr); + + if (cnt == null) + activeQrys.put(cntr, 1); + else + activeQrys.put(cntr, cnt + 1); + } } } @@ -1288,9 +1307,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte msg.partitionHistoryCounters(partHistReserved0); } - Map<UUID, Map<MvccCounter, Integer>> activeQueries = exchCtx.activeQueries(); + if (exchCtx.newMvccCoordinator() && cctx.coordinators().currentCoordinatorId().equals(node.id())) { + Map<UUID, Map<MvccCounter, Integer>> activeQueries = exchCtx.activeQueries(); - msg.activeQueries(activeQueries != null ? activeQueries.get(cctx.localNodeId()) : null); + msg.activeQueries(activeQueries != null ? activeQueries.get(cctx.localNodeId()) : null); + } if (stateChangeExchange() && changeGlobalStateE != null) msg.setError(changeGlobalStateE); http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java index c6f3280..ab927d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java @@ -46,6 +46,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemTy import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; /** * Get request. Responsible for obtaining entry from primary node. 'Near' means 'Primary' here, not 'Near Cache'. @@ -132,6 +133,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD * @param createTtl New TTL to set after entry is created, -1 to leave unchanged. * @param accessTtl New TTL to set after entry is accessed, -1 to leave unchanged. * @param addDepInfo Deployment info. + * @param mvccVer Mvcc version. */ public GridNearGetRequest( int cacheId, @@ -149,7 +151,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD boolean skipVals, boolean addDepInfo, boolean recovery, - MvccCoordinatorVersion mvccVer + @Nullable MvccCoordinatorVersion mvccVer ) { assert futId != null; assert miniId != null; @@ -194,9 +196,9 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD } /** - * @return Counter. + * @return Mvcc version. */ - public MvccCoordinatorVersion mvccVersion() { + @Nullable public MvccCoordinatorVersion mvccVersion() { return mvccVer; } http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index 9d36bca..b606f0e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -38,6 +38,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping; +import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -52,7 +54,6 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.P1; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -69,10 +70,6 @@ import static org.apache.ignite.transactions.TransactionState.PREPARING; public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter { /** */ @GridToStringExclude - private KeyLockFuture keyLockFut; - - /** */ - @GridToStringExclude private ClientRemapFuture remapFut; /** */ @@ -189,10 +186,20 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim tx.removeMapping(m.primary().id()); } + prepareError(e); + } + + /** + * @param e Error. + */ + private void prepareError(Throwable e) { ERR_UPD.compareAndSet(this, null, e); if (keyLockFut != null) keyLockFut.onDone(e); + + if (mvccVerFut != null) + mvccVerFut.onDone(); } /** {@inheritDoc} */ @@ -345,11 +352,25 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim boolean hasNearCache = false; + MvccCoordinator mvccCrd = null; + for (IgniteTxEntry write : writes) { map(write, topVer, mappings, txMapping, remap, topLocked); - if (write.context().isNear()) + GridCacheContext cctx = write.context(); + + if (cctx.isNear()) hasNearCache = true; + + if (cctx.mvccEnabled() && mvccCrd == null) { + mvccCrd = cctx.affinity().mvccCoordinator(topVer); + + if (mvccCrd == null) { + onDone(CacheCoordinatorsProcessor.noCoordinatorError(topVer)); + + return; + } + } } for (IgniteTxEntry read : reads) @@ -365,6 +386,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim return; } + assert !tx.txState().mvccEnabled(cctx) || mvccCrd != null || F.isEmpty(writes); + tx.addEntryMapping(mappings.values()); cctx.mvcc().recheckPendingLocks(); @@ -376,12 +399,16 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim MiniFuture locNearEntriesFut = null; + int lockCnt = keyLockFut != null ? 1 : 0; + // Create futures in advance to have all futures when process {@link GridNearTxPrepareResponse#clientRemapVersion}. for (GridDistributedTxMapping m : mappings.values()) { assert !m.empty(); MiniFuture fut = new MiniFuture(this, m, ++miniId); + lockCnt++; + add((IgniteInternalFuture)fut); if (m.primary().isLocal() && m.hasNearCacheEntries() && m.hasColocatedCacheEntries()) { @@ -390,9 +417,14 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim locNearEntriesFut = fut; add((IgniteInternalFuture)new MiniFuture(this, m, ++miniId)); + + lockCnt++; } } + if (mvccCrd != null) + initMvccVersionFuture(mvccCrd, lockCnt, remap); + Collection<IgniteInternalFuture<?>> futs = (Collection)futures(); Iterator<IgniteInternalFuture<?>> it = futs.iterator(); @@ -703,20 +735,20 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() { @Override public String apply(IgniteInternalFuture<?> f) { - return "[node=" + ((MiniFuture)f).primary().id() + - ", loc=" + ((MiniFuture)f).primary().isLocal() + - ", done=" + f.isDone() + "]"; - } - }, - new P1<IgniteInternalFuture<?>>() { - @Override public boolean apply(IgniteInternalFuture<?> f) { - return isMini(f); + if (isMini(f)) { + return "[node=" + ((MiniFuture)f).primary().id() + + ", loc=" + ((MiniFuture)f).primary().isLocal() + + ", done=" + f.isDone() + + ", err=" + f.error() + "]"; + } + else + return f.toString(); } }); return S.toString(GridNearOptimisticSerializableTxPrepareFuture.class, this, "innerFuts", futs, - "keyLockFut", keyLockFut, + "remap", remapFut != null, "tx", tx, "super", super.toString()); } @@ -924,7 +956,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim remap(res); } catch (IgniteCheckedException e) { - ERR_UPD.compareAndSet(parent, null, e); + parent.prepareError(e); onDone(e); } @@ -937,7 +969,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim err0.retryReadyFuture(affFut); - ERR_UPD.compareAndSet(parent, null, err0); + parent.prepareError(err0); onDone(err0); } @@ -948,7 +980,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim parent); } - ERR_UPD.compareAndSet(parent, null, e); + parent.prepareError(e); onDone(e); } @@ -963,6 +995,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim // Finish this mini future (need result only on client node). onDone(parent.cctx.kernalContext().clientNode() ? res : null); + + if (parent.mvccVerFut != null) + parent.mvccVerFut.onLockReceived(); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index ef3075e..1fba1dd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -41,6 +41,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping; +import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; @@ -49,7 +51,6 @@ import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.CI1; @@ -73,10 +74,6 @@ import static org.apache.ignite.transactions.TransactionState.PREPARING; */ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter { /** */ - @GridToStringExclude - private KeyLockFuture keyLockFut; - - /** */ private int miniId; /** */ @@ -382,6 +379,18 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa return; } + if (write.context().mvccEnabled()) { + MvccCoordinator mvccCrd = write.context().affinity().mvccCoordinator(topVer); + + if (mvccCrd == null) { + onDone(CacheCoordinatorsProcessor.noCoordinatorError(topVer)); + + return; + } + + initMvccVersionFuture(mvccCrd, keyLockFut != null ? 2 : 1, remap); + } + if (keyLockFut != null) keyLockFut.onAllKeysAdded(); @@ -426,14 +435,27 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa boolean hasNearCache = false; + MvccCoordinator mvccCrd = null; + for (IgniteTxEntry write : writes) { write.clearEntryReadVersion(); GridDistributedTxMapping updated = map(write, topVer, cur, topLocked, remap); - if(updated == null) - // an exception occurred while transaction mapping, stop further processing + if (updated == null) { + // An exception occurred while transaction mapping, stop further processing. break; + } + + if (write.context().mvccEnabled() && mvccCrd == null) { + mvccCrd = write.context().affinity().mvccCoordinator(topVer); + + if (mvccCrd == null) { + onDone(CacheCoordinatorsProcessor.noCoordinatorError(topVer)); + + break; + } + } if (write.context().isNear()) hasNearCache = true; @@ -474,6 +496,11 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa return; } + assert !tx.txState().mvccEnabled(cctx) || mvccCrd != null; + + if (mvccCrd != null) + initMvccVersionFuture(mvccCrd, keyLockFut != null ? 2 : 1, remap); + if (keyLockFut != null) keyLockFut.onAllKeysAdded(); @@ -497,8 +524,12 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa private void proceedPrepare(final Queue<GridDistributedTxMapping> mappings) { final GridDistributedTxMapping m = mappings.poll(); - if (m == null) + if (m == null) { + if (mvccVerFut != null) + mvccVerFut.onLockReceived(); + return; + } proceedPrepare(m, mappings); } @@ -786,9 +817,13 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa @Override public String toString() { Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() { @Override public String apply(IgniteInternalFuture<?> f) { - return "[node=" + ((MiniFuture)f).node().id() + - ", loc=" + ((MiniFuture)f).node().isLocal() + - ", done=" + f.isDone() + "]"; + if (isMini(f)) { + return "[node=" + ((MiniFuture)f).node().id() + + ", loc=" + ((MiniFuture)f).node().isLocal() + + ", done=" + f.isDone() + "]"; + } + else + return f.toString(); } }, new P1<IgniteInternalFuture<Object>>() { @Override public boolean apply(IgniteInternalFuture<Object> fut) { @@ -798,7 +833,6 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa return S.toString(GridNearOptimisticTxPrepareFuture.class, this, "innerFuts", futs, - "keyLockFut", keyLockFut, "tx", tx, "super", super.toString()); } @@ -954,6 +988,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa // Proceed prepare before finishing mini future. if (mappings != null) parent.proceedPrepare(mappings); + else if (parent.mvccVerFut != null) + parent.mvccVerFut.onLockReceived(); // Finish this mini future. onDone((GridNearTxPrepareResponse)null);