ignite-2615 If swap is not enabled need pass value evicted from offheap to query manager ignite-2622 Do not keep custom messages data after message is processed ignite-2586 Fixed GridCacheMapEntry.clear
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a6b3dedd Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a6b3dedd Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a6b3dedd Branch: refs/heads/ignite-2407 Commit: a6b3dedd381a71fec8dd9dcf944be2626336cc22 Parents: 3dce33f Author: sboikov <[email protected]> Authored: Fri Feb 12 16:43:01 2016 +0300 Committer: sboikov <[email protected]> Committed: Fri Feb 12 16:43:01 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 4 +- .../cache/GridCacheClearAllRunnable.java | 2 +- .../processors/cache/GridCacheEntryEx.java | 11 +- .../processors/cache/GridCacheMapEntry.java | 102 ++-- .../processors/cache/GridCacheProcessor.java | 4 +- .../processors/cache/GridCacheSwapManager.java | 43 +- .../dht/GridDhtPartitionTopologyImpl.java | 7 +- .../GridDhtPartitionsExchangeFuture.java | 6 + .../ignite/spi/discovery/tcp/ServerImpl.java | 5 + .../TcpDiscoveryCustomEventMessage.java | 10 +- .../cache/CacheConfigurationLeakTest.java | 62 +++ .../processors/cache/GridCacheTestEntryEx.java | 5 +- .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 2 +- .../testsuites/IgniteCacheTestSuite2.java | 2 + .../CacheQueryOffheapEvictDataLostTest.java | 138 +++++ .../CacheRandomOperationsMultithreadedTest.java | 507 +++++++++++++++++++ .../IgniteCacheQuerySelfTestSuite.java | 3 + .../IgniteCacheWithIndexingTestSuite.java | 2 + 18 files changed, 812 insertions(+), 103 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a6b3dedd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 3fac207..535bc9d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1250,7 +1250,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V try { if (e != null) - e.clear(obsoleteVer, readers, null); + e.clear(obsoleteVer, readers); } catch (IgniteCheckedException ex) { U.error(log, "Failed to clearLocally entry (will continue to clearLocally other entries): " + e, @@ -4609,7 +4609,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V GridCacheEntryEx entry = ctx.isSwapOrOffheapEnabled() ? entryEx(cacheKey) : peekEx(cacheKey); if (entry != null) - return entry.clear(obsoleteVer, readers, null); + return entry.clear(obsoleteVer, readers); } catch (GridDhtInvalidPartitionException ignored) { // No-op. http://git-wip-us.apache.org/repos/asf/ignite/blob/a6b3dedd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java index 77c5a55..ad8ade1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java @@ -143,7 +143,7 @@ public class GridCacheClearAllRunnable<K, V> implements Runnable { */ protected void clearEntry(GridCacheEntryEx e) { try { - e.clear(obsoleteVer, readers, CU.empty0()); + e.clear(obsoleteVer, readers); } catch (IgniteCheckedException ex) { U.error(log, "Failed to clearLocally entry from cache (will continue to clearLocally other entries): " + e, ex); http://git-wip-us.apache.org/repos/asf/ignite/blob/a6b3dedd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index ccbaf38..9bee307 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -537,12 +537,10 @@ public interface GridCacheEntryEx { * * @param ver Obsolete version. * @param readers Flag to clear readers as well. - * @param filter Optional entry filter. * @throws IgniteCheckedException If failed to remove from swap. * @return {@code True} if entry was not being used, passed the filter and could be removed. */ - public boolean clear(GridCacheVersion ver, boolean readers, - @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException; + public boolean clear(GridCacheVersion ver, boolean readers) throws IgniteCheckedException; /** * This locks is called by transaction manager during prepare step @@ -922,7 +920,10 @@ public interface GridCacheEntryEx { public void updateTtl(@Nullable GridCacheVersion ver, long ttl); /** - * Tries to do offheap -> swap eviction. + * Called when entry should be evicted from offheap. + * <p> + * If swap is enabled tries to do offheap -> swap eviction, otherwise evicted value should + * be passed to query manager. * * @param entry Serialized swap entry. * @param evictVer Version when entry was selected for eviction. @@ -931,7 +932,7 @@ public interface GridCacheEntryEx { * @throws GridCacheEntryRemovedException If entry was removed. * @return {@code True} if entry was obsoleted and written to swap. */ - public boolean offheapSwapEvict(byte[] entry, GridCacheVersion evictVer, GridCacheVersion obsoleteVer) + public boolean onOffheapEvict(byte[] entry, GridCacheVersion evictVer, GridCacheVersion obsoleteVer) throws IgniteCheckedException, GridCacheEntryRemovedException; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/a6b3dedd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 9336e0a..c1eeb5e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -427,9 +427,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } /** {@inheritDoc} */ - @Override public boolean offheapSwapEvict(byte[] entry, GridCacheVersion evictVer, GridCacheVersion obsoleteVer) + @Override public boolean onOffheapEvict(byte[] entry, GridCacheVersion evictVer, GridCacheVersion obsoleteVer) throws IgniteCheckedException, GridCacheEntryRemovedException { - assert cctx.swap().swapEnabled() && cctx.swap().offHeapEnabled() : this; + assert cctx.swap().offHeapEnabled() && (cctx.swap().swapEnabled() || cctx.queries().enabled()) : this; boolean obsolete; @@ -444,12 +444,18 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (mvcc != null && !mvcc.isEmpty(obsoleteVer)) return false; - if (cctx.swap().offheapSwapEvict(key, entry, partition(), evictVer)) { + if (cctx.swap().onOffheapEvict(key, entry, partition(), evictVer)) { assert !hasValueUnlocked() : this; obsolete = markObsolete0(obsoleteVer, false, null); assert obsolete : this; + + if (!cctx.swap().swapEnabled()) { + CacheObject val = cctx.swap().unmarshalSwapEntryValue(entry); + + clearIndex(val); + } } else obsolete = false; @@ -2619,85 +2625,51 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } /** {@inheritDoc} */ - @Override public boolean clear(GridCacheVersion ver, boolean readers, - @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException { - boolean ret; - boolean rmv; - boolean marked; - - while (true) { - ret = false; - rmv = false; - marked = false; - - // For optimistic check. - GridCacheVersion startVer = null; - - if (!F.isEmptyOrNulls(filter)) { - synchronized (this) { - startVer = this.ver; - } - - if (!cctx.isAll(this, filter)) - return false; - } - - synchronized (this) { - if (startVer != null && !startVer.equals(this.ver)) - // Version has changed since filter checking. - continue; - - CacheObject val = saveValueForIndexUnlocked(); - - try { - if ((!hasReaders() || readers)) { - // markObsolete will clear the value. - if (!(marked = markObsolete0(ver, true, null))) { - if (log.isDebugEnabled()) - log.debug("Entry could not be marked obsolete (it is still used): " + this); + @Override public boolean clear(GridCacheVersion ver, boolean readers) throws IgniteCheckedException { + synchronized (this) { + if (obsolete()) + return false; - break; - } + CacheObject val = saveValueForIndexUnlocked(); - clearReaders(); - } - else { + try { + if ((!hasReaders() || readers)) { + // markObsolete will clear the value. + if (!(markObsolete0(ver, true, null))) { if (log.isDebugEnabled()) - log.debug("Entry could not be marked obsolete (it still has readers): " + this); + log.debug("Entry could not be marked obsolete (it is still used): " + this); - break; + return false; } + + clearReaders(); } - catch (GridCacheEntryRemovedException ignore) { + else { if (log.isDebugEnabled()) - log.debug("Got removed entry when clearing (will simply return): " + this); - - ret = true; + log.debug("Entry could not be marked obsolete (it still has readers): " + this); - break; + return false; } + } + catch (GridCacheEntryRemovedException ignore) { + assert false; - if (log.isDebugEnabled()) - log.debug("Entry has been marked obsolete: " + this); - - clearIndex(val); + return false; + } - releaseSwap(); + if (log.isDebugEnabled()) + log.debug("Entry has been marked obsolete: " + this); - ret = true; - rmv = true; + clearIndex(val); - break; - } + releaseSwap(); } - if (marked) - onMarkedObsolete(); + onMarkedObsolete(); - if (rmv) - cctx.cache().removeEntry(this); // Clear cache. + cctx.cache().removeEntry(this); // Clear cache. - return ret; + return true; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/a6b3dedd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 7a36e73..4bf96d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -2868,9 +2868,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { try { KeyCacheObject key = cctx.toCacheKeyObject(keyBytes); - GridCacheSwapEntry swapEntry = GridCacheSwapEntryImpl.unmarshal(valBytes, true); - - CacheObject val = swapEntry.value(); + CacheObject val = cctx.swap().unmarshalSwapEntryValue(valBytes); assert val != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/a6b3dedd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index cbf09bc..37c7958 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -40,6 +40,7 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionAware; import org.apache.ignite.internal.processors.offheap.GridOffHeapProcessor; +import org.apache.ignite.internal.processors.query.GridQueryProcessor; import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.GridEmptyCloseableIterator; @@ -106,8 +107,8 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { /** Soft iterator set. */ private final Collection<GridWeakIterator<Map.Entry>> itSet = new GridConcurrentHashSet<>(); - /** {@code True} if offheap to swap eviction is possible. */ - private boolean offheapToSwapEvicts; + /** {@code True} if need process evictions from offheap. */ + private boolean unwindOffheapEvicts; /** Values to be evicted from offheap to swap. */ private ThreadLocal<Collection<IgniteBiTuple<byte[], byte[]>>> offheapEvicts = new ThreadLocal<>(); @@ -141,7 +142,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { * */ public void unwindOffheapEvicts() { - if (!offheapToSwapEvicts) + if (!unwindOffheapEvicts) return; Collection<IgniteBiTuple<byte[], byte[]>> evicts = offheapEvicts.get(); @@ -162,7 +163,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { GridCacheEntryEx entry = cctx.cache().entryEx(key); try { - if (entry.offheapSwapEvict(vb, evictVer, obsoleteVer)) + if (entry.onOffheapEvict(vb, evictVer, obsoleteVer)) cctx.cache().removeEntry(entry); break; @@ -199,12 +200,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { GridOffHeapEvictListener lsnr; - if (swapEnabled) { - offheapToSwapEvicts = true; + if (swapEnabled || GridQueryProcessor.isEnabled(cctx.config())) { + unwindOffheapEvicts = true; lsnr = new GridOffHeapEvictListener() { @Override public void onEvict(int part, int hash, byte[] kb, byte[] vb) { - assert offheapToSwapEvicts; + assert unwindOffheapEvicts; onOffheapEvict(); @@ -1076,7 +1077,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { * @return {@code True} if removed. * @throws IgniteCheckedException If failed. */ - boolean offheapSwapEvict(final KeyCacheObject key, byte[] entry, int part, final GridCacheVersion ver) + boolean onOffheapEvict(final KeyCacheObject key, byte[] entry, int part, final GridCacheVersion ver) throws IgniteCheckedException { assert offheapEnabled; @@ -1096,13 +1097,14 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { Collection<GridCacheSwapListener> lsnrs = offheapLsnrs.get(part); if (lsnrs != null) { - GridCacheSwapEntry e = swapEntry(GridCacheSwapEntryImpl.unmarshal(entry, false)); + GridCacheSwapEntry e = swapEntry(unmarshalSwapEntry(entry, false)); for (GridCacheSwapListener lsnr : lsnrs) lsnr.onEntryUnswapped(part, key, e); } - cctx.swap().writeToSwap(part, key, entry); + if (swapEnabled) + cctx.swap().writeToSwap(part, key, entry); } return rmv; @@ -2142,6 +2144,19 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { /** * @param bytes Bytes to unmarshal. + * @return Unmarshalled values. + * @throws IgniteCheckedException If failed. + */ + public CacheObject unmarshalSwapEntryValue(byte[] bytes) throws IgniteCheckedException { + GridCacheSwapEntry swapEntry = swapEntry(GridCacheSwapEntryImpl.unmarshal(bytes, true)); + + assert swapEntry != null && swapEntry.value() != null : swapEntry; + + return swapEntry.value(); + } + + /** + * @param bytes Bytes to unmarshal. * @param valOnly If {@code true} unmarshalls only value. * @return Unmarshalled entry. */ @@ -2192,9 +2207,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { @Override protected Map.Entry<byte[], GridCacheSwapEntry> onNext() throws IgniteCheckedException { Map.Entry<byte[], byte[]> e = iter.nextX(); - GridCacheSwapEntry unmarshalled = unmarshalSwapEntry(e.getValue(), false); + GridCacheSwapEntry unmarshalled = swapEntry(unmarshalSwapEntry(e.getValue(), false)); - return F.t(e.getKey(), swapEntry(unmarshalled)); + return F.t(e.getKey(), unmarshalled); } /** {@inheritDoc} */ @@ -2500,9 +2515,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { /** {@inheritDoc} */ @Override public V getValue() { try { - GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue(), false); + GridCacheSwapEntry e = swapEntry(unmarshalSwapEntry(entry.getValue(), false)); - swapEntry(e); + assert e != null; return e.value().value(cctx.cacheObjectContext(), false); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a6b3dedd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 0e579ac..bf2d2c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -427,6 +427,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (node2part != null && node2part.valid()) checkEvictions(updateSeq); + updateRebalanceVersion(); + consistencyCheck(); if (log.isDebugEnabled()) @@ -1365,11 +1367,14 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { */ private void updateRebalanceVersion() { if (!rebalancedTopVer.equals(topVer)) { + if (node2part == null || !node2part.valid()) + return; + for (int i = 0; i < cctx.affinity().partitions(); i++) { List<ClusterNode> affNodes = cctx.affinity().nodes(i, topVer); // Topology doesn't contain server nodes (just clients). - if (affNodes.isEmpty() || (node2part != null && !node2part.valid())) + if (affNodes.isEmpty()) continue; List<ClusterNode> owners = owners(i); http://git-wip-us.apache.org/repos/asf/ignite/blob/a6b3dedd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 22fb59e..68a05e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -46,6 +46,7 @@ import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.managers.discovery.GridDiscoveryTopologySnapshot; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest; @@ -1132,6 +1133,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT cacheCtx.config().getAffinity().removeNode(exchId.nodeId()); } + reqs = null; + + if (discoEvt instanceof DiscoveryCustomEvent) + ((DiscoveryCustomEvent)discoEvt).customMessage(null); + return true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/a6b3dedd/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 0106b0a..fa0ae1c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -4597,6 +4597,8 @@ class ServerImpl extends TcpDiscoveryImpl { else processCustomMessage(msg); } + + msg.message(null, msg.messageBytes()); } else { addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id(), true)); @@ -4655,6 +4657,9 @@ class ServerImpl extends TcpDiscoveryImpl { notifyDiscoveryListener(msg); } + if (msg.verified()) + msg.message(null, msg.messageBytes()); + if (sendMessageToRemotes(msg)) sendMessageAcrossRing(msg); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a6b3dedd/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java index e10de46..9064080 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java @@ -70,14 +70,8 @@ public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractMessage } /** - * @return Deserialized message, - * @throws java.lang.Throwable if unmarshal failed. - */ - @Nullable public DiscoverySpiCustomMessage message(@NotNull Marshaller marsh) throws Throwable { - return message(marsh, null); - } - - /** + * @param marsh Marshaller. + * @param ldr Classloader. * @return Deserialized message, * @throws java.lang.Throwable if unmarshal failed. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/a6b3dedd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheConfigurationLeakTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheConfigurationLeakTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheConfigurationLeakTest.java new file mode 100644 index 0000000..85d6de5 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheConfigurationLeakTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.eviction.lru.LruEvictionPolicy; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class CacheConfigurationLeakTest extends GridCommonAbstractTest { + /** + * + */ + public CacheConfigurationLeakTest() { + super(true); + } + + /** + * @throws Exception If failed. + */ + public void testCacheCreateLeak() throws Exception { + final Ignite ignite = grid(); + + GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() { + @Override public void apply(Integer idx) { + for (int i = 0; i < 100; i++) { + CacheConfiguration ccfg = new CacheConfiguration(); + ccfg.setName("cache-" + idx + "-" + i); + ccfg.setEvictionPolicy(new LruEvictionPolicy(1000)); + + IgniteCache cache = ignite.createCache(ccfg); + + for (int k = 0; k < 5000; k++) + cache.put(k, new byte[1024]); + + ignite.destroyCache(cache.getName()); + } + } + }, 5, "cache-thread"); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a6b3dedd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index f6eb430..e627083 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -572,8 +572,7 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr } /** @inheritDoc */ - @Override public boolean clear(GridCacheVersion ver, boolean readers, - @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException { + @Override public boolean clear(GridCacheVersion ver, boolean readers) throws IgniteCheckedException { if (ver == null || ver.equals(this.ver)) { val = null; @@ -850,7 +849,7 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr } /** {@inheritDoc} */ - @Override public boolean offheapSwapEvict(byte[] vb, GridCacheVersion evictVer, GridCacheVersion obsoleteVer) + @Override public boolean onOffheapEvict(byte[] vb, GridCacheVersion evictVer, GridCacheVersion obsoleteVer) throws IgniteCheckedException, GridCacheEntryRemovedException { return false; } http://git-wip-us.apache.org/repos/asf/ignite/blob/a6b3dedd/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index 7bb2cf3..7635f0b 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -1884,7 +1884,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { if (msg instanceof TcpDiscoveryCustomEventMessage) { try { DiscoveryCustomMessage custMsg = GridTestUtils.getFieldValue( - ((TcpDiscoveryCustomEventMessage)msg).message(marsh), "delegate"); + ((TcpDiscoveryCustomEventMessage)msg).message(marsh, U.gridClassLoader()), "delegate"); if (custMsg instanceof StartRoutineAckDiscoveryMessage) { log.info("Skip message send and stop node: " + msg); http://git-wip-us.apache.org/repos/asf/ignite/blob/a6b3dedd/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java index cadcba7..d83b272 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java @@ -22,6 +22,7 @@ import org.apache.ignite.cache.affinity.fair.FairAffinityFunctionBackupFilterSel import org.apache.ignite.cache.affinity.fair.FairAffinityFunctionExcludeNeighborsSelfTest; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunctionBackupFilterSelfTest; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunctionExcludeNeighborsSelfTest; +import org.apache.ignite.internal.processors.cache.CacheConfigurationLeakTest; import org.apache.ignite.internal.processors.cache.CacheDhtLocalPartitionAfterRemoveSelfTest; import org.apache.ignite.internal.processors.cache.CrossCacheTxRandomOperationsTest; import org.apache.ignite.internal.processors.cache.GridCacheAtomicMessageCountSelfTest; @@ -248,6 +249,7 @@ public class IgniteCacheTestSuite2 extends TestSuite { suite.addTest(new TestSuite(IgniteDynamicCacheAndNodeStop.class)); suite.addTest(new TestSuite(CacheLockReleaseNodeLeaveTest.class)); suite.addTest(new TestSuite(NearCacheSyncUpdateTest.class)); + suite.addTest(new TestSuite(CacheConfigurationLeakTest.class)); return suite; } http://git-wip-us.apache.org/repos/asf/ignite/blob/a6b3dedd/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheQueryOffheapEvictDataLostTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheQueryOffheapEvictDataLostTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheQueryOffheapEvictDataLostTest.java new file mode 100644 index 0000000..26e8300 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheQueryOffheapEvictDataLostTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.eviction.lru.LruEvictionPolicy; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED; +import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; + +/** + * + */ +public class CacheQueryOffheapEvictDataLostTest extends GridCommonAbstractTest { + /** */ + private static final int KEYS = 100_000; + + /** + * + */ + public CacheQueryOffheapEvictDataLostTest() { + super(true); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration() throws Exception { + IgniteConfiguration cfg = super.getConfiguration(); + + CacheConfiguration<Object, Object> ccfg1 = new CacheConfiguration<>(); + + ccfg1.setName("cache-1"); + ccfg1.setMemoryMode(OFFHEAP_TIERED); + ccfg1.setOffHeapMaxMemory(1024); + ccfg1.setIndexedTypes(Integer.class, TestData.class); + ccfg1.setSwapEnabled(false); + + CacheConfiguration<Object, Object> ccfg2 = new CacheConfiguration<>(); + + ccfg2.setName("cache-2"); + ccfg2.setMemoryMode(ONHEAP_TIERED); + ccfg2.setEvictionPolicy(new LruEvictionPolicy(10)); + ccfg2.setOffHeapMaxMemory(1024); + ccfg2.setIndexedTypes(Integer.class, TestData.class); + ccfg2.setSwapEnabled(false); + + cfg.setCacheConfiguration(ccfg1, ccfg2); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testQueryDataLost() throws Exception { + final long stopTime = U.currentTimeMillis() + 30_000; + + GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() { + void putGet(IgniteCache<Object, Object> cache) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < KEYS; i++) { + cache.put(rnd.nextInt(KEYS), new TestData(i)); + + cache.get(rnd.nextInt(KEYS)); + } + } + + void query(IgniteCache<Object, Object> cache) { + SqlQuery<Object, Object> qry1 = new SqlQuery<>(TestData.class, "_key > ?"); + qry1.setArgs(KEYS / 2); + + cache.query(qry1).getAll(); + + SqlQuery<Object, Object> qry2 = new SqlQuery<>(TestData.class, "idxVal > ?"); + qry2.setArgs(KEYS / 2); + + cache.query(qry2).getAll(); + } + + @Override public void apply(Integer idx) { + IgniteCache<Object, Object> cache1 = grid().cache("cache-1"); + IgniteCache<Object, Object> cache2 = grid().cache("cache-2"); + + while (U.currentTimeMillis() < stopTime) { + if (idx == 0) { + putGet(cache1); + putGet(cache2); + } + else { + query(cache1); + query(cache2); + } + } + } + }, 10, "test-thread"); + } + + /** + * + */ + static class TestData implements Serializable { + /** */ + @QuerySqlField(index = true) + private int idxVal; + + /** + * @param idxVal Value. + */ + public TestData(int idxVal) { + this.idxVal = idxVal; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a6b3dedd/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheRandomOperationsMultithreadedTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheRandomOperationsMultithreadedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheRandomOperationsMultithreadedTest.java new file mode 100644 index 0000000..dc0175d --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheRandomOperationsMultithreadedTest.java @@ -0,0 +1,507 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.ThreadLocalRandom; +import javax.cache.Cache; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMemoryMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.eviction.EvictionPolicy; +import org.apache.ignite.cache.eviction.lru.LruEvictionPolicy; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED; +import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class CacheRandomOperationsMultithreadedTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int KEYS = 1000; + + /** */ + private static final int NODES = 4; + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGridsMultiThreaded(NODES - 1); + + client = true; + + startGrid(NODES - 1); + + super.beforeTestsStarted(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapTiered() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + ATOMIC, + OFFHEAP_TIERED, + null, + false); + + randomOperations(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapTieredIndexing() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + ATOMIC, + OFFHEAP_TIERED, + null, + true); + + randomOperations(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapEviction() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + ATOMIC, + ONHEAP_TIERED, + new LruEvictionPolicy<>(10), + false); + + randomOperations(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapEvictionIndexing() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + ATOMIC, + ONHEAP_TIERED, + new LruEvictionPolicy<>(10), + true); + + randomOperations(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testTxOffheapTiered() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + TRANSACTIONAL, + OFFHEAP_TIERED, + null, + false); + + randomOperations(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testTxOffheapTieredIndexing() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + TRANSACTIONAL, + OFFHEAP_TIERED, + null, + true); + + randomOperations(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testTxOffheapEviction() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + TRANSACTIONAL, + ONHEAP_TIERED, + new LruEvictionPolicy<>(10), + false); + + randomOperations(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testTxOffheapEvictionIndexing() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + TRANSACTIONAL, + ONHEAP_TIERED, + new LruEvictionPolicy<>(10), + true); + + randomOperations(ccfg); + } + + /** + * @param ccfg CacheConfiguration. + * @throws Exception If failed. + */ + private void randomOperations(final CacheConfiguration<Object, Object> ccfg) throws Exception { + ignite(0).createCache(ccfg); + + try { + final long stopTime = U.currentTimeMillis() + 30_000; + + final boolean indexing = !F.isEmpty(ccfg.getIndexedTypes()) || + !F.isEmpty(ccfg.getQueryEntities()); + + GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() { + @Override public void apply(Integer idx) { + Ignite ignite = ignite(idx % NODES); + + IgniteCache<Object, Object> cache = ignite.cache(ccfg.getName()); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (U.currentTimeMillis() < stopTime) + randomOperation(rnd, ignite, cache, indexing); + } + }, 1, "test-thread"); + } + finally { + ignite(0).destroyCache(ccfg.getName()); + } + } + + /** + * @param rnd Random generator. + * @param ignite Node. + * @param cache Cache. + * @param indexing Indexing flag. + */ + private void randomOperation(ThreadLocalRandom rnd, + Ignite ignite, + IgniteCache<Object, Object> cache, + boolean indexing) { + int r0 = rnd.nextInt(100); + + if (r0 == 0) + cache.clear(); + else if (r0 == 1) + cache.size(); + + switch (rnd.nextInt(14)) { + case 0: { + cache.put(key(rnd), value(rnd)); + + break; + } + + case 1: { + cache.getAndPut(key(rnd), value(rnd)); + + break; + } + + case 2: { + cache.get(key(rnd)); + + break; + } + + case 3: { + cache.remove(key(rnd)); + + break; + } + + case 4: { + cache.getAndRemove(key(rnd)); + + break; + } + + case 5: { + Map<Object, Object> map = new TreeMap<>(); + + for (int i = 0; i < 50; i++) + map.put(key(rnd), value(rnd)); + + cache.putAll(map); + + break; + } + + case 6: { + cache.getAll(keys(50, rnd)); + + break; + } + + case 7: { + cache.removeAll(keys(50, rnd)); + + break; + } + + case 8: { + cache.putIfAbsent(key(rnd), value(rnd)); + + break; + } + + case 9: { + cache.getAndPutIfAbsent(key(rnd), value(rnd)); + + break; + } + + case 10: { + cache.replace(key(rnd), value(rnd)); + + break; + } + + case 11: { + cache.getAndReplace(key(rnd), value(rnd)); + + break; + } + + case 12: { + ScanQuery<Object, Object> qry = new ScanQuery<>(); + qry.setFilter(new TestFilter()); + + List<Cache.Entry<Object, Object>> res = cache.query(qry).getAll(); + + assertTrue(res.size() >= 0); + + break; + } + + case 13: { + if (indexing) { + SqlQuery<Object, Object> qry = new SqlQuery<>(TestData.class, "where val1 > ?"); + qry.setArgs(KEYS / 2); + + List<Cache.Entry<Object, Object>> res = cache.query(qry).getAll(); + + assertTrue(res.size() >= 0); + } + + break; + } + + default: + fail(); + } + } + + /** + * @param cnt Number of keys. + * @param rnd Random generator. + * @return Keys. + */ + private Set<Object> keys(int cnt, ThreadLocalRandom rnd) { + TreeSet<Object> keys = new TreeSet<>(); + + for (int i = 0; i < cnt; i++) + keys.add(key(rnd)); + + return keys; + } + + /** + * @param rnd Random generator. + * @return Key. + */ + private Object key(ThreadLocalRandom rnd) { + return new TestKey(rnd.nextInt(KEYS), rnd); + } + + /** + * @param rnd Random generator. + * @return Value. + */ + private Object value(ThreadLocalRandom rnd) { + return new TestData(rnd); + } + + /** + * @param cacheMode Cache mode. + * @param atomicityMode Cache atomicity mode. + * @param memoryMode Cache memory mode. + * @param evictionPlc Eviction policy. + * @param indexing Indexing flag. + * @return Cache configuration. + */ + private CacheConfiguration<Object, Object> cacheConfiguration( + CacheMode cacheMode, + CacheAtomicityMode atomicityMode, + CacheMemoryMode memoryMode, + @Nullable EvictionPolicy<Object, Object> evictionPlc, + boolean indexing) { + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); + + ccfg.setAtomicityMode(atomicityMode); + ccfg.setCacheMode(cacheMode); + ccfg.setMemoryMode(memoryMode); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicWriteOrderMode(PRIMARY); + ccfg.setEvictionPolicy(evictionPlc); + ccfg.setOffHeapMaxMemory(0); + + if (cacheMode == PARTITIONED) + ccfg.setBackups(1); + + if (indexing) + ccfg.setIndexedTypes(TestKey.class, TestData.class); + + return ccfg; + } + + /** + * + */ + static class TestFilter implements IgniteBiPredicate<Object, Object> { + /** {@inheritDoc} */ + @Override public boolean apply(Object key, Object val) { + return ThreadLocalRandom.current().nextInt(10) == 0; + } + } + + /** + * + */ + static class TestKey implements Serializable, Comparable<TestKey> { + /** */ + private int key; + + /** */ + private byte[] byteVal; + + /** {@inheritDoc} */ + @Override public int compareTo(TestKey o) { + return Integer.compare(key, o.key); + } + + /** + * @param key Key. + * @param rnd Random generator. + */ + public TestKey(int key, ThreadLocalRandom rnd) { + this.key = key; + byteVal = new byte[rnd.nextInt(100)]; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TestKey testKey = (TestKey)o; + + return key == testKey.key; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return key; + } + } + + /** + * + */ + static class TestData implements Serializable { + /** */ + @QuerySqlField(index = true) + private int val1; + + /** */ + private long val2; + + /** */ + @QuerySqlField(index = true) + private String val3; + + /** */ + private byte[] val4; + + /** + * @param rnd Random generator. + */ + public TestData(ThreadLocalRandom rnd) { + val1 = rnd.nextInt(); + val2 = val1; + val3 = String.valueOf(val1); + val4 = new byte[rnd.nextInt(1024)]; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a6b3dedd/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index cecb8ad..f11c2f8 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -21,6 +21,7 @@ import junit.framework.TestSuite; import org.apache.ignite.internal.processors.cache.CacheLocalQueryMetricsSelfTest; import org.apache.ignite.internal.processors.cache.CachePartitionedQueryMetricsDistributedSelfTest; import org.apache.ignite.internal.processors.cache.CachePartitionedQueryMetricsLocalSelfTest; +import org.apache.ignite.internal.processors.cache.CacheQueryOffheapEvictDataLostTest; import org.apache.ignite.internal.processors.cache.CacheReplicatedQueryMetricsDistributedSelfTest; import org.apache.ignite.internal.processors.cache.CacheReplicatedQueryMetricsLocalSelfTest; import org.apache.ignite.internal.processors.cache.CacheScanPartitionQueryFallbackSelfTest; @@ -224,6 +225,8 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(GridOrderedMessageCancelSelfTest.class); + suite.addTestSuite(CacheQueryOffheapEvictDataLostTest.class); + // Ignite cache and H2 comparison. suite.addTestSuite(BaseH2CompareQueryTest.class); suite.addTestSuite(H2CompareBigQueryTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/a6b3dedd/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java index 550c69f..9ff7520 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java @@ -20,6 +20,7 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; import org.apache.ignite.internal.processors.cache.CacheConfigurationP2PTest; import org.apache.ignite.internal.processors.cache.CacheIndexStreamerTest; +import org.apache.ignite.internal.processors.cache.CacheRandomOperationsMultithreadedTest; import org.apache.ignite.internal.processors.cache.GridCacheOffHeapAndSwapSelfTest; import org.apache.ignite.internal.processors.cache.GridCacheOffHeapSelfTest; import org.apache.ignite.internal.processors.cache.GridCacheOffheapIndexEntryEvictTest; @@ -70,6 +71,7 @@ public class IgniteCacheWithIndexingTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheConfigurationPrimitiveTypesSelfTest.class); suite.addTestSuite(IgniteClientReconnectQueriesTest.class); + suite.addTestSuite(CacheRandomOperationsMultithreadedTest.class); return suite; }
