ignite-4932 WIP
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/59c9707c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/59c9707c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/59c9707c Branch: refs/heads/ignite-4932 Commit: 59c9707ccef73d2cce5ba7171225be995c247276 Parents: f9f4256 Author: sboikov <[email protected]> Authored: Wed Apr 12 16:00:12 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Apr 12 17:00:40 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 152 ++++++++---- .../processors/cache/GridCacheContext.java | 9 + .../processors/cache/GridCacheEventManager.java | 24 ++ .../processors/cache/GridCacheMapEntry.java | 45 ++-- .../processors/cache/GridCacheSwapManager.java | 22 ++ .../dht/GridPartitionedGetFuture.java | 152 +++++++----- .../dht/GridPartitionedSingleGetFuture.java | 139 +++++++---- .../dht/atomic/GridDhtAtomicCache.java | 230 ++++++++++++------- .../dht/colocated/GridDhtColocatedCache.java | 228 ++++++++++-------- .../local/atomic/GridLocalAtomicCache.java | 197 ++++++++++------ .../cache/IgniteCacheNoSyncForGetTest.java | 77 ++++++- 11 files changed, 832 insertions(+), 443 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 27a5750..5042f77 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1908,80 +1908,130 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V Map<KeyCacheObject, EntryGetResult> misses = null; + boolean offheapRead = ctx.offheapRead(expiry, readerArgs != null); + for (KeyCacheObject key : keys) { while (true) { - GridCacheEntryEx entry = needEntry ? entryEx(key) : peekEx(key); - - if (entry == null) { - if (!skipVals && ctx.config().isStatisticsEnabled()) - ctx.cache().metrics0().onRead(false); - - break; - } - try { - EntryGetResult res; + EntryGetResult res = null; boolean evt = !skipVals; boolean updateMetrics = !skipVals; - if (storeEnabled) { - res = entry.innerGetAndReserveForLoad(ctx.isSwapOrOffheapEnabled(), - updateMetrics, - evt, - subjId, - taskName, - expiry, - !deserializeBinary, - readerArgs); + GridCacheEntryEx entry = null; + + boolean skipEntry; + + if (offheapRead) { + GridCacheSwapEntry swapEntry = ctx.swap().readSwapEntry(key); - assert res != null; + if (swapEntry != null) { + skipEntry = true; - if (res.value() == null) { - if (misses == null) - misses = new HashMap<>(); + long expireTime = swapEntry.expireTime(); - misses.put(key, res); + if (expireTime != 0) { + if (expireTime - U.currentTimeMillis() > 0) { + res = new EntryGetWithTtlResult(swapEntry.value(), + swapEntry.version(), + false, + expireTime, + swapEntry.ttl()); + } + else + skipEntry = false; // Do not skip entry if need process expiration. + } + else + res = new EntryGetResult(swapEntry.value(), swapEntry.version(), false); + } + else + skipEntry = !storeEnabled; + + if (skipEntry) { + if (evt) { + ctx.events().readEvent(key, + null, + swapEntry != null ? swapEntry.value() : null, + subjId, + taskName, + !deserializeBinary); + } - res = null; + if (updateMetrics && ctx.cache().configuration().isStatisticsEnabled()) + ctx.cache().metrics0().onRead(swapEntry != null); } } - else { - if (needVer || readerArgs != null) { - res = entry.innerGetVersioned( - null, - null, - ctx.isSwapOrOffheapEnabled(), - /*unmarshal*/true, + else + skipEntry = false; + + if (!skipEntry) { + entry = needEntry ? entryEx(key) : peekEx(key); + + if (entry == null) { + if (!skipVals && ctx.config().isStatisticsEnabled()) + ctx.cache().metrics0().onRead(false); + + break; + } + + if (storeEnabled) { + res = entry.innerGetAndReserveForLoad(ctx.isSwapOrOffheapEnabled(), updateMetrics, evt, subjId, - null, taskName, expiry, !deserializeBinary, readerArgs); + + assert res != null; + + if (res.value() == null) { + if (misses == null) + misses = new HashMap<>(); + + misses.put(key, res); + + res = null; + } } else { - CacheObject val = entry.innerGet( - null, - null, - ctx.isSwapOrOffheapEnabled(), - false, - updateMetrics, - evt, - false, - subjId, - null, - taskName, - expiry, - !deserializeBinary); + if (needVer || readerArgs != null) { + res = entry.innerGetVersioned( + null, + null, + ctx.isSwapOrOffheapEnabled(), + /*unmarshal*/true, + updateMetrics, + evt, + subjId, + null, + taskName, + expiry, + !deserializeBinary, + readerArgs); + } + else { + CacheObject val = entry.innerGet( + null, + null, + ctx.isSwapOrOffheapEnabled(), + false, + updateMetrics, + evt, + false, + subjId, + null, + taskName, + expiry, + !deserializeBinary); + + res = val != null ? new EntryGetResult(val, null) : null; + } - res = val != null ? new EntryGetResult(val, null) : null; + if (res == null) + ctx.evicts().touch(entry, topVer); } - - if (res == null) - ctx.evicts().touch(entry, topVer); } if (res != null) { @@ -1994,7 +2044,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V true, needVer); - if (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED)) + if (entry != null && (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED))) ctx.evicts().touch(entry, topVer); if (keysSize == 1) http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 3b44b50..0985161 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -2058,6 +2058,15 @@ public class GridCacheContext<K, V> implements Externalizable { } /** + * @param expiryPlc + * @param readers + * @return + */ + public boolean offheapRead(IgniteCacheExpiryPolicy expiryPlc, boolean readers) { + return offheapTiered() && isSwapOrOffheapEnabled() && expiryPlc == null && !readers; + } + + /** * @param part Partition. * @param affNodes Affinity nodes. * @param topVer Topology version. http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java index 1c18738..8953b63 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java @@ -31,6 +31,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED; import static org.apache.ignite.events.EventType.EVT_CACHE_STARTED; import static org.apache.ignite.events.EventType.EVT_CACHE_STOPPED; @@ -61,6 +62,29 @@ public class GridCacheEventManager extends GridCacheManagerAdapter { cctx.gridEvents().removeLocalEventListener(lsnr); } + public void readEvent(KeyCacheObject key, + IgniteInternalTx tx, + CacheObject val, + UUID subjId, + String taskName, + boolean keepBinary) { + if (isRecordable(EVT_CACHE_OBJECT_READ)) { + addEvent(cctx.affinity().partition(key), + key, + tx, + null, + EVT_CACHE_OBJECT_READ, + val, + val != null, + val, + val != null, + subjId, + null, + taskName, + keepBinary); + } + } + /** * @param part Partition. * @param key Key for the event. http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 34f8b96..b9ebed3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -874,31 +874,36 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme Object res = null; - // TODO IGNITE-4932: metrics/events. + if (readerArgs == null && expiryPlc == null && !retVer && cctx.config().isEagerTtl()) { + // Fast heap get without 'synchronized'. + CacheObject val0 = this.val; - if (readerArgs == null && expiryPlc == null) { - if (!retVer && cctx.config().isEagerTtl()) { // Fast heap get. - CacheObject val0 = this.val; - - if (val0 != null) - return val0; - } + if (val0 != null) { + if (updateMetrics && cctx.cache().configuration().isStatisticsEnabled()) + cctx.cache().metrics0().onRead(true); - if (cctx.isSwapOrOffheapEnabled() && readSwap) { - GridCacheSwapEntry swapEntry = cctx.swap().read(this, false, true, true, false); + if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) { + transformClo = EntryProcessorResourceInjectorProxy.unwrap(transformClo); - if (swapEntry != null) { - long expireTime = swapEntry.expireTime(); + GridCacheMvcc mvcc = mvccExtras(); - if (expireTime != 0) { - if (expireTime - U.currentTimeMillis() > 0) { - return retVer ? new EntryGetWithTtlResult(val, ver, false, expireTime, swapEntry.ttl()) : - swapEntry.value(); - } - } - else - return retVer ? new EntryGetResult(val, ver, false) : swapEntry.value(); + cctx.events().addEvent( + partition(), + key, + tx, + mvcc != null ? mvcc.anyOwner() : null, + EVT_CACHE_OBJECT_READ, + val0, + true, + val0, + true, + subjId, + transformClo != null ? transformClo.getClass().getName() : null, + taskName, + keepBinary); } + + return val0; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index 159b3b8..07edaff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -819,6 +819,28 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { } /** + * @param key Key. + * @return Read value. + * @throws IgniteCheckedException If failed. + */ + @Nullable public GridCacheSwapEntry readSwapEntry(KeyCacheObject key) throws IgniteCheckedException { + assert offheapEnabled || swapEnabled; + + GridCacheSwapEntry entry = read(key, + key.valueBytes(cctx.cacheObjectContext()), + cctx.affinity().partition(key), + false, + true, + true, + false); + + assert entry == null || entry.value() != null : entry; + assert entry == null || entry.version() != null : entry; + + return entry; + } + + /** * @param entry Entry to read. * @return Read value address. * @throws IgniteCheckedException If read failed. http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 519239a..798e2dd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; @@ -437,81 +438,118 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda GridDhtCacheAdapter<K, V> cache = cache(); - while (true) { - GridCacheEntryEx entry; + boolean offheapRead = cctx.offheapRead(expiryPlc, false); + boolean evt = !skipVals; + while (true) { try { - entry = cache.context().isSwapOrOffheapEnabled() ? cache.entryEx(key) : cache.peekEx(key); + boolean skipEntry; - // If our DHT cache do has value, then we peek it. - if (entry != null) { - boolean isNew = entry.isNewLocked(); + EntryGetResult getRes = null; + CacheObject v = null; + GridCacheVersion ver = null; - EntryGetResult getRes = null; - CacheObject v = null; - GridCacheVersion ver = null; + if (offheapRead) { + skipEntry = true; - if (needVer) { - getRes = entry.innerGetVersioned( - null, - null, - /*swap*/true, - /*unmarshal*/true, - /**update-metrics*/false, - /*event*/!skipVals, - subjId, - null, - taskName, - expiryPlc, - !deserializeBinary, - null); + GridCacheSwapEntry swapEntry = cctx.swap().readSwapEntry(key); + + if (swapEntry != null) { + long expireTime = swapEntry.expireTime(); + + if (expireTime == 0 || expireTime < U.currentTimeMillis()) { + v = swapEntry.value(); - if (getRes != null) { - v = getRes.value(); - ver = getRes.version(); + if (needVer) + ver = swapEntry.version(); } + else + skipEntry = false; } - else { - v = entry.innerGet( - null, + + if (skipEntry && evt) { + cctx.events().readEvent(key, null, - /*swap*/true, - /*read-through*/false, - /**update-metrics*/false, - /*event*/!skipVals, - /*temporary*/false, + swapEntry != null ? swapEntry.value() : null, subjId, - null, taskName, - expiryPlc, !deserializeBinary); } + } + else + skipEntry = false; + + if (!skipEntry) { + GridCacheEntryEx entry = + cache.context().isSwapOrOffheapEnabled() ? cache.entryEx(key) : cache.peekEx(key); + + // If our DHT cache do has value, then we peek it. + if (entry != null) { + boolean isNew = entry.isNewLocked(); + + if (needVer) { + getRes = entry.innerGetVersioned( + null, + null, + /*swap*/true, + /*unmarshal*/true, + /*update-metrics*/false, + /*event*/evt, + subjId, + null, + taskName, + expiryPlc, + !deserializeBinary, + null); + + if (getRes != null) { + v = getRes.value(); + ver = getRes.version(); + } + } + else { + v = entry.innerGet( + null, + null, + /*swap*/true, + /*read-through*/false, + /*update-metrics*/false, + /*event*/evt, + /*temporary*/false, + subjId, + null, + taskName, + expiryPlc, + !deserializeBinary); + } - cache.context().evicts().touch(entry, topVer); + cache.context().evicts().touch(entry, topVer); - // Entry was not in memory or in swap, so we remove it from cache. - if (v == null) { - if (isNew && entry.markObsoleteIfEmpty(ver)) - cache.removeEntry(entry); - } - else { - cctx.addResult(locVals, - key, - v, - skipVals, - keepCacheObjects, - deserializeBinary, - true, - getRes, - ver, - 0, - 0, - needVer); - - return true; + // Entry was not in memory or in swap, so we remove it from cache. + if (v == null) { + if (isNew && entry.markObsoleteIfEmpty(ver)) + cache.removeEntry(entry); + } } } + if (v != null) { + cctx.addResult(locVals, + key, + v, + skipVals, + keepCacheObjects, + deserializeBinary, + true, + getRes, + ver, + 0, + 0, + needVer); + + return true; + } + boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion()); // Entry not found, do not continue search if topology did not change and there is no store. http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index a3f6b72..11d4fc1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCacheFuture; import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue; @@ -360,74 +361,110 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im GridDhtCacheAdapter colocated = cctx.dht(); - while (true) { - GridCacheEntryEx entry; + boolean offheapRead = cctx.offheapRead(expiryPlc, false); + boolean evt = !skipVals; + while (true) { try { - entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) : - colocated.peekEx(key); + CacheObject v = null; + GridCacheVersion ver = null; - // If our DHT cache do has value, then we peek it. - if (entry != null) { - boolean isNew = entry.isNewLocked(); + boolean skipEntry; - CacheObject v = null; - GridCacheVersion ver = null; + if (offheapRead) { + skipEntry = true; - if (needVer) { - EntryGetResult res = entry.innerGetVersioned( - null, - null, - /*swap*/true, - /*unmarshal*/true, - /**update-metrics*/false, - /*event*/!skipVals, - subjId, - null, - taskName, - expiryPlc, - true, - null); + GridCacheSwapEntry swapEntry = cctx.swap().readSwapEntry(key); - if (res != null) { - v = res.value(); - ver = res.version(); + if (swapEntry != null) { + long expireTime = swapEntry.expireTime(); + + if (expireTime == 0 || expireTime < U.currentTimeMillis()) { + v = swapEntry.value(); + + if (needVer) + ver = swapEntry.version(); } + else + skipEntry = false; } - else { - v = entry.innerGet( - null, + + if (skipEntry && evt) { + cctx.events().readEvent(key, null, - /*swap*/true, - /*read-through*/false, - /**update-metrics*/false, - /*event*/!skipVals, - /*temporary*/false, + swapEntry != null ? swapEntry.value() : null, subjId, - null, taskName, - expiryPlc, - true); + !deserializeBinary); } + } + else + skipEntry = false; + + if (!skipEntry) { + GridCacheEntryEx entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) : + colocated.peekEx(key); + + // If our DHT cache do has value, then we peek it. + if (entry != null) { + boolean isNew = entry.isNewLocked(); + + if (needVer) { + EntryGetResult res = entry.innerGetVersioned( + null, + null, + /*swap*/true, + /*unmarshal*/true, + /*update-metrics*/false, + /*event*/evt, + subjId, + null, + taskName, + expiryPlc, + true, + null); + + if (res != null) { + v = res.value(); + ver = res.version(); + } + } + else { + v = entry.innerGet( + null, + null, + /*swap*/true, + /*read-through*/false, + /*update-metrics*/false, + /*event*/evt, + /*temporary*/false, + subjId, + null, + taskName, + expiryPlc, + true); + } - colocated.context().evicts().touch(entry, topVer); + colocated.context().evicts().touch(entry, topVer); - // Entry was not in memory or in swap, so we remove it from cache. - if (v == null) { - if (isNew && entry.markObsoleteIfEmpty(ver)) - colocated.removeEntry(entry); + if (v == null) { + // Entry was not in memory or in swap, so we remove it from cache. + if (isNew && entry.markObsoleteIfEmpty(ver)) + colocated.removeEntry(entry); + } } - else { - if (!skipVals && cctx.config().isStatisticsEnabled()) - cctx.cache().metrics0().onRead(true); + } - if (!skipVals) - setResult(v, ver); - else - setSkipValueResult(true, ver); + if (v != null) { + if (!skipVals && cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onRead(true); - return true; - } + if (!skipVals) + setResult(v, ver); + else + setSkipValueResult(true, ver); + + return true; } boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion()); http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 8523366..c6bceef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -57,6 +57,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheReturn; +import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry; import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult; import org.apache.ignite.internal.processors.cache.GridDeferredAckMessageSender; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; @@ -1565,114 +1566,165 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc); + boolean evt = !skipVals; + // Optimisation: try to resolve value locally and escape 'get future' creation. if (!forcePrimary && ctx.affinityNode()) { - Map<K, V> locVals = U.newHashMap(keys.size()); - - boolean success = true; - - // Optimistically expect that all keys are available locally (avoid creation of get future). - for (KeyCacheObject key : keys) { - GridCacheEntryEx entry = null; - - while (true) { - try { - entry = ctx.isSwapOrOffheapEnabled() ? entryEx(key) : peekEx(key); - - // If our DHT cache do has value, then we peek it. - if (entry != null) { - boolean isNew = entry.isNewLocked(); - - EntryGetResult getRes = null; - CacheObject v = null; - GridCacheVersion ver = null; - - if (needVer) { - getRes = entry.innerGetVersioned( - null, - null, - /*swap*/true, - /*unmarshal*/true, - /**update-metrics*/false, - /*event*/!skipVals, - subjId, - null, - taskName, - expiry, + try { + Map<K, V> locVals = U.newHashMap(keys.size()); + + boolean success = true; + boolean offheapRead = ctx.offheapRead(expiry, false); + + // Optimistically expect that all keys are available locally (avoid creation of get future). + for (KeyCacheObject key : keys) { + if (offheapRead) { + GridCacheSwapEntry swapEntry = ctx.swap().readSwapEntry(key); + + if (swapEntry != null) { + long expireTime = swapEntry.expireTime(); + + if (expireTime == 0 || expireTime < U.currentTimeMillis()) { + ctx.addResult(locVals, + key, + swapEntry.value(), + skipVals, + false, + deserializeBinary, true, - null); - - if (getRes != null) { - v = getRes.value(); - ver = getRes.version(); - } - } - else { - v = entry.innerGet(null, - null, - /*swap*/true, - /*read-through*/false, - /**update-metrics*/false, - /*event*/!skipVals, - /*temporary*/false, - subjId, null, - taskName, - expiry, - !deserializeBinary); - } - - // Entry was not in memory or in swap, so we remove it from cache. - if (v == null) { - GridCacheVersion obsoleteVer = context().versions().next(); - - if (isNew && entry.markObsoleteIfEmpty(obsoleteVer)) - removeEntry(entry); - - success = false; + swapEntry.version(), + 0, + 0, + needVer); + + if (evt) { + ctx.events().readEvent(key, + null, + swapEntry.value(), + subjId, + taskName, + !deserializeBinary); + } } else - ctx.addResult(locVals, key, v, skipVals, false, deserializeBinary, true, - getRes, ver, 0, 0, needVer); + success = false; } else success = false; - - break; // While. } - catch (GridCacheEntryRemovedException ignored) { - // No-op, retry. - } - catch (GridDhtInvalidPartitionException ignored) { - success = false; + else { + GridCacheEntryEx entry = null; + + while (true) { + try { + entry = ctx.isSwapOrOffheapEnabled() ? entryEx(key) : peekEx(key); + + // If our DHT cache do has value, then we peek it. + if (entry != null) { + boolean isNew = entry.isNewLocked(); + + EntryGetResult getRes = null; + CacheObject v = null; + GridCacheVersion ver = null; + + if (needVer) { + getRes = entry.innerGetVersioned( + null, + null, + /*swap*/true, + /*unmarshal*/true, + /*update-metrics*/false, + /*event*/evt, + subjId, + null, + taskName, + expiry, + true, + null); + + if (getRes != null) { + v = getRes.value(); + ver = getRes.version(); + } + } + else { + v = entry.innerGet(null, + null, + /*swap*/true, + /*read-through*/false, + /*update-metrics*/false, + /*event*/evt, + /*temporary*/false, + subjId, + null, + taskName, + expiry, + !deserializeBinary); + } + + // Entry was not in memory or in swap, so we remove it from cache. + if (v == null) { + if (isNew && entry.markObsoleteIfEmpty(context().versions().next())) + removeEntry(entry); + + success = false; + } + else { + ctx.addResult(locVals, + key, + v, + skipVals, + false, + deserializeBinary, + true, + getRes, + ver, + 0, + 0, + needVer); + } + } + else + success = false; - break; // While. - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(e); - } - finally { - if (entry != null) - ctx.evicts().touch(entry, topVer); + break; // While. + } + catch (GridCacheEntryRemovedException ignored) { + // No-op, retry. + } + catch (GridDhtInvalidPartitionException ignored) { + success = false; + + break; // While. + } + finally { + if (entry != null) + ctx.evicts().touch(entry, topVer); + } + } + + if (!success) + break; + else if (!skipVals && ctx.config().isStatisticsEnabled()) + metrics0().onRead(true); } } - if (!success) - break; - else if (!skipVals && ctx.config().isStatisticsEnabled()) - metrics0().onRead(true); - } + if (success) { + sendTtlUpdateRequest(expiry); - if (success) { - sendTtlUpdateRequest(expiry); + return new GridFinishedFuture<>(locVals); + } - return new GridFinishedFuture<>(locVals); + if (expiry != null) + expiry.reset(); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(e); } } - if (expiry != null) - expiry.reset(); - // Either reload or not all values are available locally. GridPartitionedGetFuture<K, V> fut = new GridPartitionedGetFuture<>(ctx, keys, http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index c8556e5..4b1dd9e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheReturn; +import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry; @@ -451,121 +452,162 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte // Optimisation: try to resolve value locally and escape 'get future' creation. if (!forcePrimary) { - Map<K, V> locVals = null; + try { + Map<K, V> locVals = null; - boolean success = true; + boolean success = true; + boolean offheapRead = ctx.offheapRead(expiryPlc, false); + boolean evt = !skipVals; - // Optimistically expect that all keys are available locally (avoid creation of get future). - for (KeyCacheObject key : keys) { - GridCacheEntryEx entry = null; - - while (true) { - try { - entry = ctx.isSwapOrOffheapEnabled() ? entryEx(key) : peekEx(key); - - // If our DHT cache do has value, then we peek it. - if (entry != null) { - boolean isNew = entry.isNewLocked(); - - EntryGetResult getRes = null; - CacheObject v = null; - GridCacheVersion ver = null; - - if (needVer) { - getRes = entry.innerGetVersioned( - null, - null, - /*swap*/true, - /*unmarshal*/true, - /**update-metrics*/false, - /*event*/!skipVals, - subjId, - null, - taskName, - expiryPlc, - !deserializeBinary, - null); - - if (getRes != null) { - v = getRes.value(); - ver = getRes.version(); - } - } - else { - v = entry.innerGet( - null, - null, - /*swap*/true, - /*read-through*/false, - /**update-metrics*/false, - /*event*/!skipVals, - /*temporary*/false, - subjId, - null, - taskName, - expiryPlc, - !deserializeBinary); - } - - // Entry was not in memory or in swap, so we remove it from cache. - if (v == null) { - GridCacheVersion obsoleteVer = context().versions().next(); + // Optimistically expect that all keys are available locally (avoid creation of get future). + for (KeyCacheObject key : keys) { + if (offheapRead) { + GridCacheSwapEntry swapEntry = ctx.swap().readSwapEntry(key); - if (isNew && entry.markObsoleteIfEmpty(obsoleteVer)) - removeEntry(entry); - - success = false; - } - else { - if (locVals == null) - locVals = U.newHashMap(keys.size()); + if (swapEntry != null) { + long expireTime = swapEntry.expireTime(); + if (expireTime == 0 || expireTime < U.currentTimeMillis()) { ctx.addResult(locVals, key, - v, + swapEntry.value(), skipVals, - keepCacheObj, + false, deserializeBinary, true, - getRes, - ver, + null, + swapEntry.version(), 0, 0, needVer); + + if (evt) { + ctx.events().readEvent(key, + null, + swapEntry.value(), + subjId, + taskName, + !deserializeBinary); + } } + else + success = false; } else success = false; - - break; // While. } - catch (GridCacheEntryRemovedException ignored) { - // No-op, retry. - } - catch (GridDhtInvalidPartitionException ignored) { - success = false; + else { + GridCacheEntryEx entry = null; + + while (true) { + try { + entry = ctx.isSwapOrOffheapEnabled() ? entryEx(key) : peekEx(key); + + // If our DHT cache do has value, then we peek it. + if (entry != null) { + boolean isNew = entry.isNewLocked(); + + EntryGetResult getRes = null; + CacheObject v = null; + GridCacheVersion ver = null; + + if (needVer) { + getRes = entry.innerGetVersioned( + null, + null, + /*swap*/true, + /*unmarshal*/true, + /*update-metrics*/false, + /*event*/evt, + subjId, + null, + taskName, + expiryPlc, + !deserializeBinary, + null); + + if (getRes != null) { + v = getRes.value(); + ver = getRes.version(); + } + } + else { + v = entry.innerGet( + null, + null, + /*swap*/true, + /*read-through*/false, + /*update-metrics*/false, + /*event*/evt, + /*temporary*/false, + subjId, + null, + taskName, + expiryPlc, + !deserializeBinary); + } + + // Entry was not in memory or in swap, so we remove it from cache. + if (v == null) { + GridCacheVersion obsoleteVer = context().versions().next(); + + if (isNew && entry.markObsoleteIfEmpty(obsoleteVer)) + removeEntry(entry); + + success = false; + } + else { + if (locVals == null) + locVals = U.newHashMap(keys.size()); + + ctx.addResult(locVals, + key, + v, + skipVals, + keepCacheObj, + deserializeBinary, + true, + getRes, + ver, + 0, + 0, + needVer); + } + } + else + success = false; - break; // While. - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(e); - } - finally { - if (entry != null) - context().evicts().touch(entry, topVer); + break; // While. + } + catch (GridCacheEntryRemovedException ignored) { + // No-op, retry. + } + catch (GridDhtInvalidPartitionException ignored) { + success = false; + + break; // While. + } + finally { + if (entry != null) + context().evicts().touch(entry, topVer); + } + } } - } - if (!success) - break; - else if (!skipVals && ctx.config().isStatisticsEnabled()) - ctx.cache().metrics0().onRead(true); - } + if (!success) + break; + else if (!skipVals && ctx.config().isStatisticsEnabled()) + ctx.cache().metrics0().onRead(true); + } - if (success) { - sendTtlUpdateRequest(expiryPlc); + if (success) { + sendTtlUpdateRequest(expiryPlc); - return new GridFinishedFuture<>(locVals); + return new GridFinishedFuture<>(locVals); + } + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(e); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index f86df2f..1f66fdf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -53,6 +53,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCachePreloader; import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter; import org.apache.ignite.internal.processors.cache.GridCacheReturn; +import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.local.GridLocalCache; @@ -397,97 +398,149 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { final IgniteCacheExpiryPolicy expiry = expiryPolicy(opCtx != null ? opCtx.expiry() : null); boolean success = true; + final boolean offheapRead = ctx.offheapRead(expiry, false); + final boolean evt = !skipVals; for (K key : keys) { if (key == null) throw new NullPointerException("Null key."); - GridCacheEntryEx entry = null; - KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); - while (true) { - try { - entry = swapOrOffheap ? entryEx(cacheKey) : peekEx(cacheKey); + boolean skipEntry; - if (entry != null) { - CacheObject v; + if (offheapRead) { + skipEntry = true; - if (needVer) { - EntryGetResult res = entry.innerGetVersioned( - null, - null, - /*swap*/swapOrOffheap, - /*unmarshal*/true, - /**update-metrics*/false, - /*event*/!skipVals, - subjId, - null, - taskName, - expiry, - !deserializeBinary, - null); - - if (res != null) { - ctx.addResult( - vals, - cacheKey, - res, - skipVals, - false, - deserializeBinary, - true, - needVer); - } - else - success = false; - } - else { - v = entry.innerGet( - null, + GridCacheSwapEntry swapEntry = ctx.swap().readSwapEntry(cacheKey); + + if (swapEntry != null) { + long expireTime = swapEntry.expireTime(); + + if (expireTime == 0 || expireTime < U.currentTimeMillis()) { + ctx.addResult(vals, + cacheKey, + swapEntry.value(), + skipVals, + false, + deserializeBinary, + true, + null, + swapEntry.version(), + 0, + 0, + needVer); + + if (configuration().isStatisticsEnabled() && !skipVals) + metrics0().onRead(true); + + if (evt) { + ctx.events().readEvent(cacheKey, null, - /*swap*/swapOrOffheap, - /*read-through*/false, - /**update-metrics*/true, - /**event*/!skipVals, - /**temporary*/false, + swapEntry.value(), subjId, - null, taskName, - expiry, !deserializeBinary); + } + } + else + skipEntry = false; + } + else + success = false; + + if (skipEntry && !success && !storeEnabled && configuration().isStatisticsEnabled() && !skipVals) + metrics0().onRead(false); + } + else + skipEntry = false; + + if (!skipEntry) { + GridCacheEntryEx entry = null; + + CacheObject v; + + while (true) { + try { + entry = swapOrOffheap ? entryEx(cacheKey) : peekEx(cacheKey); - if (v != null) { - ctx.addResult(vals, - cacheKey, - v, - skipVals, - false, - deserializeBinary, - true, + if (entry != null) { + if (needVer) { + EntryGetResult res = entry.innerGetVersioned( + null, + null, + /*swap*/swapOrOffheap, + /*unmarshal*/true, + /*update-metrics*/false, + /*event*/!skipVals, + subjId, null, - 0, - 0); + taskName, + expiry, + !deserializeBinary, + null); + + if (res != null) { + ctx.addResult( + vals, + cacheKey, + res, + skipVals, + false, + deserializeBinary, + true, + needVer); + } + else + success = false; + } + else { + v = entry.innerGet( + null, + null, + /*swap*/swapOrOffheap, + /*read-through*/false, + /*update-metrics*/true, + /*event*/!skipVals, + /*temporary*/false, + subjId, + null, + taskName, + expiry, + !deserializeBinary); + + if (v != null) { + ctx.addResult(vals, + cacheKey, + v, + skipVals, + false, + deserializeBinary, + true, + null, + 0, + 0); + } + else + success = false; } - else - success = false; } - } - else { - if (!storeEnabled && configuration().isStatisticsEnabled() && !skipVals) - metrics0().onRead(false); + else { + if (!storeEnabled && configuration().isStatisticsEnabled() && !skipVals) + metrics0().onRead(false); - success = false; - } + success = false; + } - break; // While. - } - catch (GridCacheEntryRemovedException ignored) { - // No-op, retry. - } - finally { - if (entry != null) - ctx.evicts().touch(entry, ctx.affinity().affinityTopologyVersion()); + break; // While. + } + catch (GridCacheEntryRemovedException ignored) { + // No-op, retry. + } + finally { + if (entry != null) + ctx.evicts().touch(entry, ctx.affinity().affinityTopologyVersion()); + } } if (!success && storeEnabled) http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java index faa63b3..0dbfc7e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java @@ -17,6 +17,9 @@ package org.apache.ignite.internal.processors.cache; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -27,7 +30,6 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.cache.CacheMemoryMode; -import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; @@ -93,37 +95,51 @@ public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testAtomicGet() throws Exception { - doGet(ATOMIC, ONHEAP_TIERED); + doGet(ATOMIC, ONHEAP_TIERED, false); + + doGet(ATOMIC, ONHEAP_TIERED, true); } /** * @throws Exception If failed. */ public void testAtomicGetOffheap() throws Exception { - doGet(ATOMIC, OFFHEAP_TIERED); + doGet(ATOMIC, OFFHEAP_TIERED, false); + + doGet(ATOMIC, OFFHEAP_TIERED, true); } /** * @throws Exception If failed. */ - private void doGet(CacheAtomicityMode atomicityMode, CacheMemoryMode memoryMode) throws Exception { + private void doGet(CacheAtomicityMode atomicityMode, + CacheMemoryMode memoryMode, + final boolean getAll) throws Exception { Ignite srv = ignite(0); Ignite client = ignite(1); final IgniteCache cache = client.createCache(cacheConfiguration(atomicityMode, memoryMode)); + final Map<Object, Object> data = new HashMap<>(); + + data.put(1, 1); + data.put(2, 2); + try { // Get from compute closure. { - cache.put(1, 1); + cache.putAll(data); hangLatch = new CountDownLatch(1); processorStartLatch = new CountDownLatch(1); IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() { @Override public Void call() throws Exception { - cache.invoke(1, new HangEntryProcessor()); + if (getAll) + cache.invokeAll(data.keySet(), new HangEntryProcessor()); + else + cache.invoke(1, new HangEntryProcessor()); return null; } @@ -134,7 +150,14 @@ public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest { assertTrue(wait); - assertEquals(1, client.compute().affinityCall(cache.getName(), 1, new GetClosure(1, cache.getName()))); + if (getAll) { + assertEquals(data, client.compute().affinityCall(cache.getName(), 1, + new GetAllClosure(data.keySet(), cache.getName()))); + } + else { + assertEquals(1, client.compute().affinityCall(cache.getName(), 1, + new GetClosure(1, cache.getName()))); + } hangLatch.countDown(); @@ -147,14 +170,17 @@ public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest { // Local get. { - cache.put(1, 1); + cache.putAll(data); hangLatch = new CountDownLatch(1); processorStartLatch = new CountDownLatch(1); IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() { @Override public Void call() throws Exception { - cache.invoke(1, new HangEntryProcessor()); + if (getAll) + cache.invokeAll(data.keySet(), new HangEntryProcessor()); + else + cache.invoke(1, new HangEntryProcessor()); return null; } @@ -165,7 +191,10 @@ public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest { assertTrue(wait); - assertEquals(1, srv.cache(cache.getName()).get(1)); + if (getAll) + assertEquals(data, srv.cache(cache.getName()).getAll(data.keySet())); + else + assertEquals(1, srv.cache(cache.getName()).get(1)); hangLatch.countDown(); @@ -249,4 +278,32 @@ public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest { return ignite.cache(cacheName).get(key); } } + + /** + * + */ + public static class GetAllClosure implements IgniteCallable<Object> { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + private final Set<Object> keys; + + /** */ + private final String cacheName; + + /** + * @param keys Keys. + */ + public GetAllClosure(Set<Object> keys, String cacheName) { + this.keys = keys; + this.cacheName = cacheName; + } + + /** {@inheritDoc} */ + @Override public Object call() throws Exception { + return ignite.cache(cacheName).getAll(keys); + } + } }
