Repository: ignite Updated Branches: refs/heads/ignite-1607-read a273299bb -> 54bbc753d
ignite-1607 WIP Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/54bbc753 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/54bbc753 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/54bbc753 Branch: refs/heads/ignite-1607-read Commit: 54bbc753d6e8c89626f8205ed13ef4d2012fa323 Parents: a273299 Author: sboikov <[email protected]> Authored: Tue Oct 6 17:41:23 2015 +0300 Committer: sboikov <[email protected]> Committed: Tue Oct 6 17:41:23 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheEntryEx.java | 26 +- .../processors/cache/GridCacheMapEntry.java | 47 ++- .../processors/cache/GridCacheMvcc.java | 6 - .../distributed/dht/GridDhtCacheEntry.java | 9 +- .../dht/GridPartitionedGetFuture.java | 88 +++-- .../dht/atomic/GridDhtAtomicCache.java | 4 +- .../dht/colocated/GridDhtColocatedCache.java | 120 ++++++- .../distributed/near/GridNearCacheAdapter.java | 4 +- .../distributed/near/GridNearGetFuture.java | 124 +++++-- .../near/GridNearTransactionalCache.java | 9 +- .../cache/distributed/near/GridNearTxLocal.java | 56 +-- .../cache/transactions/IgniteTxAdapter.java | 3 + .../cache/transactions/IgniteTxEntry.java | 16 +- .../transactions/IgniteTxLocalAdapter.java | 345 +++++++++++-------- .../cache/transactions/IgniteTxLocalEx.java | 4 +- .../cache/transactions/IgniteTxManager.java | 20 +- .../CacheSerializableTransactionsTest.java | 88 ++++- .../processors/cache/GridCacheTestEntryEx.java | 15 + 18 files changed, 702 insertions(+), 282 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/54bbc753/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index fb4fcdd..bc36d2c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -33,6 +33,8 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; import org.apache.ignite.internal.processors.dr.GridDrType; import org.apache.ignite.internal.util.lang.GridTuple3; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.lang.IgniteBiTuple; import org.jetbrains.annotations.Nullable; /** @@ -286,7 +288,6 @@ public interface GridCacheEntryEx { * @param subjId Subject ID initiated this read. * @param transformClo Transform closure to record event. * @param taskName Task name. - * together with getting the value is an atomic operation. * @param expiryPlc Expiry policy. * @return Cached value. * @throws IgniteCheckedException If loading value failed. @@ -308,6 +309,29 @@ public interface GridCacheEntryEx { throws IgniteCheckedException, GridCacheEntryRemovedException, GridCacheFilterFailedException; /** + * @param readSwap Flag indicating whether to check swap memory. + * @param unmarshal Unmarshal flag. + * @param updateMetrics If {@code true} then metrics should be updated. + * @param evt Flag to signal event notification. + * @param subjId Subject ID initiated this read. + * @param transformClo Transform closure to record event. + * @param taskName Task name. + * @param expiryPlc Expiry policy. + * @return Cached value and entry version. + * @throws IgniteCheckedException If loading value failed. + * @throws GridCacheEntryRemovedException If entry was removed. + */ + @Nullable public T2<CacheObject, GridCacheVersion> innerGetVersioned(boolean readSwap, + boolean unmarshal, + boolean updateMetrics, + boolean evt, + UUID subjId, + Object transformClo, + String taskName, + @Nullable IgniteCacheExpiryPolicy expiryPlc) + throws IgniteCheckedException, GridCacheEntryRemovedException; + + /** * Reloads entry from underlying storage. * * @return Reloaded value. http://git-wip-us.apache.org/repos/asf/ignite/blob/54bbc753/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 4bf0aa1..b22f9b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -37,6 +37,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; import org.apache.ignite.internal.processors.cache.extras.GridCacheEntryExtras; import org.apache.ignite.internal.processors.cache.extras.GridCacheMvccEntryExtras; import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras; @@ -59,6 +60,7 @@ import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.T3; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; @@ -673,7 +675,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme String taskName, @Nullable IgniteCacheExpiryPolicy expirePlc) throws IgniteCheckedException, GridCacheEntryRemovedException { - return innerGet0(tx, + return (CacheObject)innerGet0(tx, readSwap, readThrough, evt, @@ -683,12 +685,38 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme subjId, transformClo, taskName, - expirePlc); + expirePlc, + false); + } + + /** {@inheritDoc} */ + @Nullable @Override public T2<CacheObject, GridCacheVersion> innerGetVersioned( + boolean readSwap, + boolean unmarshal, + boolean updateMetrics, + boolean evt, + UUID subjId, + Object transformClo, + String taskName, + @Nullable IgniteCacheExpiryPolicy expiryPlc) + throws IgniteCheckedException, GridCacheEntryRemovedException { + return (T2<CacheObject, GridCacheVersion>)innerGet0(null, + readSwap, + false, + evt, + unmarshal, + updateMetrics, + false, + subjId, + transformClo, + taskName, + expiryPlc, + false); } /** {@inheritDoc} */ @SuppressWarnings({"unchecked", "RedundantTypeArguments", "TooBroadScope"}) - private CacheObject innerGet0(IgniteInternalTx tx, + private Object innerGet0(IgniteInternalTx tx, boolean readSwap, boolean readThrough, boolean evt, @@ -698,8 +726,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme UUID subjId, Object transformClo, String taskName, - @Nullable IgniteCacheExpiryPolicy expiryPlc) + @Nullable IgniteCacheExpiryPolicy expiryPlc, + boolean retVer) throws IgniteCheckedException, GridCacheEntryRemovedException { + assert !(retVer && readThrough); + // Disable read-through if there is no store. if (readThrough && !cctx.readThrough()) readThrough = false; @@ -710,6 +741,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CacheObject ret = null; GridCacheVersion startVer; + GridCacheVersion resVer = null; boolean expired = false; @@ -723,6 +755,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme // Cache version for optimistic check. startVer = ver; + if (retVer) + resVer = isNear() ? ((GridNearCacheEntry)this).dhtVersion() : startVer; + GridCacheMvcc mvcc = mvccExtras(); owner = mvcc == null ? null : mvcc.anyOwner(); @@ -844,7 +879,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (ret != null) // If return value is consistent, then done. - return ret; + return retVer ? new T2<>(ret, resVer) : ret; boolean loadedFromStore = false; @@ -906,6 +941,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } } + assert ret == null || !retVer; + return ret; } http://git-wip-us.apache.org/repos/asf/ignite/blob/54bbc753/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java index 337be06..c2102bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java @@ -455,7 +455,6 @@ public final class GridCacheMvcc { /*nearVer*/null, threadId, ver, - null, timeout, reenter, tx, @@ -470,7 +469,6 @@ public final class GridCacheMvcc { * @param nearVer Near version. * @param threadId Thread ID. * @param ver Lock version. - * @param serReadVer Optional read entry version for optimistic serializable transaction. * @param timeout Lock acquisition timeout. * @param reenter Reentry flag ({@code true} if reentry is allowed). * @param tx Transaction flag. @@ -485,7 +483,6 @@ public final class GridCacheMvcc { @Nullable GridCacheVersion nearVer, long threadId, GridCacheVersion ver, - @Nullable GridCacheVersion serReadVer, long timeout, boolean reenter, boolean tx, @@ -513,9 +510,6 @@ public final class GridCacheMvcc { if (owner == null || owner.threadId() != threadId) return null; } - - if (serReadVer != null && !serReadVer.equals(ver)) - return null; } UUID locNodeId = cctx.nodeId(); http://git-wip-us.apache.org/repos/asf/ignite/blob/54bbc753/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index 82af820..90f6551 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -196,6 +197,13 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { checkObsolete(); + if (serReadVer != null) { + if (!serReadVer.equals(this.ver)) { + if (!(isNewLocked() && serReadVer.equals(IgniteTxEntry.SER_READ_NEW_ENTRY_VER))) + return null; + } + } + GridCacheMvcc mvcc = mvccExtras(); if (mvcc == null) { @@ -214,7 +222,6 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { nearVer, threadId, ver, - serReadVer, timeout, reenter, tx, http://git-wip-us.apache.org/repos/asf/ignite/blob/54bbc753/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index abbe7b8..6b8c2ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -52,12 +52,14 @@ import org.apache.ignite.internal.util.GridLeanMap; import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.GridInClosure3; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CIX1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.P1; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -136,6 +138,12 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M /** Flag indicating whether future can be remapped on a newer topology version. */ private final boolean canRemap; + /** */ + private final boolean needVer; + + /** */ + private final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> resC; + /** * @param cctx Context. * @param keys Keys. @@ -149,6 +157,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M * @param deserializePortable Deserialize portable flag. * @param expiryPlc Expiry policy. * @param skipVals Skip values flag. + * @param canRemap Flag indicating whether future can be remapped on a newer topology version. + * @param resC Closure applied on 'get' result. */ public GridPartitionedGetFuture( GridCacheContext<K, V> cctx, @@ -162,11 +172,14 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M boolean deserializePortable, @Nullable IgniteCacheExpiryPolicy expiryPlc, boolean skipVals, - boolean canRemap + boolean canRemap, + boolean needVer, + @Nullable GridInClosure3<KeyCacheObject, Object, GridCacheVersion> resC ) { super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size())); assert !F.isEmpty(keys); + assert !needVer || resC != null; this.cctx = cctx; this.keys = keys; @@ -180,6 +193,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M this.expiryPlc = expiryPlc; this.skipVals = skipVals; this.canRemap = canRemap; + this.needVer = needVer; + this.resC = resC; futId = IgniteUuid.randomUuid(); @@ -463,18 +478,39 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M if (entry != null) { boolean isNew = entry.isNewLocked(); - CacheObject v = entry.innerGet(null, - /*swap*/true, - /*read-through*/false, - /*fail-fast*/true, - /*unmarshal*/true, - /**update-metrics*/false, - /*event*/!skipVals, - /*temporary*/false, - subjId, - null, - taskName, - expiryPlc); + CacheObject v = null; + GridCacheVersion ver = null; + + if (needVer) { + T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned( + /*swap*/true, + /*unmarshal*/true, + /**update-metrics*/false, + /*event*/!skipVals, + subjId, + null, + taskName, + expiryPlc); + + if (res != null) { + v = res.get1(); + ver = res.get2(); + } + } + else { + v = entry.innerGet(null, + /*swap*/true, + /*read-through*/false, + /*fail-fast*/true, + /*unmarshal*/true, + /**update-metrics*/false, + /*event*/!skipVals, + /*temporary*/false, + subjId, + null, + taskName, + expiryPlc); + } colocated.context().evicts().touch(entry, topVer); @@ -484,7 +520,10 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M colocated.removeIfObsolete(key); } else { - cctx.addResult(locVals, key, v, skipVals, false, deserializePortable, true); + if (resC != null) + resC.apply(key, skipVals ? true : v, ver); + else + cctx.addResult(locVals, key, v, skipVals, false, deserializePortable, true); return false; } @@ -585,15 +624,24 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M int keysSize = infos.size(); if (keysSize != 0) { - Map<K, V> map = new GridLeanMap<>(keysSize); + if (resC != null) { + for (GridCacheEntryInfo info : infos) { + assert skipVals == (info.value() == null); - for (GridCacheEntryInfo info : infos) { - assert skipVals == (info.value() == null); - - cctx.addResult(map, info.key(), info.value(), skipVals, false, deserializePortable, false); + resC.apply(info.key(), skipVals ? true : info.value(), info.version()); + } } + else { + Map<K, V> map = new GridLeanMap<>(keysSize); + + for (GridCacheEntryInfo info : infos) { + assert skipVals == (info.value() == null); - return map; + cctx.addResult(map, info.key(), info.value(), skipVals, false, deserializePortable, false); + } + + return map; + } } return Collections.emptyMap(); http://git-wip-us.apache.org/repos/asf/ignite/blob/54bbc753/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 854a83d..6971b85 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1043,7 +1043,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { deserializePortable, expiry, skipVals, - canRemap); + canRemap, + false, + null); fut.init(); http://git-wip-us.apache.org/repos/asf/ignite/blob/54bbc753/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index f38126d..de82068 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -65,9 +65,11 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.lang.GridInClosure3; import org.apache.ignite.internal.util.typedef.C2; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -278,7 +280,48 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte boolean deserializePortable, @Nullable IgniteCacheExpiryPolicy expiryPlc, boolean skipVals, - boolean canRemap + boolean canRemap) { + return loadAsync(keys, + readThrough, + reload, + forcePrimary, + topVer, subjId, + taskName, + deserializePortable, + expiryPlc, + skipVals, + canRemap, + false, + null); + } + + /** + * @param keys Keys to load. + * @param readThrough Read through flag. + * @param reload Reload flag. + * @param forcePrimary Force get from primary node flag. + * @param topVer Topology version. + * @param subjId Subject ID. + * @param taskName Task name. + * @param deserializePortable Deserialize portable flag. + * @param expiryPlc Expiry policy. + * @param skipVals Skip values flag. + * @return Loaded values. + */ + public IgniteInternalFuture<Map<K, V>> loadAsync( + @Nullable Collection<KeyCacheObject> keys, + boolean readThrough, + boolean reload, + boolean forcePrimary, + AffinityTopologyVersion topVer, + @Nullable UUID subjId, + String taskName, + boolean deserializePortable, + @Nullable IgniteCacheExpiryPolicy expiryPlc, + boolean skipVals, + boolean canRemap, + boolean needVer, + @Nullable GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c ) { if (keys == null || keys.isEmpty()) return new GridFinishedFuture<>(Collections.<K, V>emptyMap()); @@ -288,7 +331,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte // Optimisation: try to resolve value locally and escape 'get future' creation. if (!reload && !forcePrimary) { - Map<K, V> locVals = U.newHashMap(keys.size()); + Map<K, V> locVals = null; + Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>> locVals0 = null; boolean success = true; @@ -304,18 +348,39 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (entry != null) { boolean isNew = entry.isNewLocked(); - CacheObject v = entry.innerGet(null, - /*swap*/true, - /*read-through*/false, - /*fail-fast*/true, - /*unmarshal*/true, - /**update-metrics*/false, - /*event*/!skipVals, - /*temporary*/false, - subjId, - null, - taskName, - expiryPlc); + CacheObject v = null; + GridCacheVersion ver = null; + + if (needVer) { + T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned( + /*swap*/true, + /*unmarshal*/true, + /**update-metrics*/false, + /*event*/!skipVals, + subjId, + null, + taskName, + expiryPlc); + + if (res != null) { + v = res.get1(); + ver = res.get2(); + } + } + else { + v = entry.innerGet(null, + /*swap*/true, + /*read-through*/false, + /*fail-fast*/true, + /*unmarshal*/true, + /**update-metrics*/false, + /*event*/!skipVals, + /*temporary*/false, + subjId, + null, + taskName, + expiryPlc); + } // Entry was not in memory or in swap, so we remove it from cache. if (v == null) { @@ -326,8 +391,20 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte success = false; } - else - ctx.addResult(locVals, key, v, skipVals, false, deserializePortable, true); + else { + if (c != null) { + if (locVals0 == null) + locVals0 = U.newHashMap(keys.size()); + + locVals0.put(key, new T2<>(v, ver)); + } + else { + if (locVals == null) + locVals = U.newHashMap(keys.size()); + + ctx.addResult(locVals, key, v, skipVals, false, deserializePortable, true); + } + } } else success = false; @@ -364,6 +441,13 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (success) { sendTtlUpdateRequest(expiryPlc); + if (c != null) { + if (locVals0 != null) { + for (Map.Entry<KeyCacheObject, T2<CacheObject, GridCacheVersion>> e : locVals0.entrySet()) + c.apply(e.getKey(), e.getValue().get1(), e.getValue().get2()); + } + } + return new GridFinishedFuture<>(locVals); } } @@ -384,7 +468,9 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte deserializePortable, expiryPlc, skipVals, - canRemap); + canRemap, + needVer, + c); fut.init(); http://git-wip-us.apache.org/repos/asf/ignite/blob/54bbc753/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index 01c3e2b..d1adf1d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -288,7 +288,9 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda deserializePortable, expiry, skipVal, - canRemap); + canRemap, + false, + null); // init() will register future for responses if future has remote mappings. fut.init(); http://git-wip-us.apache.org/repos/asf/ignite/blob/54bbc753/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index d9763f8..3b70325 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -53,12 +53,14 @@ import org.apache.ignite.internal.util.GridLeanMap; import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.GridInClosure3; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CIX1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.P1; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -138,6 +140,12 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma /** Flag indicating that get should be done on a locked topology version. */ private final boolean canRemap; + /** */ + private final boolean needVer; + + /** */ + private final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> resC; + /** * @param cctx Context. * @param keys Keys. @@ -164,11 +172,14 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma boolean deserializePortable, @Nullable IgniteCacheExpiryPolicy expiryPlc, boolean skipVals, - boolean canRemap + boolean canRemap, + boolean needVer, + @Nullable GridInClosure3<KeyCacheObject, Object, GridCacheVersion> resC ) { super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size())); assert !F.isEmpty(keys); + assert !needVer || resC != null; this.cctx = cctx; this.keys = keys; @@ -182,6 +193,8 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma this.expiryPlc = expiryPlc; this.skipVals = skipVals; this.canRemap = canRemap; + this.needVer = needVer; + this.resC = resC; futId = IgniteUuid.randomUuid(); @@ -453,23 +466,43 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma while (true) { try { CacheObject v = null; + GridCacheVersion ver = null; boolean isNear = entry != null; // First we peek into near cache. - if (isNear) - v = entry.innerGet(tx, - /*swap*/false, - /*read-through*/false, - /*fail-fast*/true, - /*unmarshal*/true, - /*metrics*/true, - /*events*/!skipVals, - /*temporary*/false, - subjId, - null, - taskName, - expiryPlc); + if (isNear) { + if (needVer) { + T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned( + /*swap*/true, + /*unmarshal*/true, + /**update-metrics*/true, + /*event*/!skipVals, + subjId, + null, + taskName, + expiryPlc); + + if (res != null) { + v = res.get1(); + ver = res.get2(); + } + } + else { + v = entry.innerGet(tx, + /*swap*/false, + /*read-through*/false, + /*fail-fast*/true, + /*unmarshal*/true, + /*metrics*/true, + /*events*/!skipVals, + /*temporary*/false, + subjId, + null, + taskName, + expiryPlc); + } + } ClusterNode affNode = null; @@ -485,18 +518,36 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma if (dhtEntry != null) { boolean isNew = dhtEntry.isNewLocked() || !dhtEntry.valid(topVer); - v = dhtEntry.innerGet(tx, - /*swap*/true, - /*read-through*/false, - /*fail-fast*/true, - /*unmarshal*/true, - /*update-metrics*/false, - /*events*/!isNear && !skipVals, - /*temporary*/false, - subjId, - null, - taskName, - expiryPlc); + if (needVer) { + T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned( + /*swap*/true, + /*unmarshal*/true, + /**update-metrics*/false, + /*event*/!isNear && !skipVals, + subjId, + null, + taskName, + expiryPlc); + + if (res != null) { + v = res.get1(); + ver = res.get2(); + } + } + else { + v = dhtEntry.innerGet(tx, + /*swap*/true, + /*read-through*/false, + /*fail-fast*/true, + /*unmarshal*/true, + /*update-metrics*/false, + /*events*/!isNear && !skipVals, + /*temporary*/false, + subjId, + null, + taskName, + expiryPlc); + } // Entry was not in memory or in swap, so we remove it from cache. if (v == null && isNew && dhtEntry.markObsoleteIfEmpty(ver)) @@ -534,13 +585,17 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma } if (v != null && !reload) { - K key0 = key.value(cctx.cacheObjectContext(), true); - V val0 = v.value(cctx.cacheObjectContext(), true); + if (resC == null) { + K key0 = key.value(cctx.cacheObjectContext(), true); + V val0 = v.value(cctx.cacheObjectContext(), true); - val0 = (V)cctx.unwrapPortableIfNeeded(val0, !deserializePortable); - key0 = (K)cctx.unwrapPortableIfNeeded(key0, !deserializePortable); + val0 = (V)cctx.unwrapPortableIfNeeded(val0, !deserializePortable); + key0 = (K)cctx.unwrapPortableIfNeeded(key0, !deserializePortable); - add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0))); + add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0))); + } + else + resC.apply(key, v, ver); } else { if (affNode == null) { @@ -667,7 +722,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma ) { boolean empty = F.isEmpty(keys); - Map<K, V> map = empty ? Collections.<K, V>emptyMap() : new GridLeanMap<K, V>(keys.size()); + Map<K, V> map = (resC != null || empty) ? Collections.<K, V>emptyMap() : new GridLeanMap<K, V>(keys.size()); if (!empty) { boolean atomic = cctx.atomic(); @@ -705,7 +760,10 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma assert skipVals == (info.value() == null); - cctx.addResult(map, key, val, skipVals, false, deserializePortable, false); + if (resC != null) + resC.apply(key, skipVals ? true : val, info.version()); + else + cctx.addResult(map, key, val, skipVals, false, deserializePortable, false); } catch (GridCacheEntryRemovedException ignore) { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/54bbc753/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index e70c864..37f63db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -48,6 +48,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.lang.GridInClosure3; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -181,7 +182,9 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> boolean readThrough, boolean deserializePortable, @Nullable IgniteCacheExpiryPolicy expiryPlc, - boolean skipVals) { + boolean skipVals, + boolean needVer, + final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c) { assert tx != null; GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx, @@ -195,7 +198,9 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> deserializePortable, expiryPlc, skipVals, - /*can remap*/true); + needVer, + /*can remap*/true, + c); // init() will register future for responses if it has remote mappings. fut.init(); http://git-wip-us.apache.org/repos/asf/ignite/blob/54bbc753/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index bebd9d1..2c2915f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -54,7 +54,7 @@ import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; -import org.apache.ignite.internal.util.lang.GridClosureException; +import org.apache.ignite.internal.util.lang.GridInClosure3; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.CI1; @@ -349,7 +349,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { final Collection<KeyCacheObject> keys, boolean deserializePortable, boolean skipVals, - final IgniteBiInClosure<KeyCacheObject, Object> c + boolean needVer, + final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c ) { if (cacheCtx.isNear()) { return cacheCtx.nearTx().txLoadAsync(this, @@ -357,23 +358,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { readThrough, deserializePortable, accessPolicy(cacheCtx, keys), - skipVals).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Boolean>() { + skipVals, + needVer, + c).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Boolean>() { @Override public Boolean apply(IgniteInternalFuture<Map<Object, Object>> f) { - try { - Map<Object, Object> map = f.get(); - - // Must loop through keys, not map entries, - // as map entries may not have all the keys. - for (KeyCacheObject key : keys) - c.apply(key, map.get(key.value(cacheCtx.cacheObjectContext(), false))); - - return true; - } - catch (Exception e) { - setRollbackOnly(); - - throw new GridClosureException(e); - } + return true; } }); } @@ -389,31 +378,18 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { deserializePortable, accessPolicy(cacheCtx, keys), skipVals, - /*can remap*/true + /*can remap*/true, + needVer, + c ).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Boolean>() { - @Override public Boolean apply(IgniteInternalFuture<Map<Object, Object>> f) { - try { - Map<Object, Object> map = f.get(); - - // Must loop through keys, not map entries, - // as map entries may not have all the keys. - for (KeyCacheObject key : keys) - c.apply(key, map.get(key.value(cacheCtx.cacheObjectContext(), false))); - - return true; - } - catch (Exception e) { - setRollbackOnly(); - - throw new GridClosureException(e); - } - } - }); - } - else { + @Override public Boolean apply(IgniteInternalFuture<Map<Object, Object>> f) { + return true; + } + }); + } else { assert cacheCtx.isLocal(); - return super.loadMissing(cacheCtx, readThrough, async, keys, deserializePortable, skipVals, c); + return super.loadMissing(cacheCtx, readThrough, async, keys, deserializePortable, skipVals, needVer, c); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/54bbc753/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 88752a2..0286efe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -430,6 +430,9 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter /** {@inheritDoc} */ @Override public Collection<IgniteTxEntry> optimisticLockEntries() { + if (optimistic() && serializable()) + return allEntries(); + return writeEntries(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/54bbc753/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index d68efec..f1cd2d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -66,6 +66,9 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { /** */ private static final long serialVersionUID = 0L; + /** Dummy version for non-existing entry read in SERIALIZABLE transaction. */ + public static final GridCacheVersion SER_READ_NEW_ENTRY_VER = new GridCacheVersion(0, 0, 0, 0); + /** Owning transaction. */ @GridToStringExclude @GridDirectTransient @@ -826,12 +829,21 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { } /** - * @return + * @return Read version for serializable transaction. */ - public GridCacheVersion serializableReadVersion() { + @Nullable public GridCacheVersion serializableReadVersion() { return serReadVer; } + /** + * @param serReadVer Read version for serializable transaction. + */ + public void serializableReadVersion(GridCacheVersion serReadVer) { + assert serReadVer != null; + + this.serReadVer = serReadVer; + } + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); http://git-wip-us.apache.org/repos/asf/ignite/blob/54bbc753/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 826172e..2b745ac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -73,6 +73,7 @@ import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridClosureException; +import org.apache.ignite.internal.util.lang.GridInClosure3; import org.apache.ignite.internal.util.lang.GridTuple; import org.apache.ignite.internal.util.tostring.GridToStringBuilder; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -423,40 +424,44 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter final Collection<KeyCacheObject> keys, boolean deserializePortable, boolean skipVals, - final IgniteBiInClosure<KeyCacheObject, Object> c + boolean needVer, + final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c ) { - if (!async) { - try { - if (!readThrough || !cacheCtx.readThrough()) { - for (KeyCacheObject key : keys) - c.apply(key, null); - - return new GridFinishedFuture<>(false); - } - - return new GridFinishedFuture<>( - cacheCtx.store().loadAll(this, keys, c)); - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(e); - } - } - else { - return cctx.kernalContext().closure().callLocalSafe( - new GPC<Boolean>() { - @Override public Boolean call() throws Exception { - if (!readThrough || !cacheCtx.readThrough()) { - for (KeyCacheObject key : keys) - c.apply(key, null); - - return false; - } - - return cacheCtx.store().loadAll(IgniteTxLocalAdapter.this, keys, c); - } - }, - true); - } + // TODO IGNITE-1607. + return new GridFinishedFuture<>(); + +// if (!async) { +// try { +// if (!readThrough || !cacheCtx.readThrough()) { +// for (KeyCacheObject key : keys) +// c.apply(key, null, null); +// +// return new GridFinishedFuture<>(false); +// } +// +// return new GridFinishedFuture<>( +// cacheCtx.store().loadAll(this, keys, c)); +// } +// catch (IgniteCheckedException e) { +// return new GridFinishedFuture<>(e); +// } +// } +// else { +// return cctx.kernalContext().closure().callLocalSafe( +// new GPC<Boolean>() { +// @Override public Boolean call() throws Exception { +// if (!readThrough || !cacheCtx.readThrough()) { +// for (KeyCacheObject key : keys) +// c.apply(key, null, null); +// +// return false; +// } +// +// return cacheCtx.store().loadAll(IgniteTxLocalAdapter.this, keys, c); +// } +// }, +// true); +// } } /** @@ -1527,7 +1532,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (log.isDebugEnabled()) log.debug("Loading missed values for missed map: " + missedMap); - final Collection<KeyCacheObject> loaded = new HashSet<>(); + final Collection<KeyCacheObject> loaded = U.newHashSet(missedMap.size()); + + final boolean needVer = optimistic() && serializable(); return new GridEmbeddedFuture<>( new C2<Boolean, Exception, Map<K, V>>() { @@ -1548,13 +1555,27 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } } - if (readCommitted()) { - Collection<KeyCacheObject> notFound = new HashSet<>(missedMap.keySet()); + // In read-committed mode touch entries that have just been read. + boolean touch = readCommitted(); - notFound.removeAll(loaded); + for (KeyCacheObject key : missedMap.keySet()) { + if (loaded.contains(key)) + continue; + + GridCacheVersion ver = needVer ? IgniteTxEntry.SER_READ_NEW_ENTRY_VER : null; - // In read-committed mode touch entries that have just been read. - for (KeyCacheObject key : notFound) { + onLoaded(key, + null, + ver, + cacheCtx, + map, + missedMap, + deserializePortable, + skipVals, + keepCacheObjects, + loaded); + + if (touch) { IgniteTxEntry txEntry = entry(cacheCtx.txKey(key)); GridCacheEntryEx entry = txEntry == null ? cacheCtx.cache().peekEx(key) : @@ -1575,133 +1596,174 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter missedMap.keySet(), deserializePortable, skipVals, - new CI2<KeyCacheObject, Object>() { - /** */ - private GridCacheVersion nextVer; + needVer, + new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() { + @Override public void apply(KeyCacheObject key, + @Nullable Object val, + @Nullable GridCacheVersion loadVer) { + onLoaded(key, + val, + loadVer, + cacheCtx, + map, + missedMap, + deserializePortable, + skipVals, + keepCacheObjects, + loaded); + } + }) + ); + } - @Override public void apply(KeyCacheObject key, Object val) { - if (isRollbackOnly()) { - if (log.isDebugEnabled()) - log.debug("Ignoring loaded value for read because transaction was rolled back: " + - IgniteTxLocalAdapter.this); + /** + * @param key Key. + * @param val Value. + * @param loadVer Entry version. + * @param cacheCtx Cache context. + * @param map Return map. + * @param missedMap Missed keys. + * @param deserializePortable Deserialize portable flag. + * @param skipVals Skip values flag. + * @param keepCacheObjects Keep cache objects flag. + * @param loaded Loaded values map. + */ + private <K, V> void onLoaded( + KeyCacheObject key, + @Nullable Object val, + @Nullable GridCacheVersion loadVer, + GridCacheContext cacheCtx, + Map<K, V> map, + Map<KeyCacheObject, GridCacheVersion> missedMap, + final boolean deserializePortable, + boolean skipVals, + boolean keepCacheObjects, + Collection<KeyCacheObject> loaded) { + if (isRollbackOnly()) { + if (log.isDebugEnabled()) + log.debug("Ignoring loaded value for read because transaction was rolled back: " + + IgniteTxLocalAdapter.this); - return; - } + return; + } - GridCacheVersion ver = missedMap.get(key); + GridCacheVersion ver = missedMap.get(key); - if (ver == null) { - if (log.isDebugEnabled()) - log.debug("Value from storage was never asked for [key=" + key + ", val=" + val + ']'); + if (ver == null) { + if (log.isDebugEnabled()) + log.debug("Value from storage was never asked for [key=" + key + ", val=" + val + ']'); - return; - } + return; + } - CacheObject cacheVal = cacheCtx.toCacheObject(val); + CacheObject cacheVal = cacheCtx.toCacheObject(val); - CacheObject visibleVal = cacheVal; + CacheObject visibleVal = cacheVal; - IgniteTxKey txKey = cacheCtx.txKey(key); + IgniteTxKey txKey = cacheCtx.txKey(key); - IgniteTxEntry txEntry = entry(txKey); + IgniteTxEntry txEntry = entry(txKey); - if (txEntry != null) { - if (!readCommitted()) - txEntry.readValue(cacheVal); + if (txEntry != null) { + if (!readCommitted()) + txEntry.readValue(cacheVal); - if (!F.isEmpty(txEntry.entryProcessors())) - visibleVal = txEntry.applyEntryProcessors(visibleVal); - } + if (!F.isEmpty(txEntry.entryProcessors())) + visibleVal = txEntry.applyEntryProcessors(visibleVal); + } - // In pessimistic mode we hold the lock, so filter validation - // should always be valid. - if (pessimistic()) - ver = null; + // In pessimistic mode we hold the lock, so filter validation + // should always be valid. + if (pessimistic()) + ver = null; - // Initialize next version. - if (nextVer == null) - nextVer = cctx.versions().next(topologyVersion()); + // Initialize next version. + GridCacheVersion nextVer = cctx.versions().next(topologyVersion()); - while (true) { - assert txEntry != null || readCommitted() || skipVals; - - GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached(); + while (true) { + assert txEntry != null || readCommitted() || skipVals; - try { - // Must initialize to true since even if filter didn't pass, - // we still record the transaction value. - boolean set; + GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached(); - try { - set = e.versionedValue(cacheVal, ver, nextVer); - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry in transaction getAll method " + - "(will try again): " + e); + try { + // Must initialize to true since even if filter didn't pass, + // we still record the transaction value. + boolean set; - if (pessimistic() && !readCommitted() && !isRollbackOnly()) { - U.error(log, "Inconsistent transaction state (entry got removed while " + - "holding lock) [entry=" + e + ", tx=" + IgniteTxLocalAdapter.this + "]"); + try { + set = e.versionedValue(cacheVal, ver, nextVer); + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry in transaction getAll method " + + "(will try again): " + e); - setRollbackOnly(); + if (pessimistic() && !readCommitted() && !isRollbackOnly()) { + U.error(log, "Inconsistent transaction state (entry got removed while " + + "holding lock) [entry=" + e + ", tx=" + IgniteTxLocalAdapter.this + "]"); - return; - } + setRollbackOnly(); - if (txEntry != null) - txEntry.cached(entryEx(cacheCtx, txKey)); + return; + } - continue; // While loop. - } + if (txEntry != null) + txEntry.cached(entryEx(cacheCtx, txKey)); - // In pessimistic mode, we should always be able to set. - assert set || !pessimistic(); + continue; // While loop. + } - if (readCommitted() || skipVals) { - cacheCtx.evicts().touch(e, topologyVersion()); + // In pessimistic mode, we should always be able to set. + assert set || !pessimistic(); - if (visibleVal != null) { - cacheCtx.addResult(map, - key, - visibleVal, - skipVals, - keepCacheObjects, - deserializePortable, - false); - } - } - else { - assert txEntry != null; + if (readCommitted() || skipVals) { + cacheCtx.evicts().touch(e, topologyVersion()); - txEntry.setAndMarkValid(cacheVal); + if (visibleVal != null) { + cacheCtx.addResult(map, + key, + visibleVal, + skipVals, + keepCacheObjects, + deserializePortable, + false); + } + } + else { + assert txEntry != null; - if (visibleVal != null) { - cacheCtx.addResult(map, - key, - visibleVal, - skipVals, - keepCacheObjects, - deserializePortable, - false); - } - } + txEntry.setAndMarkValid(cacheVal); - loaded.add(key); + if (optimistic() && serializable()) { + assert loadVer != null; - if (log.isDebugEnabled()) - log.debug("Set value loaded from store into entry from transaction [set=" + set + - ", matchVer=" + ver + ", newVer=" + nextVer + ", entry=" + e + ']'); + txEntry.serializableReadVersion(loadVer); + } - break; // While loop. - } - catch (IgniteCheckedException ex) { - throw new IgniteException("Failed to put value for cache entry: " + e, ex); - } - } + if (visibleVal != null) { + cacheCtx.addResult(map, + key, + visibleVal, + skipVals, + keepCacheObjects, + deserializePortable, + false); } - }) - ); + } + + if (val != null) + loaded.add(key); + + if (log.isDebugEnabled()) + log.debug("Set value loaded from store into entry from transaction [set=" + set + + ", matchVer=" + ver + ", newVer=" + nextVer + ", entry=" + e + ']'); + + break; // While loop. + } + catch (IgniteCheckedException ex) { + throw new IgniteException("Failed to put value for cache entry: " + e, ex); + } + } } /** {@inheritDoc} */ @@ -1922,15 +1984,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (missed.isEmpty()) return new GridFinishedFuture<>(retMap); - IgniteInternalFuture<Map<K, V>> fut0 = checkMissed(cacheCtx, + return checkMissed(cacheCtx, retMap, missed, deserializePortable, skipVals, keepCacheObjects, skipStore); - - return fut0; } return new GridFinishedFuture<>(retMap); @@ -2379,8 +2439,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter missedForLoad, deserializePortables(cacheCtx), /*skip values*/false, - new CI2<KeyCacheObject, Object>() { - @Override public void apply(KeyCacheObject key, Object val) { + false, + new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() { + @Override public void apply(KeyCacheObject key, + @Nullable Object val, + @Nullable GridCacheVersion loadVer) { if (log.isDebugEnabled()) log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/54bbc753/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java index 6f72290..8f5f37b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.lang.GridInClosure3; import org.apache.ignite.lang.IgniteBiInClosure; import org.jetbrains.annotations.Nullable; @@ -173,5 +174,6 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { Collection<KeyCacheObject> keys, boolean deserializePortable, boolean skipVals, - IgniteBiInClosure<KeyCacheObject, Object> c); + boolean needVer, + GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/54bbc753/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 34c5377..8cf7fce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -993,6 +993,14 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** + * @param tx Transaction. + * @return {@code True} if transaction read entries should be unlocked. + */ + private boolean unlockReadEntries(IgniteInternalTx tx) { + return (tx.pessimistic() && !tx.readCommitted()) || (tx.optimistic() && tx.serializable()); + } + + /** * Commits a transaction. * * @param tx Transaction to commit. @@ -1046,8 +1054,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { // 4. Unlock write resources. unlockMultiple(tx, tx.writeEntries()); - // 5. For pessimistic transaction, unlock read resources if required. - if (tx.pessimistic() && !tx.readCommitted()) + // 5. Unlock read resources if required. + if (unlockReadEntries(tx)) unlockMultiple(tx, tx.readEntries()); // 6. Notify evictions. @@ -1125,8 +1133,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { // 2. Unlock write resources. unlockMultiple(tx, tx.writeEntries()); - // 3. For pessimistic transaction, unlock read resources if required. - if (tx.pessimistic() && !tx.readCommitted()) + // 3. Unlock read resources if required. + if (unlockReadEntries(tx)) unlockMultiple(tx, tx.readEntries()); // 4. Notify evictions. @@ -1188,8 +1196,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { // 1. Unlock write resources. unlockMultiple(tx, tx.writeEntries()); - // 2. For pessimistic transaction, unlock read resources if required. - if (tx.pessimistic() && !tx.readCommitted()) + // 2. Unlock read resources if required. + if (unlockReadEntries(tx)) unlockMultiple(tx, tx.readEntries()); // 3. Notify evictions. http://git-wip-us.apache.org/repos/asf/ignite/blob/54bbc753/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java index 3936d1d..dfe82d4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java @@ -114,7 +114,59 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void _testTxRollbackRead1() throws Exception { + public void testTxCommitReadOnly() throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg); + + List<Integer> keys = new ArrayList<>(); + + keys.add(nearKey(cache)); + keys.add(primaryKey(cache)); + + if (ccfg.getBackups() != 0) + keys.add(backupKey(cache)); + + for (Integer key : keys) { + log.info("Test key: " + key); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache.get(key); + + assertNull(val); + + tx.commit(); + } + + checkValue(key, null, ccfg.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache.get(key); + + assertNull(val); + + tx.rollback(); + } + + checkValue(key, null, ccfg.getName()); + } + } + finally { + ignite0.destroyCache(ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testTxRollbackRead1() throws Exception { txRollbackRead(true); } @@ -134,7 +186,11 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { final IgniteTransactions txs = ignite0.transactions(); - for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { + List<CacheConfiguration<Integer, Integer>> ccfgs = new ArrayList<>(); + + ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, false, false)); + + for (CacheConfiguration<Integer, Integer> ccfg : ccfgs) { logCacheInfo(ccfg); try { @@ -163,8 +219,6 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { updateKey(cache, key, 1); - log.info("Commit"); - tx.commit(); } @@ -174,7 +228,17 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { log.info("Expected exception: " + e); } - assertEquals(1, (Object) cache.get(key)); + assertEquals(1, (Object)cache.get(key)); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object val = cache.get(key); + + assertEquals(1, val); + + tx.commit(); + } + + assertEquals(1, (Object)cache.get(key)); } } finally { @@ -602,6 +666,19 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } /** + * @param key Key. + * @param expVal Expected value. + * @param cacheName Cache name. + */ + private void checkValue(Object key, Object expVal, String cacheName) { + for (int i = 0; i < SRVS + CLIENTS; i++) { + IgniteCache<Object, Object> cache = ignite(i).cache(cacheName); + + assertEquals(expVal, cache.get(key)); + } + } + + /** * @param releaseLatch Release lock latch. * @param cache Cache. * @param key Key. @@ -665,6 +742,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { if (storeEnabled) { ccfg.setCacheStoreFactory(new TestStoreFactory()); ccfg.setWriteThrough(true); + ccfg.setReadThrough(true); } if (nearCache) http://git-wip-us.apache.org/repos/asf/ignite/blob/54bbc753/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index b754d80..e46c139 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEnt import org.apache.ignite.internal.processors.dr.GridDrType; import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter; import org.apache.ignite.internal.util.lang.GridTuple3; +import org.apache.ignite.internal.util.typedef.T2; import org.jetbrains.annotations.Nullable; /** @@ -441,6 +442,20 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr } /** @inheritDoc */ + @Nullable @Override public T2<CacheObject, GridCacheVersion> innerGetVersioned(boolean readSwap, + boolean unmarshal, + boolean updateMetrics, + boolean evt, + UUID subjId, + Object transformClo, + String taskName, + @Nullable IgniteCacheExpiryPolicy expiryPlc) { + assert false; + + return null; + } + + /** @inheritDoc */ @Override public CacheObject innerReload() { return val; }
