Repository: ignite Updated Branches: refs/heads/ignite-971 [created] b92221643
ignite-971 Fix offheap to swap eviction. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b9222164 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b9222164 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b9222164 Branch: refs/heads/ignite-971 Commit: b92221643729be85863d180b14e83c7268dafae4 Parents: 60a76bc Author: sboikov <[email protected]> Authored: Wed Sep 9 16:32:06 2015 +0300 Committer: sboikov <[email protected]> Committed: Wed Sep 9 17:45:37 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheEntryEx.java | 13 ++ .../cache/GridCacheEvictionManager.java | 18 +- .../processors/cache/GridCacheMapEntry.java | 45 ++++- .../cache/GridCacheSwapEntryImpl.java | 24 +-- .../processors/cache/GridCacheSwapManager.java | 175 +++++++++++++++---- .../processors/cache/GridCacheUtils.java | 17 +- .../distributed/dht/GridDhtLocalPartition.java | 7 +- .../offheap/GridOffHeapProcessor.java | 25 ++- .../util/offheap/GridOffHeapEvictListener.java | 5 + .../internal/util/offheap/GridOffHeapMap.java | 13 +- .../util/offheap/GridOffHeapMapFactory.java | 28 +-- .../util/offheap/GridOffHeapPartitionedMap.java | 11 ++ .../util/offheap/unsafe/GridUnsafeMap.java | 127 +++++++++----- .../unsafe/GridUnsafePartitionedMap.java | 9 + .../cache/CacheSwapUnswapGetTest.java | 85 ++++++++- .../processors/cache/GridCacheTestEntryEx.java | 6 + .../offheap/GridOffHeapMapAbstractSelfTest.java | 16 +- .../GridOffHeapMapPerformanceAbstractTest.java | 4 +- ...idOffHeapPartitionedMapAbstractSelfTest.java | 20 +++ .../unsafe/GridUnsafeMapPerformanceTest.java | 2 +- .../offheap/unsafe/GridUnsafeMapSelfTest.java | 2 +- .../GridOffHeapMapPerformanceAbstractTest.java | 4 +- .../unsafe/GridUnsafeMapPerformanceTest.java | 2 +- .../IgniteCacheQueryMultiThreadedSelfTest.java | 9 +- ...QueryOffheapEvictsMultiThreadedSelfTest.java | 5 - 25 files changed, 521 insertions(+), 151 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b9222164/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index 98e86ed..430590a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -867,6 +867,19 @@ public interface GridCacheEntryEx { public void updateTtl(@Nullable GridCacheVersion ver, long ttl); /** + * Tries to do offheap -> swap eviction. + * + * @param entry Serialized swap entry. + * @param evictVer Version when entry was selected for eviction. + * @param obsoleteVer Obsolete version. + * @throws IgniteCheckedException If failed. + * @throws GridCacheEntryRemovedException If entry was removed. + * @return {@code True} if entry was obsoleted and written to swap. + */ + public boolean offheapSwapEvict(byte[] entry, GridCacheVersion evictVer, GridCacheVersion obsoleteVer) + throws IgniteCheckedException, GridCacheEntryRemovedException; + + /** * @return Value. * @throws IgniteCheckedException If failed to read from swap storage. * @throws GridCacheEntryRemovedException If entry was removed. http://git-wip-us.apache.org/repos/asf/ignite/blob/b9222164/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java index f60c0eb..3e0e2f9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java @@ -958,7 +958,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter { List<GridCacheEntryEx> locked = new ArrayList<>(keys.size()); - Set<GridCacheEntryEx> notRemove = null; + Set<GridCacheEntryEx> notRmv = null; Collection<GridCacheBatchSwapEntry> swapped = new ArrayList<>(keys.size()); @@ -990,10 +990,10 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter { locked.add(entry); if (entry.obsolete()) { - if (notRemove == null) - notRemove = new HashSet<>(); + if (notRmv == null) + notRmv = new HashSet<>(); - notRemove.add(entry); + notRmv.add(entry); continue; } @@ -1004,11 +1004,19 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter { GridCacheBatchSwapEntry swapEntry = entry.evictInBatchInternal(obsoleteVer); if (swapEntry != null) { + assert entry.obsolete() : entry; + swapped.add(swapEntry); if (log.isDebugEnabled()) log.debug("Entry was evicted [entry=" + entry + ", localNode=" + cctx.nodeId() + ']'); } + else if (!entry.obsolete()) { + if (notRmv == null) + notRmv = new HashSet<>(); + + notRmv.add(entry); + } } // Batch write to swap. @@ -1025,7 +1033,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter { // Remove entries and fire events outside the locks. for (GridCacheEntryEx entry : locked) { - if (entry.obsolete() && (notRemove == null || !notRemove.contains(entry))) { + if (entry.obsolete() && (notRmv == null || !notRmv.contains(entry))) { entry.onMarkedObsolete(); cache.removeEntry(entry); http://git-wip-us.apache.org/repos/asf/ignite/blob/b9222164/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index eb4d864..3fc1b2e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -433,6 +433,43 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } /** {@inheritDoc} */ + @Override public boolean offheapSwapEvict(byte[] vb, GridCacheVersion evictVer, GridCacheVersion obsoleteVer) + throws IgniteCheckedException, GridCacheEntryRemovedException { + assert cctx.swap().swapEnabled() && cctx.swap().offHeapEnabled() : this; + + boolean obsolete; + + synchronized (this) { + checkObsolete(); + + if (hasReaders() || !isStartVersion()) + return false; + + GridCacheMvcc mvcc = mvccExtras(); + + if (mvcc != null && !mvcc.isEmpty(obsoleteVer)) + return false; + + if (cctx.swap().removeOffheap(key, partition(), evictVer)) { + assert !hasValueUnlocked() : this; + + obsolete = markObsolete0(obsoleteVer, false); + + assert obsolete : this; + + cctx.swap().writeToSwap(partition(), key, vb); + } + else + obsolete = false; + } + + if (obsolete) + onMarkedObsolete(); + + return obsolete; + } + + /** {@inheritDoc} */ @Override public CacheObject unswap() throws IgniteCheckedException, GridCacheEntryRemovedException { return unswap(true); } @@ -536,7 +573,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme log.debug("Value did not change, skip write swap entry: " + this); if (cctx.swap().offheapEvictionEnabled()) - cctx.swap().enableOffheapEviction(key()); + cctx.swap().enableOffheapEviction(key(), partition()); return; } @@ -3643,6 +3680,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme try { if (F.isEmptyOrNulls(filter)) { synchronized (this) { + if (obsoleteVersionExtras() != null) + return false; + CacheObject prev = saveValueForIndexUnlocked(); if (!hasReaders() && markObsolete0(obsoleteVer, false)) { @@ -3684,6 +3724,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme return false; synchronized (this) { + if (obsoleteVersionExtras() != null) + return false; + if (!v.equals(ver)) // Version has changed since entry passed the filter. Do it again. continue; http://git-wip-us.apache.org/repos/asf/ignite/blob/b9222164/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java index 81490a7..b7c66d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java @@ -126,9 +126,9 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry { * @return Version. */ public static GridCacheVersion version(byte[] bytes) { - int off = VERSION_OFFSET; // Skip ttl, expire time. + long off = BYTE_ARR_OFF + VERSION_OFFSET; // Skip ttl, expire time. - boolean verEx = bytes[off++] != 0; + boolean verEx = UNSAFE.getByte(bytes, off++) != 0; return U.readVersion(bytes, off, verEx); } @@ -157,26 +157,6 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry { return new IgniteBiTuple<>(valBytes, type); } - /** - * @param bytes Entry bytes. - * @return Value bytes offset. - */ - public static int valueOffset(byte[] bytes) { - assert bytes.length > 40 : bytes.length; - - int off = VERSION_OFFSET; // Skip ttl, expire time. - - boolean verEx = bytes[off++] != 0; - - off += verEx ? VERSION_EX_SIZE : VERSION_SIZE; - - off += 5; // Byte array flag + array size. - - assert bytes.length >= off; - - return off; - } - /** {@inheritDoc} */ @Override public byte[] valueBytes() { if (valBytes != null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/b9222164/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index 7fd6013..53b0421 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -54,6 +54,7 @@ import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.spi.swapspace.SwapKey; @@ -101,8 +102,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { private final ReferenceQueue<Iterator<Map.Entry>> itQ = new ReferenceQueue<>(); /** Soft iterator set. */ - private final Collection<GridWeakIterator<Map.Entry>> itSet = - new GridConcurrentHashSet<>(); + private final Collection<GridWeakIterator<Map.Entry>> itSet = new GridConcurrentHashSet<>(); + + /** {@code True} if offheap to swap eviction is possible. */ + private boolean offheapToSwapEvicts; + + /** Values to be evicted from offheap to swap. */ + private ThreadLocal<Collection<IgniteBiTuple<byte[], byte[]>>> offheapEvicts = new ThreadLocal<>(); /** * @param enabled Flag to indicate if swap is enabled. @@ -127,9 +133,58 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { } /** + * + */ + public void unwindOffheapEvicts() { + if (!offheapToSwapEvicts) + return; + + Collection<IgniteBiTuple<byte[], byte[]>> evicts = offheapEvicts.get(); + + if (evicts != null) { + GridCacheVersion obsoleteVer = cctx.versions().next(); + + for (IgniteBiTuple<byte[], byte[]> t : evicts) { + try { + byte[] kb = t.get1(); + byte[] vb = t.get2(); + + GridCacheVersion evictVer = GridCacheSwapEntryImpl.version(vb); + + KeyCacheObject key = cctx.toCacheKeyObject(kb); + + while (true) { + GridCacheEntryEx entry = cctx.cache().entryEx(key); + + try { + if (entry.offheapSwapEvict(vb, evictVer, obsoleteVer)) + cctx.cache().removeEntry(entry); + + break; + } + catch (GridCacheEntryRemovedException ignore) { + // Retry. + } + } + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to unmarshal off-heap entry", e); + } + } + + offheapEvicts.set(null); + } + } + + /** First offheap eviction warning flag. */ + private volatile boolean firstEvictWarn; + + /** * Initializes off-heap space. */ private void initOffHeap() { + assert offheapEnabled; + // Register big data usage. long max = cctx.config().getOffHeapMaxMemory(); @@ -137,43 +192,69 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { int parts = cctx.config().getAffinity().partitions(); - GridOffHeapEvictListener lsnr = !swapEnabled && !offheapEnabled ? null : new GridOffHeapEvictListener() { - private volatile boolean firstEvictWarn; + GridOffHeapEvictListener lsnr; - @Override public void onEvict(int part, int hash, byte[] kb, byte[] vb) { - try { - if (!firstEvictWarn) - warnFirstEvict(); + if (swapEnabled) { + offheapToSwapEvicts = true; - writeToSwap(part, cctx.toCacheKeyObject(kb), vb); + lsnr = new GridOffHeapEvictListener() { + @Override public void onEvict(int part, int hash, byte[] kb, byte[] vb) { + assert offheapToSwapEvicts; - if (cctx.config().isStatisticsEnabled()) - cctx.cache().metrics0().onOffHeapEvict(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to unmarshal off-heap entry [part=" + part + ", hash=" + hash + ']', e); - } - } + onOffheapEvict(); + + Collection<IgniteBiTuple<byte[], byte[]>> evicts = offheapEvicts.get(); - private void warnFirstEvict() { - synchronized (this) { - if (firstEvictWarn) - return; + if (evicts == null) + offheapEvicts.set(evicts = new ArrayList<>()); - firstEvictWarn = true; + evicts.add(new IgniteBiTuple<>(kb, vb)); } - U.warn(log, "Off-heap evictions started. You may wish to increase 'offHeapMaxMemory' in " + - "cache configuration [cache=" + cctx.name() + - ", offHeapMaxMemory=" + cctx.config().getOffHeapMaxMemory() + ']', - "Off-heap evictions started: " + cctx.name()); - } - }; + @Override public boolean removedEvicted() { + return false; + } + }; + } + else { + lsnr = new GridOffHeapEvictListener() { + @Override public void onEvict(int part, int hash, byte[] kb, byte[] vb) { + onOffheapEvict(); + } + + @Override public boolean removedEvicted() { + return true; + } + }; + } offheap.create(spaceName, parts, init, max, lsnr); } /** + * Warns on first evict from off-heap. + */ + private void onOffheapEvict() { + if (cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onOffHeapEvict(); + + if (!firstEvictWarn) + return; + + synchronized (this) { + if (firstEvictWarn) + return; + + firstEvictWarn = true; + } + + U.warn(log, "Off-heap evictions started. You may wish to increase 'offHeapMaxMemory' in " + + "cache configuration [cache=" + cctx.name() + + ", offHeapMaxMemory=" + cctx.config().getOffHeapMaxMemory() + ']', + "Off-heap evictions started: " + cctx.name()); + } + + /** * @return {@code True} if swap store is enabled. */ public boolean swapEnabled() { @@ -966,6 +1047,35 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { } /** + * @param key Key to remove. + * @param part Partition. + * @param ver Expected version. + * @return {@code True} if removed. + * @throws IgniteCheckedException If failed. + */ + boolean removeOffheap(final KeyCacheObject key, int part, final GridCacheVersion ver) + throws IgniteCheckedException { + assert offheapEnabled; + + checkIteratorQueue(); + + boolean rmv = offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()), + new IgniteBiPredicate<Long, Integer>() { + @Override public boolean apply(Long ptr, Integer len) { + GridCacheVersion ver0 = GridCacheOffheapSwapEntry.version(ptr); + + return ver.equals(ver0); + } + } + ); + + if (rmv && cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onOffHeapRemove(); + + return rmv; + } + + /** * @return {@code True} if offheap eviction is enabled. */ boolean offheapEvictionEnabled() { @@ -976,16 +1086,15 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { * Enables eviction for offheap entry after {@link #readOffheapPointer} was called. * * @param key Key. + * @param part Partition. * @throws IgniteCheckedException If failed. */ - void enableOffheapEviction(final KeyCacheObject key) throws IgniteCheckedException { + void enableOffheapEviction(final KeyCacheObject key, int part) throws IgniteCheckedException { if (!offheapEnabled) return; checkIteratorQueue(); - int part = cctx.affinity().partition(key); - offheap.enableEviction(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); } @@ -1224,7 +1333,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { * @param entry Entry bytes. * @throws IgniteCheckedException If failed. */ - private void writeToSwap(int part, KeyCacheObject key, byte[] entry) throws IgniteCheckedException { + public void writeToSwap(int part, KeyCacheObject key, byte[] entry) throws IgniteCheckedException { + assert swapEnabled; + checkIteratorQueue(); swapMgr.write(spaceName, @@ -1244,7 +1355,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { */ public void clearOffHeap() { if (offheapEnabled) - initOffHeap(); + clearOffHeap(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/b9222164/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 980971c..919bce6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -1029,9 +1029,14 @@ public class GridCacheUtils { ctx.evicts().unwind(); - if (ctx.isNear()) + ctx.swap().unwindOffheapEvicts(); + + if (ctx.isNear()) { ctx.near().dht().context().evicts().unwind(); + ctx.swap().unwindOffheapEvicts(); + } + ctx.ttl().expire(); } @@ -1041,14 +1046,8 @@ public class GridCacheUtils { public static <K, V> void unwindEvicts(GridCacheSharedContext<K, V> ctx) { assert ctx != null; - for (GridCacheContext<K, V> cacheCtx : ctx.cacheContexts()) { - cacheCtx.evicts().unwind(); - - if (cacheCtx.isNear()) - cacheCtx.near().dht().context().evicts().unwind(); - - cacheCtx.ttl().expire(); - } + for (GridCacheContext<K, V> cacheCtx : ctx.cacheContexts()) + unwindEvicts(cacheCtx); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/b9222164/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 215a1b5..3c74055 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -261,8 +261,11 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, map.put(entry.key(), entry); - if (!entry.isInternal()) + if (!entry.isInternal()) { + assert !entry.deleted() : entry; + mapPubSize.increment(); + } } /** @@ -270,7 +273,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, */ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") void onRemoved(GridDhtCacheEntry entry) { - assert entry.obsolete(); + assert entry.obsolete() : entry; // Make sure to remove exactly this entry. synchronized (entry) { http://git-wip-us.apache.org/repos/asf/ignite/blob/b9222164/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java index 024ea7c..492fa07 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.util.offheap.GridOffHeapPartitionedMap; import org.apache.ignite.internal.util.typedef.CX2; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.marshaller.Marshaller; import org.jetbrains.annotations.Nullable; @@ -261,13 +262,35 @@ public class GridOffHeapProcessor extends GridProcessorAdapter { * @return {@code true} If succeeded. * @throws IgniteCheckedException If failed. */ - public boolean removex(@Nullable String spaceName, int part, KeyCacheObject key, byte[] keyBytes) throws IgniteCheckedException { + public boolean removex(@Nullable String spaceName, int part, KeyCacheObject key, byte[] keyBytes) + throws IgniteCheckedException { GridOffHeapPartitionedMap m = offheap(spaceName); return m != null && m.removex(part, U.hash(key), keyBytes(key, keyBytes)); } /** + * Removes value from offheap space for the given key. + * + * @param spaceName Space name. + * @param part Partition. + * @param key Key. + * @param keyBytes Key bytes. + * @param p Value predicate (arguments are value address and value length). + * @return {@code true} If succeeded. + * @throws IgniteCheckedException If failed. + */ + public boolean removex(@Nullable String spaceName, + int part, + KeyCacheObject key, + byte[] keyBytes, + IgniteBiPredicate<Long, Integer> p) throws IgniteCheckedException { + GridOffHeapPartitionedMap m = offheap(spaceName); + + return m != null && m.removex(part, U.hash(key), keyBytes(key, keyBytes), p); + } + + /** * Gets iterator over contents of the given space. * * @param spaceName Space name. http://git-wip-us.apache.org/repos/asf/ignite/blob/b9222164/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapEvictListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapEvictListener.java b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapEvictListener.java index 4597be8..1219ae1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapEvictListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapEvictListener.java @@ -30,4 +30,9 @@ public interface GridOffHeapEvictListener { * @param valBytes Value bytes. */ public void onEvict(int part, int hash, byte[] keyBytes, byte[] valBytes); + + /** + * @return {@code True} if entry selected for eviction should be immediately removed. + */ + public boolean removedEvicted(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b9222164/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapMap.java index 1fcddd7..d14a582 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapMap.java @@ -20,13 +20,14 @@ package org.apache.ignite.internal.util.offheap; import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.typedef.CX2; import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteBiTuple; import org.jetbrains.annotations.Nullable; /** * Off-heap map. */ -public interface GridOffHeapMap<K> { +public interface GridOffHeapMap { /** * Gets partition this map belongs to. * @@ -102,6 +103,16 @@ public interface GridOffHeapMap<K> { public boolean removex(int hash, byte[] keyBytes); /** + * Removes value from off-heap map without returning it. + * + * @param hash Hash. + * @param keyBytes Key bytes. + * @param p Value predicate (arguments are value address and value length). + * @return {@code True} if value was removed. + */ + public boolean removex(int hash, byte[] keyBytes, IgniteBiPredicate<Long, Integer> p); + + /** * Puts key and value bytes into the map potentially replacing * existing entry. * http://git-wip-us.apache.org/repos/asf/ignite/blob/b9222164/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapMapFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapMapFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapMapFactory.java index 1a3d219..4dd911f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapMapFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapMapFactory.java @@ -32,8 +32,8 @@ public class GridOffHeapMapFactory { * @param initCap Initial capacity. * @return Off-heap map. */ - public static <K> GridOffHeapMap<K> unsafeMap(long initCap) { - return new GridUnsafeMap<>(128, 0.75f, initCap, 0, (short)0, null); + public static GridOffHeapMap unsafeMap(long initCap) { + return new GridUnsafeMap(128, 0.75f, initCap, 0, (short)0, null); } /** @@ -43,8 +43,8 @@ public class GridOffHeapMapFactory { * @param initCap Initial capacity. * @return Off-heap map. */ - public static <K> GridOffHeapMap<K> unsafeMap(int concurrency, long initCap) { - return new GridUnsafeMap<>(concurrency, 0.75f, initCap, 0, (short)0, null); + public static GridOffHeapMap unsafeMap(int concurrency, long initCap) { + return new GridUnsafeMap(concurrency, 0.75f, initCap, 0, (short)0, null); } /** @@ -55,8 +55,8 @@ public class GridOffHeapMapFactory { * @param initCap Initial capacity. * @return Off-heap map. */ - public static <K> GridOffHeapMap<K> unsafeMap(int concurrency, float load, long initCap) { - return new GridUnsafeMap<>(concurrency, load, initCap, 0, (short)0, null); + public static GridOffHeapMap unsafeMap(int concurrency, float load, long initCap) { + return new GridUnsafeMap(concurrency, load, initCap, 0, (short)0, null); } /** @@ -68,8 +68,8 @@ public class GridOffHeapMapFactory { * @param lruStripes Number of LRU stripes. * @return Off-heap map. */ - public static <K> GridOffHeapMap<K> unsafeMap(long initCap, long totalMem, short lruStripes) { - return new GridUnsafeMap<>(128, 0.75f, initCap, totalMem, lruStripes, null); + public static GridOffHeapMap unsafeMap(long initCap, long totalMem, short lruStripes) { + return new GridUnsafeMap(128, 0.75f, initCap, totalMem, lruStripes, null); } /** @@ -82,9 +82,9 @@ public class GridOffHeapMapFactory { * @param lsnr Optional eviction listener which gets notified every time an entry is evicted. * @return Off-heap map. */ - public static <K> GridOffHeapMap<K> unsafeMap(long initCap, long totalMem, short lruStripes, + public static GridOffHeapMap unsafeMap(long initCap, long totalMem, short lruStripes, @Nullable GridOffHeapEvictListener lsnr) { - return new GridUnsafeMap<>(128, 0.75f, initCap, totalMem, lruStripes, lsnr); + return new GridUnsafeMap(128, 0.75f, initCap, totalMem, lruStripes, lsnr); } /** @@ -98,9 +98,9 @@ public class GridOffHeapMapFactory { * @param lsnr Optional eviction listener which gets notified every time an entry is evicted. * @return Off-heap map. */ - public static <K> GridOffHeapMap<K> unsafeMap(int concurrency, long initCap, long totalMem, short lruStripes, + public static GridOffHeapMap unsafeMap(int concurrency, long initCap, long totalMem, short lruStripes, @Nullable GridOffHeapEvictListener lsnr) { - return new GridUnsafeMap<>(concurrency, 0.75f, initCap, totalMem, lruStripes, lsnr); + return new GridUnsafeMap(concurrency, 0.75f, initCap, totalMem, lruStripes, lsnr); } /** @@ -115,9 +115,9 @@ public class GridOffHeapMapFactory { * @param lsnr Optional eviction listener which gets notified every time an entry is evicted. * @return Off-heap map. */ - public static <K> GridOffHeapMap<K> unsafeMap(int concurrency, float load, long initCap, long totalMem, + public static <K> GridOffHeapMap unsafeMap(int concurrency, float load, long initCap, long totalMem, short lruStripes, @Nullable GridOffHeapEvictListener lsnr) { - return new GridUnsafeMap<>(concurrency, load, initCap, totalMem, lruStripes, lsnr); + return new GridUnsafeMap(concurrency, load, initCap, totalMem, lruStripes, lsnr); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/b9222164/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java index 3afdfa9..5e03677 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java @@ -21,6 +21,7 @@ import java.util.Set; import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.typedef.CX2; import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteBiTuple; import org.jetbrains.annotations.Nullable; @@ -111,6 +112,16 @@ public interface GridOffHeapPartitionedMap { public boolean removex(int p, int hash, byte[] keyBytes); /** + * Removes value from off-heap map without returning it. + * @param part Partition. + * @param hash Hash. + * @param keyBytes Key bytes. + * @param p Value predicate (arguments are value address and value length). + * @return {@code True} if value was removed. + */ + public boolean removex(int part, int hash, byte[] keyBytes, IgniteBiPredicate<Long, Integer> p); + + /** * Puts key and value bytes into the map potentially replacing * existing entry. * http://git-wip-us.apache.org/repos/asf/ignite/blob/b9222164/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafeMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafeMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafeMap.java index 40fb3e8..ed13fe1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafeMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafeMap.java @@ -33,6 +33,7 @@ import org.apache.ignite.internal.util.typedef.CX2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteBiTuple; import org.jetbrains.annotations.Nullable; import org.jsr166.LongAdder8; @@ -42,7 +43,7 @@ import static org.apache.ignite.internal.util.offheap.GridOffHeapEvent.REHASH; /** * Off-heap map based on {@code Unsafe} implementation. */ -public class GridUnsafeMap<K> implements GridOffHeapMap<K> { +public class GridUnsafeMap implements GridOffHeapMap { /** Header size. */ private static final int HEADER = 4 /*hash*/ + 4 /*key-size*/ + 4 /*value-size*/ + 8 /*queue-address*/ + 8 /*next-address*/; @@ -77,7 +78,7 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> { private final float load; /** Segments. */ - private final Segment<K>[] segs; + private final Segment[] segs; /** Total memory. */ private final GridUnsafeMemory mem; @@ -111,6 +112,9 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> { /** LRU poller. */ private final GridUnsafeLruPoller lruPoller; + /** */ + private final boolean rmvEvicted; + /** * @param concurrency Concurrency. * @param load Load factor. @@ -180,6 +184,8 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> { } } }; + + rmvEvicted = evictLsnr == null || evictLsnr.removedEvicted(); } /** @@ -225,6 +231,8 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> { segs = new Segment[size]; init(initCap, size); + + rmvEvicted = evictLsnr == null || evictLsnr.removedEvicted(); } /** @@ -247,7 +255,7 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> { for (int i = 0; i < size; i++) { try { - segs[i] = new Segment<>(i, cap); + segs[i] = new Segment(i, cap); } catch (GridOffHeapOutOfMemoryException e) { destruct(); @@ -327,6 +335,11 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> { } /** {@inheritDoc} */ + @Override public boolean removex(int hash, byte[] keyBytes, IgniteBiPredicate<Long, Integer> p) { + return segmentFor(hash).removex(hash, keyBytes, p); + } + + /** {@inheritDoc} */ @Override public boolean put(int hash, byte[] keyBytes, byte[] valBytes) { return segmentFor(hash).put(hash, keyBytes, valBytes); } @@ -559,7 +572,7 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> { /** * Segment. */ - private class Segment<K> { + private class Segment { /** Lock. */ private final ReadWriteLock lock = new ReentrantReadWriteLock(); @@ -1009,41 +1022,44 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> { } if (cur != 0) { - long next = Entry.nextAddress(cur, mem); - - if (prev != 0) - Entry.nextAddress(prev, next, mem); // Relink. - else { - if (next == 0) - Bin.clear(binAddr, mem); - else - Bin.first(binAddr, next, mem); - } + long a; - if (evictLsnr != null) { - keyBytes = Entry.readKeyBytes(cur, mem); + assert qAddr == (a = Entry.queueAddress(cur, mem)) : "Queue node address mismatch " + + "[qAddr=" + qAddr + ", entryQueueAddr=" + a + ']'; - // TODO: GG-8123: Inlined as a workaround. Revert when 7u60 is released. -// valBytes = Entry.readValueBytes(cur, mem); - { - int keyLen = Entry.readKeyLength(cur, mem); - int valLen = Entry.readValueLength(cur, mem); + if (rmvEvicted) { + long next = Entry.nextAddress(cur, mem); - valBytes = mem.readBytes(cur + HEADER + keyLen, valLen); + if (prev != 0) + Entry.nextAddress(prev, next, mem); // Relink. + else { + if (next == 0) + Bin.clear(binAddr, mem); + else + Bin.first(binAddr, next, mem); } - } - long a; + relSize = Entry.size(cur, mem); + relAddr = cur; - assert qAddr == (a = Entry.queueAddress(cur, mem)) : "Queue node address mismatch " + - "[qAddr=" + qAddr + ", entryQueueAddr=" + a + ']'; + cnt--; - relSize = Entry.size(cur, mem); - relAddr = cur; + totalCnt.decrement(); + } + else { + if (qAddr != 0) { + boolean clear = Entry.clearQueueAddress(cur, qAddr, mem); - cnt--; + assert clear; + } - totalCnt.decrement(); + keyBytes = Entry.readKeyBytes(cur, mem); + + int keyLen = Entry.readKeyLength(cur, mem); + int valLen = Entry.readValueLength(cur, mem); + + valBytes = mem.readBytes(cur + HEADER + keyLen, valLen); + } } } } @@ -1251,7 +1267,7 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> { */ @SuppressWarnings("TooBroadScope") byte[] remove(int hash, byte[] keyBytes) { - return remove(hash, keyBytes, true); + return remove(hash, keyBytes, true, null); } /** @@ -1260,17 +1276,28 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> { * @return {@code True} if value was removed. */ boolean removex(int hash, byte[] keyBytes) { - return remove(hash, keyBytes, false) == EMPTY_BYTES; + return remove(hash, keyBytes, false, null) == EMPTY_BYTES; + } + + /** + * @param hash Hash. + * @param keyBytes Key bytes. + * @param p Value predicate. + * @return {@code True} if value was removed. + */ + boolean removex(int hash, byte[] keyBytes, IgniteBiPredicate<Long, Integer> p) { + return remove(hash, keyBytes, false, p) == EMPTY_BYTES; } /** * @param hash Hash. * @param keyBytes Key bytes. * @param retval {@code True} if need removed value. + * @param p Value predicate. * @return Removed value bytes. */ @SuppressWarnings("TooBroadScope") - byte[] remove(int hash, byte[] keyBytes, boolean retval) { + byte[] remove(int hash, byte[] keyBytes, boolean retval, @Nullable IgniteBiPredicate<Long, Integer> p) { int relSize = 0; long relAddr = 0; long qAddr = 0; @@ -1291,6 +1318,19 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> { // If found match. if (Entry.keyEquals(cur, keyBytes, mem)) { + int keyLen = 0; + int valLen = 0; + + if (p != null) { + keyLen = Entry.readKeyLength(cur, mem); + valLen = Entry.readValueLength(cur, mem); + + long valPtr = cur + HEADER + keyLen; + + if (!p.apply(valPtr, valLen)) + return null; + } + if (prev != 0) Entry.nextAddress(prev, next, mem); // Relink. else { @@ -1300,18 +1340,16 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> { Bin.first(binAddr, next, mem); } - // TODO: GG-8123: Inlined as a workaround. Revert when 7u60 is released. -// valBytes = retval ? Entry.readValueBytes(cur, mem) : EMPTY_BYTES; - { - if (retval) { - int keyLen = Entry.readKeyLength(cur, mem); - int valLen = Entry.readValueLength(cur, mem); - - valBytes = mem.readBytes(cur + HEADER + keyLen, valLen); + if (retval) { + if (keyLen == 0) { + keyLen = Entry.readKeyLength(cur, mem); + valLen = Entry.readValueLength(cur, mem); } - else - valBytes = EMPTY_BYTES; + + valBytes = mem.readBytes(cur + HEADER + keyLen, valLen); } + else + valBytes = EMPTY_BYTES; // Prepare release of memory. qAddr = Entry.queueAddress(cur, mem); @@ -1382,8 +1420,7 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> { * @param keyBytes Key bytes. * @return Value pointer. */ - @Nullable - IgniteBiTuple<Long, Integer> valuePointer(int hash, byte[] keyBytes) { + @Nullable IgniteBiTuple<Long, Integer> valuePointer(int hash, byte[] keyBytes) { long binAddr = readLock(hash); try { http://git-wip-us.apache.org/repos/asf/ignite/blob/b9222164/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java index 070da51..fb8ac14 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.util.offheap.GridOffHeapMap; import org.apache.ignite.internal.util.offheap.GridOffHeapPartitionedMap; import org.apache.ignite.internal.util.typedef.CX2; import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteBiTuple; import org.jetbrains.annotations.Nullable; import org.jsr166.LongAdder8; @@ -198,6 +199,14 @@ public class GridUnsafePartitionedMap implements GridOffHeapPartitionedMap { } /** {@inheritDoc} */ + @Override public boolean removex(int part, + int hash, + byte[] keyBytes, + IgniteBiPredicate<Long, Integer> p) { + return mapFor(part).removex(hash, keyBytes, p); + } + + /** {@inheritDoc} */ @Override public boolean put(int p, int hash, byte[] keyBytes, byte[] valBytes) { return mapFor(p).put(hash, keyBytes, valBytes); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b9222164/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTest.java index 271d8b1..214beb6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTest.java @@ -26,6 +26,7 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMemoryMode; +import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.eviction.lru.LruEvictionPolicy; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; @@ -53,6 +54,12 @@ public class CacheSwapUnswapGetTest extends GridCommonAbstractTest { /** */ private static final long DURATION = 30_000; + /** */ + private static final long OFFHEAP_MEM = 1000; + + /** */ + private static final int MAX_HEAP_SIZE = 100; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); @@ -81,7 +88,7 @@ public class CacheSwapUnswapGetTest extends GridCommonAbstractTest { if (memMode == CacheMemoryMode.ONHEAP_TIERED) { LruEvictionPolicy plc = new LruEvictionPolicy(); - plc.setMaxSize(100); + plc.setMaxSize(MAX_HEAP_SIZE); ccfg.setEvictionPolicy(plc); } @@ -89,7 +96,7 @@ public class CacheSwapUnswapGetTest extends GridCommonAbstractTest { if (swap) { ccfg.setSwapEnabled(true); - ccfg.setOffHeapMaxMemory(1000); + ccfg.setOffHeapMaxMemory(OFFHEAP_MEM); } else ccfg.setOffHeapMaxMemory(0); @@ -133,6 +140,20 @@ public class CacheSwapUnswapGetTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testTxCacheOffheapSwapEvict() throws Exception { + swapUnswap(TRANSACTIONAL, CacheMemoryMode.ONHEAP_TIERED, true); + } + + /** + * @throws Exception If failed. + */ + public void testTxCacheOffheapTieredSwapEvict() throws Exception { + swapUnswap(TRANSACTIONAL, CacheMemoryMode.OFFHEAP_TIERED, true); + } + + /** + * @throws Exception If failed. + */ public void testAtomicCacheOffheapEvict() throws Exception { swapUnswap(ATOMIC, CacheMemoryMode.ONHEAP_TIERED, false); } @@ -145,6 +166,20 @@ public class CacheSwapUnswapGetTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testAtomicCacheOffheapSwapEvict() throws Exception { + swapUnswap(ATOMIC, CacheMemoryMode.ONHEAP_TIERED, true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicCacheOffheapTieredSwapEvict() throws Exception { + swapUnswap(ATOMIC, CacheMemoryMode.OFFHEAP_TIERED, true); + } + + /** * @param atomicityMode Cache atomicity mode. * @param memMode Cache memory mode. * @param swap {@code True} if swap enabled. @@ -220,12 +255,56 @@ public class CacheSwapUnswapGetTest extends GridCommonAbstractTest { } }); - Thread.sleep(DURATION); + long endTime = System.currentTimeMillis() + DURATION; + + while (System.currentTimeMillis() < endTime) { + Thread.sleep(5000); + + log.info("Cache size [heap=" + cache.localSize(CachePeekMode.ONHEAP) + + ", offheap=" + cache.localSize(CachePeekMode.OFFHEAP) + + ", swap=" + cache.localSize(CachePeekMode.SWAP) + + ", total=" + cache.localSize() + + ", offheapMem=" + cache.metrics().getOffHeapAllocatedSize() + ']'); + } done.set(true); fut.get(); getFut.get(); + + for (Integer key : keys) { + String val = cache.get(key); + + assertNotNull(val); + } + + int onheapSize = cache.localSize(CachePeekMode.ONHEAP); + int offheapSize = cache.localSize(CachePeekMode.OFFHEAP); + int swapSize = cache.localSize(CachePeekMode.SWAP); + int total = cache.localSize(); + long offheapMem = cache.metrics().getOffHeapAllocatedSize(); + + log.info("Cache size [heap=" + onheapSize + + ", offheap=" + offheapSize + + ", swap=" + swapSize + + ", total=" + total + + ", offheapMem=" + offheapMem + ']'); + + assertTrue(total > 0); + + assertEquals(onheapSize + offheapSize + swapSize, total); + + if (memMode == CacheMemoryMode.OFFHEAP_TIERED) + assertEquals(0, onheapSize); + else + assertEquals(MAX_HEAP_SIZE, onheapSize); + + if (swap) { + assertTrue(swapSize > 0); + assertTrue(offheapMem <= OFFHEAP_MEM); + } + else + assertEquals(0, swapSize); } finally { done.set(true); http://git-wip-us.apache.org/repos/asf/ignite/blob/b9222164/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index d9510e6..1fef4d5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -803,6 +803,12 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr } /** {@inheritDoc} */ + @Override public boolean offheapSwapEvict(byte[] vb, GridCacheVersion evictVer, GridCacheVersion obsoleteVer) + throws IgniteCheckedException, GridCacheEntryRemovedException { + return false; + } + + /** {@inheritDoc} */ @Override public CacheObject unswap(boolean needVal) throws IgniteCheckedException { return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/b9222164/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/GridOffHeapMapAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/GridOffHeapMapAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/GridOffHeapMapAbstractSelfTest.java index 4bda4f4..d3241f2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/GridOffHeapMapAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/GridOffHeapMapAbstractSelfTest.java @@ -42,7 +42,7 @@ public abstract class GridOffHeapMapAbstractSelfTest extends GridCommonAbstractT private static final Random RAND = new Random(); /** Unsafe map. */ - private GridOffHeapMap<String> map; + private GridOffHeapMap map; /** */ protected float load = 0.75f; @@ -86,7 +86,7 @@ public abstract class GridOffHeapMapAbstractSelfTest extends GridCommonAbstractT /** * @return New map. */ - protected abstract <K> GridOffHeapMap<K> newMap(); + protected abstract GridOffHeapMap newMap(); /** * @@ -551,6 +551,10 @@ public abstract class GridOffHeapMapAbstractSelfTest extends GridCommonAbstractT evictCnt.incrementAndGet(); } + + @Override public boolean removedEvicted() { + return true; + } }; map = newMap(); @@ -587,6 +591,10 @@ public abstract class GridOffHeapMapAbstractSelfTest extends GridCommonAbstractT @Override public void onEvict(int part, int hash, byte[] k, byte[] v) { evictCnt.incrementAndGet(); } + + @Override public boolean removedEvicted() { + return true; + } }; map = newMap(); @@ -622,6 +630,10 @@ public abstract class GridOffHeapMapAbstractSelfTest extends GridCommonAbstractT @Override public void onEvict(int part, int hash, byte[] k, byte[] v) { evictCnt.incrementAndGet(); } + + @Override public boolean removedEvicted() { + return true; + } }; map = newMap(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b9222164/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/GridOffHeapMapPerformanceAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/GridOffHeapMapPerformanceAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/GridOffHeapMapPerformanceAbstractTest.java index d1a1b20..f7388e8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/GridOffHeapMapPerformanceAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/GridOffHeapMapPerformanceAbstractTest.java @@ -39,7 +39,7 @@ public abstract class GridOffHeapMapPerformanceAbstractTest extends GridCommonAb new HashMap<>(LOAD_CNT); /** Unsafe map. */ - private GridOffHeapMap<String> map; + private GridOffHeapMap map; /** */ protected float load = 0.75f; @@ -91,7 +91,7 @@ public abstract class GridOffHeapMapPerformanceAbstractTest extends GridCommonAb /** * @return New map. */ - protected abstract <K> GridOffHeapMap<K> newMap(); + protected abstract GridOffHeapMap newMap(); /** * @param key Key. http://git-wip-us.apache.org/repos/asf/ignite/blob/b9222164/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMapAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMapAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMapAbstractSelfTest.java index 03fcd4a..032c98f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMapAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMapAbstractSelfTest.java @@ -882,6 +882,10 @@ public abstract class GridOffHeapPartitionedMapAbstractSelfTest extends GridComm evictCnt.incrementAndGet(); } + + @Override public boolean removedEvicted() { + return true; + } }; map = newMap(); @@ -921,6 +925,10 @@ public abstract class GridOffHeapPartitionedMapAbstractSelfTest extends GridComm @Override public void onEvict(int part, int hash, byte[] k, byte[] v) { evictCnt.incrementAndGet(); } + + @Override public boolean removedEvicted() { + return true; + } }; map = newMap(); @@ -957,6 +965,10 @@ public abstract class GridOffHeapPartitionedMapAbstractSelfTest extends GridComm @Override public void onEvict(int part, int hash, byte[] k, byte[] v) { evictCnt.incrementAndGet(); } + + @Override public boolean removedEvicted() { + return true; + } }; map = newMap(); @@ -1009,6 +1021,10 @@ public abstract class GridOffHeapPartitionedMapAbstractSelfTest extends GridComm evicted.set(key); } + + @Override public boolean removedEvicted() { + return true; + } }; map = newMap(); @@ -1072,6 +1088,10 @@ public abstract class GridOffHeapPartitionedMapAbstractSelfTest extends GridComm evicted.set(key); } + + @Override public boolean removedEvicted() { + return true; + } }; map = newMap(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b9222164/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafeMapPerformanceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafeMapPerformanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafeMapPerformanceTest.java index e758246..58ad494 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafeMapPerformanceTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafeMapPerformanceTest.java @@ -26,7 +26,7 @@ import org.apache.ignite.internal.util.offheap.GridOffHeapMapPerformanceAbstract */ public class GridUnsafeMapPerformanceTest extends GridOffHeapMapPerformanceAbstractTest { /** {@inheritDoc} */ - @Override protected <K> GridOffHeapMap<K> newMap() { + @Override protected GridOffHeapMap newMap() { return GridOffHeapMapFactory.unsafeMap(concurrency, load, initCap, mem, lruStripes, evictClo); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b9222164/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafeMapSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafeMapSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafeMapSelfTest.java index 43fdb34..0e36f3d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafeMapSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafeMapSelfTest.java @@ -26,7 +26,7 @@ import org.apache.ignite.internal.util.offheap.GridOffHeapMapFactory; */ public class GridUnsafeMapSelfTest extends GridOffHeapMapAbstractSelfTest { /** {@inheritDoc} */ - @Override protected <K> GridOffHeapMap<K> newMap() { + @Override protected GridOffHeapMap newMap() { return GridOffHeapMapFactory.unsafeMap(concurrency, load, initCap, mem, lruStripes, evictLsnr); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b9222164/modules/core/src/test/java/org/apache/ignite/loadtests/offheap/GridOffHeapMapPerformanceAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/offheap/GridOffHeapMapPerformanceAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/offheap/GridOffHeapMapPerformanceAbstractTest.java index ed37306..4064482 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/offheap/GridOffHeapMapPerformanceAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/offheap/GridOffHeapMapPerformanceAbstractTest.java @@ -42,7 +42,7 @@ public abstract class GridOffHeapMapPerformanceAbstractTest extends GridCommonAb new HashMap<>(LOAD_CNT); /** Unsafe map. */ - private GridOffHeapMap<String> map; + private GridOffHeapMap map; /** */ protected float load = 0.75f; @@ -94,7 +94,7 @@ public abstract class GridOffHeapMapPerformanceAbstractTest extends GridCommonAb /** * @return New map. */ - protected abstract <K> GridOffHeapMap<K> newMap(); + protected abstract GridOffHeapMap newMap(); /** * @param key Key. http://git-wip-us.apache.org/repos/asf/ignite/blob/b9222164/modules/core/src/test/java/org/apache/ignite/loadtests/offheap/unsafe/GridUnsafeMapPerformanceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/offheap/unsafe/GridUnsafeMapPerformanceTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/offheap/unsafe/GridUnsafeMapPerformanceTest.java index 1486a9c..af691b4 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/offheap/unsafe/GridUnsafeMapPerformanceTest.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/offheap/unsafe/GridUnsafeMapPerformanceTest.java @@ -26,7 +26,7 @@ import org.apache.ignite.internal.util.offheap.GridOffHeapMapPerformanceAbstract */ public class GridUnsafeMapPerformanceTest extends GridOffHeapMapPerformanceAbstractTest { /** {@inheritDoc} */ - @Override protected <K> GridOffHeapMap<K> newMap() { + @Override protected GridOffHeapMap newMap() { return GridOffHeapMapFactory.unsafeMap(concurrency, load, initCap, mem, lruStripes, evictClo); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b9222164/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java index b02b37e..be644e2 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java @@ -32,7 +32,6 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheMemoryMode; import org.apache.ignite.cache.CachePeekMode; -import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.eviction.lru.LruEvictionPolicy; import org.apache.ignite.cache.query.ScanQuery; import org.apache.ignite.cache.query.SqlQuery; @@ -55,6 +54,7 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; /** * Multi-threaded tests for cache queries. @@ -111,7 +111,7 @@ public class IgniteCacheQueryMultiThreadedSelfTest extends GridCommonAbstractTes cacheCfg.setCacheMode(PARTITIONED); cacheCfg.setAtomicityMode(TRANSACTIONAL); - cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cacheCfg.setWriteSynchronizationMode(FULL_SYNC); cacheCfg.setSwapEnabled(true); cacheCfg.setBackups(1); @@ -139,6 +139,11 @@ public class IgniteCacheQueryMultiThreadedSelfTest extends GridCommonAbstractTes return cacheCfg; } + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return DURATION + 60_000; + } + /** * */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b9222164/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryOffheapEvictsMultiThreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryOffheapEvictsMultiThreadedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryOffheapEvictsMultiThreadedSelfTest.java index 909fd74..d7d2b5a 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryOffheapEvictsMultiThreadedSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryOffheapEvictsMultiThreadedSelfTest.java @@ -22,11 +22,6 @@ package org.apache.ignite.internal.processors.cache; */ public class IgniteCacheQueryOffheapEvictsMultiThreadedSelfTest extends IgniteCacheQueryOffheapMultiThreadedSelfTest { /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - fail("IGNITE-959"); - } - - /** {@inheritDoc} */ @Override protected boolean evictsEnabled() { return true; }
