ignite-2615 If swap is not enabled, the value evicted from offheap needs to be passed to the query manager (cherry picked from commit a6b3dedd381a71fec8dd9dcf944be2626336cc22)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/accb0e9e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/accb0e9e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/accb0e9e Branch: refs/heads/ignite-testing-discovery Commit: accb0e9efa9b630801912784b29eca64622152ef Parents: d6a143c Author: sboikov <[email protected]> Authored: Mon Apr 18 12:09:53 2016 +0300 Committer: sboikov <[email protected]> Committed: Mon Apr 18 12:10:46 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheEntryEx.java | 7 +- .../processors/cache/GridCacheMapEntry.java | 12 +- .../processors/cache/GridCacheProcessor.java | 4 +- .../processors/cache/GridCacheSwapManager.java | 43 +- .../processors/cache/GridCacheTestEntryEx.java | 2 +- .../CacheQueryOffheapEvictDataLostTest.java | 138 +++++ .../CacheRandomOperationsMultithreadedTest.java | 507 +++++++++++++++++++ .../IgniteCacheQuerySelfTestSuite2.java | 3 + .../IgniteCacheWithIndexingTestSuite.java | 2 + 9 files changed, 695 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/accb0e9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index a7880e3..dc8e08c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -931,7 +931,10 @@ public interface GridCacheEntryEx { public void updateTtl(@Nullable GridCacheVersion ver, long ttl) throws 
GridCacheEntryRemovedException; /** - * Tries to do offheap -> swap eviction. + * Called when entry should be evicted from offheap. + * <p> + * If swap is enabled tries to do offheap -> swap eviction, otherwise evicted value should + * be passed to query manager. * * @param entry Serialized swap entry. * @param evictVer Version when entry was selected for eviction. @@ -940,7 +943,7 @@ public interface GridCacheEntryEx { * @throws GridCacheEntryRemovedException If entry was removed. * @return {@code True} if entry was obsoleted and written to swap. */ - public boolean offheapSwapEvict(byte[] entry, GridCacheVersion evictVer, GridCacheVersion obsoleteVer) + public boolean onOffheapEvict(byte[] entry, GridCacheVersion evictVer, GridCacheVersion obsoleteVer) throws IgniteCheckedException, GridCacheEntryRemovedException; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/accb0e9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index f31a992..435d337 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -431,9 +431,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } /** {@inheritDoc} */ - @Override public boolean offheapSwapEvict(byte[] entry, GridCacheVersion evictVer, GridCacheVersion obsoleteVer) + @Override public boolean onOffheapEvict(byte[] entry, GridCacheVersion evictVer, GridCacheVersion obsoleteVer) throws IgniteCheckedException, GridCacheEntryRemovedException { - assert cctx.swap().swapEnabled() && cctx.swap().offHeapEnabled() : this; + assert 
cctx.swap().offHeapEnabled() && (cctx.swap().swapEnabled() || cctx.queries().enabled()) : this; boolean obsolete; @@ -448,12 +448,18 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (mvcc != null && !mvcc.isEmpty(obsoleteVer)) return false; - if (cctx.swap().offheapSwapEvict(key, entry, partition(), evictVer)) { + if (cctx.swap().onOffheapEvict(key, entry, partition(), evictVer)) { assert !hasValueUnlocked() : this; obsolete = markObsolete0(obsoleteVer, false, null); assert obsolete : this; + + if (!cctx.swap().swapEnabled()) { + CacheObject val = cctx.swap().unmarshalSwapEntryValue(entry); + + clearIndex(val); + } } else obsolete = false; http://git-wip-us.apache.org/repos/asf/ignite/blob/accb0e9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index a8f3a4a..7c456b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -2846,9 +2846,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { try { KeyCacheObject key = cctx.toCacheKeyObject(keyBytes); - GridCacheSwapEntry swapEntry = GridCacheSwapEntryImpl.unmarshal(valBytes, true); - - CacheObject val = swapEntry.value(); + CacheObject val = cctx.swap().unmarshalSwapEntryValue(valBytes); assert val != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/accb0e9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index a35bb3f..d50bf0b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -40,6 +40,7 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionAware; import org.apache.ignite.internal.processors.offheap.GridOffHeapProcessor; +import org.apache.ignite.internal.processors.query.GridQueryProcessor; import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.GridEmptyCloseableIterator; @@ -106,8 +107,8 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { /** Soft iterator set. */ private final Collection<GridWeakIterator<Map.Entry>> itSet = new GridConcurrentHashSet<>(); - /** {@code True} if offheap to swap eviction is possible. */ - private boolean offheapToSwapEvicts; + /** {@code True} if need process evictions from offheap. */ + private boolean unwindOffheapEvicts; /** Values to be evicted from offheap to swap. 
*/ private ThreadLocal<Collection<IgniteBiTuple<byte[], byte[]>>> offheapEvicts = new ThreadLocal<>(); @@ -141,7 +142,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { * */ public void unwindOffheapEvicts() { - if (!offheapToSwapEvicts) + if (!unwindOffheapEvicts) return; Collection<IgniteBiTuple<byte[], byte[]>> evicts = offheapEvicts.get(); @@ -162,7 +163,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { GridCacheEntryEx entry = cctx.cache().entryEx(key); try { - if (entry.offheapSwapEvict(vb, evictVer, obsoleteVer)) + if (entry.onOffheapEvict(vb, evictVer, obsoleteVer)) cctx.cache().removeEntry(entry); break; @@ -199,12 +200,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { GridOffHeapEvictListener lsnr; - if (swapEnabled) { - offheapToSwapEvicts = true; + if (swapEnabled || GridQueryProcessor.isEnabled(cctx.config())) { + unwindOffheapEvicts = true; lsnr = new GridOffHeapEvictListener() { @Override public void onEvict(int part, int hash, byte[] kb, byte[] vb) { - assert offheapToSwapEvicts; + assert unwindOffheapEvicts; onOffheapEvict(); @@ -1100,7 +1101,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { * @return {@code True} if removed. * @throws IgniteCheckedException If failed. 
*/ - boolean offheapSwapEvict(final KeyCacheObject key, byte[] entry, int part, final GridCacheVersion ver) + boolean onOffheapEvict(final KeyCacheObject key, byte[] entry, int part, final GridCacheVersion ver) throws IgniteCheckedException { assert offheapEnabled; @@ -1120,13 +1121,14 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { Collection<GridCacheSwapListener> lsnrs = offheapLsnrs.get(part); if (lsnrs != null) { - GridCacheSwapEntry e = swapEntry(GridCacheSwapEntryImpl.unmarshal(entry, false)); + GridCacheSwapEntry e = swapEntry(unmarshalSwapEntry(entry, false)); for (GridCacheSwapListener lsnr : lsnrs) lsnr.onEntryUnswapped(part, key, e); } - cctx.swap().writeToSwap(part, key, entry); + if (swapEnabled) + cctx.swap().writeToSwap(part, key, entry); } return rmv; @@ -2166,6 +2168,19 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { /** * @param bytes Bytes to unmarshal. + * @return Unmarshalled values. + * @throws IgniteCheckedException If failed. + */ + public CacheObject unmarshalSwapEntryValue(byte[] bytes) throws IgniteCheckedException { + GridCacheSwapEntry swapEntry = swapEntry(GridCacheSwapEntryImpl.unmarshal(bytes, true)); + + assert swapEntry != null && swapEntry.value() != null : swapEntry; + + return swapEntry.value(); + } + + /** + * @param bytes Bytes to unmarshal. * @param valOnly If {@code true} unmarshalls only value. * @return Unmarshalled entry. 
*/ @@ -2216,9 +2231,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { @Override protected Map.Entry<byte[], GridCacheSwapEntry> onNext() throws IgniteCheckedException { Map.Entry<byte[], byte[]> e = iter.nextX(); - GridCacheSwapEntry unmarshalled = unmarshalSwapEntry(e.getValue(), false); + GridCacheSwapEntry unmarshalled = swapEntry(unmarshalSwapEntry(e.getValue(), false)); - return F.t(e.getKey(), swapEntry(unmarshalled)); + return F.t(e.getKey(), unmarshalled); } /** {@inheritDoc} */ @@ -2524,9 +2539,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { /** {@inheritDoc} */ @Override public V getValue() { try { - GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue(), false); + GridCacheSwapEntry e = swapEntry(unmarshalSwapEntry(entry.getValue(), false)); - swapEntry(e); + assert e != null; return e.value().value(cctx.cacheObjectContext(), false); } http://git-wip-us.apache.org/repos/asf/ignite/blob/accb0e9e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index 1bede92..4455b46 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -853,7 +853,7 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr } /** {@inheritDoc} */ - @Override public boolean offheapSwapEvict(byte[] vb, GridCacheVersion evictVer, GridCacheVersion obsoleteVer) + @Override public boolean onOffheapEvict(byte[] vb, GridCacheVersion evictVer, GridCacheVersion obsoleteVer) throws IgniteCheckedException, GridCacheEntryRemovedException { 
return false; } http://git-wip-us.apache.org/repos/asf/ignite/blob/accb0e9e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheQueryOffheapEvictDataLostTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheQueryOffheapEvictDataLostTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheQueryOffheapEvictDataLostTest.java new file mode 100644 index 0000000..26e8300 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheQueryOffheapEvictDataLostTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.eviction.lru.LruEvictionPolicy; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED; +import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; + +/** + * + */ +public class CacheQueryOffheapEvictDataLostTest extends GridCommonAbstractTest { + /** */ + private static final int KEYS = 100_000; + + /** + * + */ + public CacheQueryOffheapEvictDataLostTest() { + super(true); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration() throws Exception { + IgniteConfiguration cfg = super.getConfiguration(); + + CacheConfiguration<Object, Object> ccfg1 = new CacheConfiguration<>(); + + ccfg1.setName("cache-1"); + ccfg1.setMemoryMode(OFFHEAP_TIERED); + ccfg1.setOffHeapMaxMemory(1024); + ccfg1.setIndexedTypes(Integer.class, TestData.class); + ccfg1.setSwapEnabled(false); + + CacheConfiguration<Object, Object> ccfg2 = new CacheConfiguration<>(); + + ccfg2.setName("cache-2"); + ccfg2.setMemoryMode(ONHEAP_TIERED); + ccfg2.setEvictionPolicy(new LruEvictionPolicy(10)); + ccfg2.setOffHeapMaxMemory(1024); + ccfg2.setIndexedTypes(Integer.class, TestData.class); + ccfg2.setSwapEnabled(false); + + cfg.setCacheConfiguration(ccfg1, ccfg2); + + return cfg; + } + + /** + * @throws Exception If failed. 
+ */ + public void testQueryDataLost() throws Exception { + final long stopTime = U.currentTimeMillis() + 30_000; + + GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() { + void putGet(IgniteCache<Object, Object> cache) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < KEYS; i++) { + cache.put(rnd.nextInt(KEYS), new TestData(i)); + + cache.get(rnd.nextInt(KEYS)); + } + } + + void query(IgniteCache<Object, Object> cache) { + SqlQuery<Object, Object> qry1 = new SqlQuery<>(TestData.class, "_key > ?"); + qry1.setArgs(KEYS / 2); + + cache.query(qry1).getAll(); + + SqlQuery<Object, Object> qry2 = new SqlQuery<>(TestData.class, "idxVal > ?"); + qry2.setArgs(KEYS / 2); + + cache.query(qry2).getAll(); + } + + @Override public void apply(Integer idx) { + IgniteCache<Object, Object> cache1 = grid().cache("cache-1"); + IgniteCache<Object, Object> cache2 = grid().cache("cache-2"); + + while (U.currentTimeMillis() < stopTime) { + if (idx == 0) { + putGet(cache1); + putGet(cache2); + } + else { + query(cache1); + query(cache2); + } + } + } + }, 10, "test-thread"); + } + + /** + * + */ + static class TestData implements Serializable { + /** */ + @QuerySqlField(index = true) + private int idxVal; + + /** + * @param idxVal Value. 
+ */ + public TestData(int idxVal) { + this.idxVal = idxVal; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/accb0e9e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheRandomOperationsMultithreadedTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheRandomOperationsMultithreadedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheRandomOperationsMultithreadedTest.java new file mode 100644 index 0000000..dc0175d --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheRandomOperationsMultithreadedTest.java @@ -0,0 +1,507 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.ThreadLocalRandom; +import javax.cache.Cache; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMemoryMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.eviction.EvictionPolicy; +import org.apache.ignite.cache.eviction.lru.LruEvictionPolicy; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED; +import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static 
org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class CacheRandomOperationsMultithreadedTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int KEYS = 1000; + + /** */ + private static final int NODES = 4; + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGridsMultiThreaded(NODES - 1); + + client = true; + + startGrid(NODES - 1); + + super.beforeTestsStarted(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapTiered() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + ATOMIC, + OFFHEAP_TIERED, + null, + false); + + randomOperations(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapTieredIndexing() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + ATOMIC, + OFFHEAP_TIERED, + null, + true); + + randomOperations(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapEviction() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + ATOMIC, + ONHEAP_TIERED, + new LruEvictionPolicy<>(10), + false); + + randomOperations(ccfg); + } + + /** + * @throws Exception If failed. 
+ */ + public void testAtomicOffheapEvictionIndexing() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + ATOMIC, + ONHEAP_TIERED, + new LruEvictionPolicy<>(10), + true); + + randomOperations(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testTxOffheapTiered() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + TRANSACTIONAL, + OFFHEAP_TIERED, + null, + false); + + randomOperations(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testTxOffheapTieredIndexing() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + TRANSACTIONAL, + OFFHEAP_TIERED, + null, + true); + + randomOperations(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testTxOffheapEviction() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + TRANSACTIONAL, + ONHEAP_TIERED, + new LruEvictionPolicy<>(10), + false); + + randomOperations(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testTxOffheapEvictionIndexing() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + TRANSACTIONAL, + ONHEAP_TIERED, + new LruEvictionPolicy<>(10), + true); + + randomOperations(ccfg); + } + + /** + * @param ccfg CacheConfiguration. + * @throws Exception If failed. 
+ */ + private void randomOperations(final CacheConfiguration<Object, Object> ccfg) throws Exception { + ignite(0).createCache(ccfg); + + try { + final long stopTime = U.currentTimeMillis() + 30_000; + + final boolean indexing = !F.isEmpty(ccfg.getIndexedTypes()) || + !F.isEmpty(ccfg.getQueryEntities()); + + GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() { + @Override public void apply(Integer idx) { + Ignite ignite = ignite(idx % NODES); + + IgniteCache<Object, Object> cache = ignite.cache(ccfg.getName()); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (U.currentTimeMillis() < stopTime) + randomOperation(rnd, ignite, cache, indexing); + } + }, 1, "test-thread"); + } + finally { + ignite(0).destroyCache(ccfg.getName()); + } + } + + /** + * @param rnd Random generator. + * @param ignite Node. + * @param cache Cache. + * @param indexing Indexing flag. + */ + private void randomOperation(ThreadLocalRandom rnd, + Ignite ignite, + IgniteCache<Object, Object> cache, + boolean indexing) { + int r0 = rnd.nextInt(100); + + if (r0 == 0) + cache.clear(); + else if (r0 == 1) + cache.size(); + + switch (rnd.nextInt(14)) { + case 0: { + cache.put(key(rnd), value(rnd)); + + break; + } + + case 1: { + cache.getAndPut(key(rnd), value(rnd)); + + break; + } + + case 2: { + cache.get(key(rnd)); + + break; + } + + case 3: { + cache.remove(key(rnd)); + + break; + } + + case 4: { + cache.getAndRemove(key(rnd)); + + break; + } + + case 5: { + Map<Object, Object> map = new TreeMap<>(); + + for (int i = 0; i < 50; i++) + map.put(key(rnd), value(rnd)); + + cache.putAll(map); + + break; + } + + case 6: { + cache.getAll(keys(50, rnd)); + + break; + } + + case 7: { + cache.removeAll(keys(50, rnd)); + + break; + } + + case 8: { + cache.putIfAbsent(key(rnd), value(rnd)); + + break; + } + + case 9: { + cache.getAndPutIfAbsent(key(rnd), value(rnd)); + + break; + } + + case 10: { + cache.replace(key(rnd), value(rnd)); + + break; + } + + case 11: { + 
cache.getAndReplace(key(rnd), value(rnd)); + + break; + } + + case 12: { + ScanQuery<Object, Object> qry = new ScanQuery<>(); + qry.setFilter(new TestFilter()); + + List<Cache.Entry<Object, Object>> res = cache.query(qry).getAll(); + + assertTrue(res.size() >= 0); + + break; + } + + case 13: { + if (indexing) { + SqlQuery<Object, Object> qry = new SqlQuery<>(TestData.class, "where val1 > ?"); + qry.setArgs(KEYS / 2); + + List<Cache.Entry<Object, Object>> res = cache.query(qry).getAll(); + + assertTrue(res.size() >= 0); + } + + break; + } + + default: + fail(); + } + } + + /** + * @param cnt Number of keys. + * @param rnd Random generator. + * @return Keys. + */ + private Set<Object> keys(int cnt, ThreadLocalRandom rnd) { + TreeSet<Object> keys = new TreeSet<>(); + + for (int i = 0; i < cnt; i++) + keys.add(key(rnd)); + + return keys; + } + + /** + * @param rnd Random generator. + * @return Key. + */ + private Object key(ThreadLocalRandom rnd) { + return new TestKey(rnd.nextInt(KEYS), rnd); + } + + /** + * @param rnd Random generator. + * @return Value. + */ + private Object value(ThreadLocalRandom rnd) { + return new TestData(rnd); + } + + /** + * @param cacheMode Cache mode. + * @param atomicityMode Cache atomicity mode. + * @param memoryMode Cache memory mode. + * @param evictionPlc Eviction policy. + * @param indexing Indexing flag. + * @return Cache configuration. 
+ */ + private CacheConfiguration<Object, Object> cacheConfiguration( + CacheMode cacheMode, + CacheAtomicityMode atomicityMode, + CacheMemoryMode memoryMode, + @Nullable EvictionPolicy<Object, Object> evictionPlc, + boolean indexing) { + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); + + ccfg.setAtomicityMode(atomicityMode); + ccfg.setCacheMode(cacheMode); + ccfg.setMemoryMode(memoryMode); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicWriteOrderMode(PRIMARY); + ccfg.setEvictionPolicy(evictionPlc); + ccfg.setOffHeapMaxMemory(0); + + if (cacheMode == PARTITIONED) + ccfg.setBackups(1); + + if (indexing) + ccfg.setIndexedTypes(TestKey.class, TestData.class); + + return ccfg; + } + + /** + * + */ + static class TestFilter implements IgniteBiPredicate<Object, Object> { + /** {@inheritDoc} */ + @Override public boolean apply(Object key, Object val) { + return ThreadLocalRandom.current().nextInt(10) == 0; + } + } + + /** + * + */ + static class TestKey implements Serializable, Comparable<TestKey> { + /** */ + private int key; + + /** */ + private byte[] byteVal; + + /** {@inheritDoc} */ + @Override public int compareTo(TestKey o) { + return Integer.compare(key, o.key); + } + + /** + * @param key Key. + * @param rnd Random generator. 
+ */ + public TestKey(int key, ThreadLocalRandom rnd) { + this.key = key; + byteVal = new byte[rnd.nextInt(100)]; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TestKey testKey = (TestKey)o; + + return key == testKey.key; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return key; + } + } + + /** + * + */ + static class TestData implements Serializable { + /** */ + @QuerySqlField(index = true) + private int val1; + + /** */ + private long val2; + + /** */ + @QuerySqlField(index = true) + private String val3; + + /** */ + private byte[] val4; + + /** + * @param rnd Random generator. + */ + public TestData(ThreadLocalRandom rnd) { + val1 = rnd.nextInt(); + val2 = val1; + val3 = String.valueOf(val1); + val4 = new byte[rnd.nextInt(1024)]; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/accb0e9e/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java index 64a182b..4b83f69 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java @@ -21,6 +21,7 @@ import junit.framework.TestSuite; import org.apache.ignite.internal.processors.cache.CacheLocalQueryMetricsSelfTest; import org.apache.ignite.internal.processors.cache.CachePartitionedQueryMetricsDistributedSelfTest; import org.apache.ignite.internal.processors.cache.CachePartitionedQueryMetricsLocalSelfTest; +import org.apache.ignite.internal.processors.cache.CacheQueryOffheapEvictDataLostTest; import 
org.apache.ignite.internal.processors.cache.CacheReplicatedQueryMetricsDistributedSelfTest; import org.apache.ignite.internal.processors.cache.CacheReplicatedQueryMetricsLocalSelfTest; import org.apache.ignite.internal.processors.cache.CacheScanPartitionQueryFallbackSelfTest; @@ -82,6 +83,8 @@ public class IgniteCacheQuerySelfTestSuite2 extends TestSuite { suite.addTestSuite(GridOrderedMessageCancelSelfTest.class); + suite.addTestSuite(CacheQueryOffheapEvictDataLostTest.class); + // Ignite cache and H2 comparison. suite.addTestSuite(BaseH2CompareQueryTest.class); suite.addTestSuite(H2CompareBigQueryTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/accb0e9e/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java index 550c69f..9ff7520 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java @@ -20,6 +20,7 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; import org.apache.ignite.internal.processors.cache.CacheConfigurationP2PTest; import org.apache.ignite.internal.processors.cache.CacheIndexStreamerTest; +import org.apache.ignite.internal.processors.cache.CacheRandomOperationsMultithreadedTest; import org.apache.ignite.internal.processors.cache.GridCacheOffHeapAndSwapSelfTest; import org.apache.ignite.internal.processors.cache.GridCacheOffHeapSelfTest; import org.apache.ignite.internal.processors.cache.GridCacheOffheapIndexEntryEvictTest; @@ -70,6 +71,7 @@ public class IgniteCacheWithIndexingTestSuite extends TestSuite { 
suite.addTestSuite(IgniteCacheConfigurationPrimitiveTypesSelfTest.class); suite.addTestSuite(IgniteClientReconnectQueriesTest.class); + suite.addTestSuite(CacheRandomOperationsMultithreadedTest.class); return suite; }
