Repository: ignite Updated Branches: refs/heads/ignite-3477-master 26ff68c17 -> 226d6981c
ignite-4712 Memory leaks in PageMemory Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3cba3206 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3cba3206 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3cba3206 Branch: refs/heads/ignite-3477-master Commit: 3cba32066472da5b25bcc0738973607cabd1dc45 Parents: 13c3cfc Author: sboikov <[email protected]> Authored: Thu Mar 16 10:37:02 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu Mar 16 10:37:02 2017 +0300 ---------------------------------------------------------------------- .../cache/database/freelist/PagesList.java | 221 +++++++---- .../ignite/cache/LargeEntryUpdateTest.java | 177 +++++++++ .../database/IgniteDbAbstractTest.java | 369 +++++++++++++++++++ .../IgniteDbMemoryLeakAbstractTest.java | 265 +++++++++++++ .../IgniteDbMemoryLeakLargeObjectsTest.java | 56 +++ .../IgniteDbMemoryLeakLargePagesTest.java | 33 ++ .../IgniteDbMemoryLeakNonTransactionalTest.java | 31 ++ .../database/IgniteDbMemoryLeakTest.java | 46 +++ .../IgniteDbMemoryLeakWithExpirationTest.java | 44 +++ .../database/IgniteDbPutGetAbstractTest.java | 331 +---------------- .../testsuites/IgniteDbMemoryLeakTestSuite.java | 49 +++ .../database/IgniteDbMemoryLeakIndexedTest.java | 33 ++ .../IgniteDbMemoryLeakSqlQueryTest.java | 76 ++++ ...IgniteDbMemoryLeakWithIndexingTestSuite.java | 40 ++ 14 files changed, 1369 insertions(+), 402 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3cba3206/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java index e5430cf..5c66b10 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.internal.pagemem.Page; @@ -51,7 +52,6 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; -import org.jsr166.LongAdder8; import static java.lang.Boolean.FALSE; import static java.lang.Boolean.TRUE; @@ -76,10 +76,7 @@ public abstract class PagesList extends DataStructure { Math.min(8, Runtime.getRuntime().availableProcessors() * 2)); /** */ - private final boolean trackBucketsSize = IgniteSystemProperties.getBoolean("IGNITE_PAGES_LIST_TRACK_SIZE", false); - - /** */ - protected final LongAdder8[] bucketsSize; + protected final AtomicLong[] bucketsSize; /** Page ID to store list metadata. */ private final long metaPageId; @@ -117,7 +114,7 @@ public abstract class PagesList extends DataStructure { return TRUE; } - }; + } /** * @param cacheId Cache ID. @@ -142,10 +139,10 @@ public abstract class PagesList extends DataStructure { this.buckets = buckets; this.metaPageId = metaPageId; - bucketsSize = new LongAdder8[buckets]; + bucketsSize = new AtomicLong[buckets]; for (int i = 0; i < buckets; i++) - bucketsSize[i] = new LongAdder8(); + bucketsSize[i] = new AtomicLong(); } /** @@ -191,6 +188,7 @@ public abstract class PagesList extends DataStructure { for (Map.Entry<Integer, GridLongList> e : bucketsData.entrySet()) { int bucket = e.getKey(); + long bucketSize = 0; Stripe[] old = getBucket(bucket); assert old == null; @@ -199,11 +197,43 @@ public abstract class PagesList extends DataStructure { Stripe[] tails = new Stripe[upd.length]; - for (int i = 0; i < upd.length; i++) - tails[i] = new Stripe(upd[i]); + for (int i = 0; i < upd.length; i++) { + long tailId = upd[i]; + + long pageId = tailId; + int cnt = 0; + + while (pageId != 0L) { + try (Page page = page(pageId)) { + long pageAddr = readLock(page); + + assert pageAddr != 0L; + + try { + PagesListNodeIO io = PagesListNodeIO.VERSIONS.forPage(pageAddr); + + cnt += io.getCount(pageAddr); + pageId = io.getPreviousId(pageAddr); + + // In reuse bucket the page itself can be used as a free page. + if (isReuseBucket(bucket) && pageId != 0L) + cnt++; + } + finally { + readUnlock(page, pageAddr); + } + } + } + + Stripe stripe = new Stripe(tailId, cnt == 0); + tails[i] = stripe; + bucketSize += cnt; + } boolean ok = casBucket(bucket, null, tails); assert ok; + + bucketsSize[bucket].set(bucketSize); } } } @@ -363,7 +393,7 @@ public abstract class PagesList extends DataStructure { initPage(pageMem, page, this, PagesListNodeIO.VERSIONS.latest(), wal); } - Stripe stripe = new Stripe(pageId); + Stripe stripe = new Stripe(pageId, true); for (;;) { Stripe[] old = getBucket(bucket); @@ -490,25 +520,32 @@ public abstract class PagesList extends DataStructure { for (Stripe tail : tails) { long pageId = tail.tailId; - try (Page page = page(pageId)) { - long pageAddr = readLock(page); // No correctness guaranties. + while (pageId != 0L) { + try (Page page = page(pageId)) { + long pageAddr = readLock(page); - try { - PagesListNodeIO io = PagesListNodeIO.VERSIONS.forPage(pageAddr); + assert pageAddr != 0L; - int cnt = io.getCount(pageAddr); + try { + PagesListNodeIO io = PagesListNodeIO.VERSIONS.forPage(pageAddr); - assert cnt >= 0; + res += io.getCount(pageAddr); + pageId = io.getPreviousId(pageAddr); - res += cnt; - } - finally { - readUnlock(page, pageAddr); + // In reuse bucket the page itself can be used as a free page. + if (isReuseBucket(bucket) && pageId != 0L) + res++; + } + finally { + readUnlock(page, pageAddr); + } } } } } + assert res == bucketsSize[bucket].get() : "Wrong bucket size counter [exp=" + res + ", cntr=" + bucketsSize[bucket].get() + ']'; + return res; } @@ -600,8 +637,7 @@ public abstract class PagesList extends DataStructure { if (idx == -1) handlePageFull(pageId, page, pageAddr, io, dataPage, dataPageAddr, bucket); else { - if (trackBucketsSize) - bucketsSize[bucket].increment(); + bucketsSize[bucket].incrementAndGet(); if (isWalDeltaRecordNeeded(wal, page)) wal.log(new PagesListAddPageRecord(cacheId, pageId, dataPageId)); @@ -660,6 +696,9 @@ public abstract class PagesList extends DataStructure { dataPageId, pageId, 0L)); + // In reuse bucket the page itself can be used as a free page. + bucketsSize[bucket].incrementAndGet(); + updateTail(bucket, pageId, dataPageId); } else { @@ -695,14 +734,13 @@ public abstract class PagesList extends DataStructure { assert idx != -1; - if (trackBucketsSize) - bucketsSize[bucket].increment(); - dataIO.setFreeListPageId(dataPageAddr, nextId); if (isWalDeltaRecordNeeded(wal, dataPage)) wal.log(new DataPageSetFreeListPageRecord(cacheId, dataPageId, nextId)); + bucketsSize[bucket].incrementAndGet(); + updateTail(bucket, pageId, nextId); } finally { @@ -778,6 +816,10 @@ public abstract class PagesList extends DataStructure { 0L )); + // In reuse bucket the page itself can be used as a free page. + if (isReuseBucket(bucket)) + bucketsSize[bucket].incrementAndGet(); + // Switch to this new page, which is now a part of our list // to add the rest of the bag to the new page. prevPageAddr = nextPageAddr; @@ -786,12 +828,11 @@ public abstract class PagesList extends DataStructure { } } else { - if (trackBucketsSize) - bucketsSize[bucket].increment(); - // TODO: use single WAL record for bag? if (isWalDeltaRecordNeeded(wal, page)) wal.log(new PagesListAddPageRecord(cacheId, prevId, nextId)); + + bucketsSize[bucket].incrementAndGet(); } } } @@ -816,10 +857,22 @@ public abstract class PagesList extends DataStructure { private Stripe getPageForTake(int bucket) { Stripe[] tails = getBucket(bucket); - if (tails == null) + if (tails == null || bucketsSize[bucket].get() == 0) return null; - return randomTail(tails); + int len = tails.length; + int init = randomInt(len); + int cur = init; + + while (true) { + Stripe stripe = tails[cur]; + + if (!stripe.empty) + return stripe; + + if ((cur = (cur + 1) % len) == init) + return null; + } } /** @@ -873,7 +926,7 @@ public abstract class PagesList extends DataStructure { for (int lockAttempt = 0; ;) { Stripe stripe = getPageForTake(bucket); - if (stripe == null || stripe.empty) + if (stripe == null) return 0L; long tailId = stripe.tailId; @@ -888,24 +941,38 @@ public abstract class PagesList extends DataStructure { continue; } + if (stripe.empty) { + // Another thread took the last page. + writeUnlock(tail, tailPageAddr, false); + + if (bucketsSize[bucket].get() > 0) { + lockAttempt--; // Ignore current attempt. + + continue; + } + else + return 0L; + } + assert PageIO.getPageId(tailPageAddr) == tailId : "tailId = " + tailId + ", tailPageId = " + PageIO.getPageId(tailPageAddr); assert PageIO.getType(tailPageAddr) == PageIO.T_PAGE_LIST_NODE; boolean dirty = false; - long ret = 0L; + long ret; long recycleId = 0L; try { PagesListNodeIO io = PagesListNodeIO.VERSIONS.forPage(tailPageAddr); - if (io.getNextId(tailPageAddr) != 0) + if (io.getNextId(tailPageAddr) != 0) { + // It is not a tail anymore, retry. continue; + } long pageId = io.takeAnyPage(tailPageAddr); if (pageId != 0L) { - if (trackBucketsSize) - bucketsSize[bucket].decrement(); + bucketsSize[bucket].decrementAndGet(); if (isWalDeltaRecordNeeded(wal, tail)) wal.log(new PagesListRemovePageRecord(cacheId, tailId, pageId)); @@ -914,59 +981,66 @@ public abstract class PagesList extends DataStructure { ret = pageId; - // If we got an empty page in non-reuse bucket, move it back to reuse list - // to prevent empty page leak to data pages. - if (io.isEmpty(tailPageAddr) && !isReuseBucket(bucket)) { + if (io.isEmpty(tailPageAddr)) { long prevId = io.getPreviousId(tailPageAddr); - if (prevId != 0L) { - try (Page prev = page(prevId)) { - // Lock pages from next to previous. - Boolean ok = writePage(pageMem, prev, this, cutTail, null, bucket, FALSE); + // If we got an empty page in non-reuse bucket, move it back to reuse list + // to prevent empty page leak to data pages. + if (!isReuseBucket(bucket)) { + if (prevId != 0L) { + try (Page prev = page(prevId)) { + // Lock pages from next to previous. + Boolean ok = writePage(pageMem, prev, this, cutTail, null, bucket, FALSE); - assert ok == TRUE : ok; - } + assert ok == TRUE : ok; + } - recycleId = recyclePage(tailId, tail, tailPageAddr); + recycleId = recyclePage(tailId, tail, tailPageAddr); + } + else + stripe.empty = true; } + else + stripe.empty = prevId == 0L; } } else { - // The tail page is empty. We can unlink and return it if we have a previous page. - long prevId = io.getPreviousId(tailPageAddr); + // The tail page is empty, but stripe is not. It might + // happen only if we are in reuse bucket and it has + // a previous page, so, the current page can be collected + assert isReuseBucket(bucket); - if (prevId != 0L) { - // This can only happen if we are in the reuse bucket. - assert isReuseBucket(bucket); + long prevId = io.getPreviousId(tailPageAddr); - try (Page prev = page(prevId)) { - // Lock pages from next to previous. - Boolean ok = writePage(pageMem, prev, this, cutTail, null, bucket, FALSE); + assert prevId != 0L; - assert ok == TRUE : ok; - } + try (Page prev = page(prevId)) { + // Lock pages from next to previous. + Boolean ok = writePage(pageMem, prev, this, cutTail, null, bucket, FALSE); - if (initIoVers != null) { - tailId = PageIdUtils.changeType(tailId, FLAG_DATA); + assert ok == TRUE : ok; - PageIO initIo = initIoVers.latest(); + bucketsSize[bucket].decrementAndGet(); + } - initIo.initNewPage(tailPageAddr, tailId, pageSize()); + if (initIoVers != null) { + tailId = PageIdUtils.changeType(tailId, FLAG_DATA); - if (isWalDeltaRecordNeeded(wal, tail)) { - wal.log(new InitNewPageRecord(cacheId, tail.id(), initIo.getType(), - initIo.getVersion(), tailId)); - } - } - else - tailId = recyclePage(tailId, tail, tailPageAddr); + PageIO initIo = initIoVers.latest(); - dirty = true; + initIo.initNewPage(tailPageAddr, tailId, pageSize()); - ret = tailId; + if (isWalDeltaRecordNeeded(wal, tail)) { + wal.log(new InitNewPageRecord(cacheId, tail.id(), initIo.getType(), + initIo.getVersion(), tailId)); + } } else - stripe.empty = true; + tailId = recyclePage(tailId, tail, tailPageAddr); + + dirty = true; + + ret = tailId; } // If we do not have a previous page (we are at head), then we still can return @@ -1026,8 +1100,7 @@ public abstract class PagesList extends DataStructure { if (!rmvd) return false; - if (trackBucketsSize) - bucketsSize[bucket].decrement(); + bucketsSize[bucket].decrementAndGet(); if (isWalDeltaRecordNeeded(wal, page)) wal.log(new PagesListRemovePageRecord(cacheId, pageId, dataPageId)); @@ -1312,9 +1385,11 @@ public abstract class PagesList extends DataStructure { /** * @param tailId Tail ID. + * @param empty Empty flag. */ - Stripe(long tailId) { + Stripe(long tailId, boolean empty) { this.tailId = tailId; + this.empty = empty; } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/3cba3206/modules/core/src/test/java/org/apache/ignite/cache/LargeEntryUpdateTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/LargeEntryUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/cache/LargeEntryUpdateTest.java new file mode 100644 index 0000000..18a1654 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/cache/LargeEntryUpdateTest.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.cache; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCompute; +import org.apache.ignite.IgniteException; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.MemoryConfiguration; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteRunnable; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class LargeEntryUpdateTest extends GridCommonAbstractTest { + /** */ + private static final int THREAD_COUNT = 10; + + /** */ + private static final int PAGE_SIZE = 1 << 10; // 1 kB. + + /** */ + private static final int PAGE_CACHE_SIZE = 30 << 20; // 30 MB. + + /** */ + private static final String CACHE_PREFIX = "testCache"; + + /** */ + private static final int CACHE_COUNT = 10; + + /** */ + private static final long WAIT_TIMEOUT = 5 * 60_000L; // 5 min. + + /** */ + private static final long TEST_TIMEOUT = 10 * 60_000L; // 10 min. + + /** */ + private final AtomicBoolean cacheUpdate = new AtomicBoolean(); + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return TEST_TIMEOUT; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setPublicThreadPoolSize(THREAD_COUNT); + + MemoryConfiguration mem = new MemoryConfiguration(); + + mem.setPageSize(PAGE_SIZE); + mem.setPageCacheSize(PAGE_CACHE_SIZE); + + cfg.setMemoryConfiguration(mem); + + CacheConfiguration[] ccfgs = new CacheConfiguration[CACHE_COUNT]; + + for (int i = 0; i < CACHE_COUNT; ++i) { + CacheConfiguration ccfg = new CacheConfiguration(); + ccfg.setName(CACHE_PREFIX + i); + ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC); + ccfg.setCacheMode(CacheMode.PARTITIONED); + ccfgs[i] = ccfg; + } + + cfg.setCacheConfiguration(ccfgs); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testEntryUpdate() throws Exception { + try (Ignite ignite = startGrid()) { + for (int i = 0; i < CACHE_COUNT; ++i) { + IgniteCache<Long, byte[]> cache = ignite.cache(CACHE_PREFIX + i); + + cache.put(0L, new byte[PAGE_SIZE * 2]); + } + + IgniteCompute compute = ignite.compute().withAsync(); + + long endTime = System.currentTimeMillis() + WAIT_TIMEOUT; + + int iter = 0; + + while (System.currentTimeMillis() < endTime) { + log.info("Iteration: " + iter++); + + cacheUpdate.set(true); + + try { + List<IgniteFuture> futs = new ArrayList<>(); + + for (int i = 0; i < THREAD_COUNT; ++i) { + compute.run(new CacheUpdater()); + + futs.add(compute.future()); + } + + Thread.sleep(30_000); + + cacheUpdate.set(false); + + for (IgniteFuture fut : futs) + fut.get(); + } + finally { + cacheUpdate.set(false); + } + } + } + } + + /** */ + public static class EntryUpdater implements CacheEntryProcessor<Long, byte[], Void> { + /** */ + public static final EntryUpdater INSTANCE = new EntryUpdater(); + + /** {@inheritDoc} */ + @Override public Void process(MutableEntry<Long, byte[]> entry, Object... args) { + entry.setValue(new byte[PAGE_SIZE]); + + return null; + } + } + + /** */ + public class CacheUpdater implements IgniteRunnable { + /** */ + @IgniteInstanceResource + public transient Ignite ignite; + + /** {@inheritDoc} */ + @Override public void run() { + try { + while (cacheUpdate.get()) { + for (int i = 0; i < CACHE_COUNT; ++i) { + IgniteCache<Long, byte[]> cache = ignite.cache(CACHE_PREFIX + i); + + cache.invoke(0L, EntryUpdater.INSTANCE); + } + } + } + catch (Throwable ex) { + throw new IgniteException(ex); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3cba3206/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbAbstractTest.java new file mode 100644 index 0000000..cf26187 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbAbstractTest.java @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.database; + +import java.io.Serializable; +import java.util.Arrays; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.MemoryConfiguration; +import org.apache.ignite.internal.processors.cache.database.tree.BPlusTree; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public abstract class IgniteDbAbstractTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** + * @return Node count. + */ + protected abstract int gridCount(); + + /** + * @return {@code True} if indexing is enabled. + */ + protected abstract boolean indexingEnabled(); + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + MemoryConfiguration dbCfg = new MemoryConfiguration(); + + dbCfg.setConcurrencyLevel(Runtime.getRuntime().availableProcessors() * 4); + + if (isLargePage()) + dbCfg.setPageSize(16 * 1024); + else + dbCfg.setPageSize(1024); + + dbCfg.setPageCacheSize(200 * 1024 * 1024); + + configure(dbCfg); + + cfg.setMemoryConfiguration(dbCfg); + + CacheConfiguration ccfg = new CacheConfiguration(); + + if (indexingEnabled()) + ccfg.setIndexedTypes(Integer.class, DbValue.class); + + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + ccfg.setAffinity(new RendezvousAffinityFunction(false, 32)); + + CacheConfiguration ccfg2 = new CacheConfiguration("non-primitive"); + + if (indexingEnabled()) + ccfg2.setIndexedTypes(DbKey.class, DbValue.class); + + ccfg2.setAtomicityMode(TRANSACTIONAL); + ccfg2.setWriteSynchronizationMode(FULL_SYNC); + ccfg2.setRebalanceMode(SYNC); + ccfg2.setAffinity(new RendezvousAffinityFunction(false, 32)); + + CacheConfiguration ccfg3 = new CacheConfiguration("large"); + + if (indexingEnabled()) + ccfg3.setIndexedTypes(Integer.class, LargeDbValue.class); + + ccfg3.setAtomicityMode(TRANSACTIONAL); + ccfg3.setWriteSynchronizationMode(FULL_SYNC); + ccfg3.setRebalanceMode(SYNC); + ccfg3.setAffinity(new RendezvousAffinityFunction(false, 32)); + + CacheConfiguration ccfg4 = new CacheConfiguration("tiny"); + + ccfg4.setAtomicityMode(TRANSACTIONAL); + ccfg4.setWriteSynchronizationMode(FULL_SYNC); + ccfg4.setRebalanceMode(SYNC); + ccfg4.setAffinity(new RendezvousAffinityFunction(1, null)); + + CacheConfiguration ccfg5 = new CacheConfiguration("atomic"); + + if (indexingEnabled()) + ccfg5.setIndexedTypes(DbKey.class, DbValue.class); + + ccfg5.setAtomicityMode(ATOMIC); + ccfg5.setWriteSynchronizationMode(FULL_SYNC); + ccfg5.setRebalanceMode(SYNC); + ccfg5.setAffinity(new RendezvousAffinityFunction(false, 32)); + + cfg.setCacheConfiguration(ccfg, ccfg2, ccfg3, ccfg4, ccfg5); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + cfg.setMarshaller(null); + + configure(cfg); + + return cfg; + } + + /** + * @param cfg IgniteConfiguration. + */ + protected void configure(IgniteConfiguration cfg){ + // No-op. + } + + /** + * @param mCfg MemoryConfiguration. + */ + protected void configure(MemoryConfiguration mCfg){ + // No-op. + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false)); + +// long seed = System.currentTimeMillis(); +// +// info("Seed: " + seed + "L"); +// +// BPlusTree.rnd = new Random(seed); + + startGrids(gridCount()); + + awaitPartitionMapExchange(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + BPlusTree.rnd = null; + + stopAllGrids(); + + deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false)); + } + + /** + * @return {@code True} if use large page. + */ + protected boolean isLargePage() { + return false; + } + + /** + * + */ + static class DbKey implements Serializable { + /** */ + int val; + + /** + * @param val Value. + */ + DbKey(int val) { + this.val = val; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || !(o instanceof DbKey)) + return false; + + DbKey key = (DbKey)o; + + return val == key.val; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return val; + } + } + + /** + * + */ + static class LargeDbKey implements Serializable { + /** */ + int val; + + /** */ + byte[] data; + + /** + * @param val Value. + * @param size Key payload size. + */ + LargeDbKey(int val, int size) { + this.val = val; + + data = new byte[size]; + + Arrays.fill(data, (byte)val); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || !(o instanceof LargeDbKey)) + return false; + + LargeDbKey key = (LargeDbKey)o; + + return val == key.val && Arrays.equals(data, key.data); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return val + Arrays.hashCode(data); + } + } + + /** + * + */ + static class DbValue implements Serializable { + /** */ + @QuerySqlField(index = true) + int iVal; + + /** */ + @QuerySqlField(index = true) + String sVal; + + /** */ + @QuerySqlField + long lVal; + + /** + * @param iVal Integer value. + * @param sVal String value. + * @param lVal Long value. + */ + DbValue(int iVal, String sVal, long lVal) { + this.iVal = iVal; + this.sVal = sVal; + this.lVal = lVal; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + DbValue dbVal = (DbValue)o; + + return iVal == dbVal.iVal && lVal == dbVal.lVal && + !(sVal != null ? !sVal.equals(dbVal.sVal) : dbVal.sVal != null); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = iVal; + + res = 31 * res + (sVal != null ? sVal.hashCode() : 0); + res = 31 * res + (int)(lVal ^ (lVal >>> 32)); + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DbValue.class, this); + } + } + + /** + * + */ + static class LargeDbValue { + /** */ + @QuerySqlField(index = true) + String str1; + + /** */ + @QuerySqlField(index = true) + String str2; + + /** */ + int[] arr; + + /** + * @param str1 String 1. + * @param str2 String 2. + * @param arr Big array. + */ + LargeDbValue(final String str1, final String str2, final int[] arr) { + this.str1 = str1; + this.str2 = str2; + this.arr = arr; + } + + /** {@inheritDoc} */ + @Override public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + final LargeDbValue that = (LargeDbValue) o; + + if (str1 != null ? !str1.equals(that.str1) : that.str1 != null) return false; + if (str2 != null ? !str2.equals(that.str2) : that.str2 != null) return false; + + return Arrays.equals(arr, that.arr); + + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = str1 != null ? str1.hashCode() : 0; + + res = 31 * res + (str2 != null ? str2.hashCode() : 0); + res = 31 * res + Arrays.hashCode(arr); + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(LargeDbValue.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3cba3206/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakAbstractTest.java new file mode 100644 index 0000000..2e9d3f9 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakAbstractTest.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.database; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.MemoryConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.database.DataStructure; + +import static org.apache.ignite.IgniteSystemProperties.getInteger; + +/** + * Base class for memory leaks tests. + */ +public abstract class IgniteDbMemoryLeakAbstractTest extends IgniteDbAbstractTest { + /** */ + private static final int CONCURRENCY_LEVEL = 8; + + /** */ + private static final int MIN_PAGE_CACHE_SIZE = 1048576 * CONCURRENCY_LEVEL; + + /** */ + private volatile Exception ex; + + /** */ + private long warmUpEndTime; + + /** */ + private long endTime; + + /** */ + private long loadedPages; + + /** */ + private long delta; + + /** */ + private long probeCnt; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + DataStructure.rnd = null; + + long startTime = System.nanoTime(); + + warmUpEndTime = startTime + TimeUnit.SECONDS.toNanos(warmUp()); + + endTime = warmUpEndTime + TimeUnit.SECONDS.toNanos(duration()); + } + + /** {@inheritDoc} */ + @Override protected void configure(IgniteConfiguration cfg) { + cfg.setMetricsLogFrequency(5000); + } + + /** {@inheritDoc} */ + @Override protected void configure(MemoryConfiguration mCfg) { + mCfg.setConcurrencyLevel(CONCURRENCY_LEVEL); + + long size = (1024 * (isLargePage() ? 16 : 1) + 24) * pagesMax(); + + mCfg.setPageCacheSize(Math.max(size, MIN_PAGE_CACHE_SIZE)); + } + + /** + * @return Test duration in seconds. + */ + protected int duration() { + return getInteger("IGNITE_MEMORY_LEAKS_TEST_DURATION", 300); + } + + /** + * @return Warm up duration in seconds. + */ + @SuppressWarnings("WeakerAccess") + protected int warmUp() { + return getInteger("IGNITE_MEMORY_LEAKS_TEST_WARM_UP", 450); + } + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected boolean indexingEnabled() { + return false; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return (warmUp() + duration() + 10) * 1000; // Extra seconds to stop all threads. + } + + /** + * @param ig Ignite instance. + * @return IgniteCache. + */ + protected abstract IgniteCache<Object, Object> cache(IgniteEx ig); + + /** + * @return Cache key to perform an operation. + */ + protected abstract Object key(); + + /** + * @param key Cache key to perform an operation. + * @return Cache value to perform an operation. + */ + protected abstract Object value(Object key); + + /** + * @param cache IgniteCache. + */ + protected void operation(IgniteCache<Object, Object> cache) { + Object key = key(); + Object val = value(key); + + switch (nextInt(3)) { + case 0: + cache.getAndPut(key, val); + + break; + + case 1: + cache.get(key); + + break; + + case 2: + cache.getAndRemove(key); + } + } + + /** + * @param bound Upper bound (exclusive). Must be positive. + * @return Random int value. + */ + protected static int nextInt(int bound) { + return ThreadLocalRandom.current().nextInt(bound); + } + + /** + * @return Random int value. + */ + protected static int nextInt() { + return ThreadLocalRandom.current().nextInt(); + } + + /** + * @throws Exception If failed. + */ + public void testMemoryLeak() throws Exception { + final IgniteEx ignite = grid(0); + + final IgniteCache<Object, Object> cache = cache(ignite); + + Runnable target = new Runnable() { + @Override public void run() { + while (ex == null && System.nanoTime() < endTime) { + try { + operation(cache); + } + catch (Exception e) { + ex = e; + + break; + } + } + } + }; + + Thread[] threads = new Thread[CONCURRENCY_LEVEL]; + + info("Warming up is started."); + + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(target); + threads[i].start(); + } + + while (ex == null && System.nanoTime() < warmUpEndTime) + Thread.sleep(100); + + if (ex != null) + throw ex; + + info("Warming up is ended."); + + while (ex == null && System.nanoTime() < endTime) { + try { + check(ignite); + } + catch (Exception e) { + ex = e; + + break; + } + + Thread.sleep(TimeUnit.SECONDS.toMillis(5)); + } + + if (ex != null) + throw ex; + } + + /** + * Callback to check the current state. + * + * @param ig Ignite instance. + * @throws Exception If failed. + */ + protected void check(IgniteEx ig) throws Exception { + long pagesActual = ig.context().cache().context().database().pageMemory().loadedPages(); + + if (loadedPages > 0) { + delta += pagesActual - loadedPages; + + int allowedDelta = pagesDelta(); + + if (probeCnt++ > 12) { // We need some statistic first. Minimal statistic is taken for a minute. + long actualDelta = delta / probeCnt; + + assertTrue( + "Average growth pages in the number is more than expected [allowed=" + allowedDelta + ", actual=" + actualDelta + "]", + actualDelta <= allowedDelta); + } + } + + loadedPages = pagesActual; + } + + /** + * @return Maximal allowed pages number. + */ + protected abstract long pagesMax(); + + /** + * @return Expected average number of pages, on which their total number can grow per 5 seconds. + */ + @SuppressWarnings("WeakerAccess") + protected int pagesDelta() { + return 3; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3cba3206/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakLargeObjectsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakLargeObjectsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakLargeObjectsTest.java new file mode 100644 index 0000000..077a1e1 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakLargeObjectsTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.database; + +import org.apache.ignite.IgniteCache; +import org.apache.ignite.internal.IgniteEx; + +/** + * + */ +public class IgniteDbMemoryLeakLargeObjectsTest extends IgniteDbMemoryLeakAbstractTest { + /** */ + private final static int[] ARRAY; + + static { + ARRAY = new int[1024]; + + for (int i = 0; i < ARRAY.length; i++) + ARRAY[i] = nextInt(); + } + + /** {@inheritDoc} */ + @Override protected IgniteCache<Object, Object> cache(IgniteEx ig) { + return ig.cache("large"); + } + + /** {@inheritDoc} */ + @Override protected Object key() { + return new LargeDbKey(nextInt(10_000), 1024); + } + + /** {@inheritDoc} */ + @Override protected Object value(Object key) { + return new LargeDbValue("test-value-1-" + nextInt(200), "test-value-2-" + nextInt(200), ARRAY); + } + + /** {@inheritDoc} */ + @Override protected long pagesMax() { + return 35_000; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3cba3206/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakLargePagesTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakLargePagesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakLargePagesTest.java new file mode 100644 index 0000000..540681d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakLargePagesTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.database; + +/** + * + */ +public class IgniteDbMemoryLeakLargePagesTest extends IgniteDbMemoryLeakTest { + /** {@inheritDoc} */ + @Override protected boolean isLargePage() { + return true; + } + + /** {@inheritDoc} */ + @Override protected long pagesMax() { + return 4000; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3cba3206/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakNonTransactionalTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakNonTransactionalTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakNonTransactionalTest.java new file mode 100644 index 0000000..2a6293d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakNonTransactionalTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.database; + +import org.apache.ignite.IgniteCache; +import org.apache.ignite.internal.IgniteEx; + +/** + * + */ +public class IgniteDbMemoryLeakNonTransactionalTest extends IgniteDbMemoryLeakTest { + /** {@inheritDoc} */ + @Override protected IgniteCache<Object, Object> cache(IgniteEx ig) { + return ig.cache("atomic"); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3cba3206/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakTest.java new file mode 100644 index 0000000..b8ac8f0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.database; + +import org.apache.ignite.IgniteCache; +import org.apache.ignite.internal.IgniteEx; + +/** + * + */ +public class IgniteDbMemoryLeakTest extends IgniteDbMemoryLeakAbstractTest { + /** {@inheritDoc} */ + @Override protected IgniteCache<Object, Object> cache(IgniteEx ig) { + return ig.cache("non-primitive"); + } + + /** {@inheritDoc} */ + @Override protected Object key() { + return new DbKey(nextInt(200_000)); + } + + /** {@inheritDoc} */ + @Override protected Object value(Object key) { + return new DbValue(((DbKey)key).val, "test-value-" + nextInt(200), nextInt(500)); + } + + /** {@inheritDoc} */ + @Override protected long pagesMax() { + return 20_000; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3cba3206/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakWithExpirationTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakWithExpirationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakWithExpirationTest.java new file mode 100644 index 0000000..6e0abaf --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakWithExpirationTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.database; + +import javax.cache.expiry.CreatedExpiryPolicy; +import javax.cache.expiry.Duration; +import javax.cache.expiry.ExpiryPolicy; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.internal.IgniteEx; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +/** + * + */ +public class IgniteDbMemoryLeakWithExpirationTest extends IgniteDbMemoryLeakTest { + /** */ + private static final ExpiryPolicy EXPIRY = new CreatedExpiryPolicy(new Duration(MILLISECONDS, 10L)); + + /** {@inheritDoc} */ + @Override protected IgniteCache<Object, Object> cache(IgniteEx ig) { + return ig.cache("non-primitive").withExpiryPolicy(EXPIRY); + } + + /** {@inheritDoc} */ + @Override protected long pagesMax() { + return 7000; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3cba3206/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbPutGetAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbPutGetAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbPutGetAbstractTest.java index c7a07e3..12b0126 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbPutGetAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbPutGetAbstractTest.java @@ -17,8 +17,6 @@ package org.apache.ignite.internal.processors.database; -import java.io.Serializable; -import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; @@ -32,160 +30,24 @@ import javax.cache.Cache; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteDataStreamer; -import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CachePeekMode; -import org.apache.ignite.cache.CacheRebalanceMode; -import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.affinity.Affinity; -import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; -import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.ScanQuery; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.SqlQuery; -import org.apache.ignite.cache.query.annotations.QuerySqlField; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.MemoryConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; -import org.apache.ignite.internal.processors.cache.database.tree.BPlusTree; import org.apache.ignite.internal.util.GridRandom; import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.internal.util.typedef.X; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Assert; /** * */ -public abstract class IgniteDbPutGetAbstractTest extends GridCommonAbstractTest { - /** */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** - * @return Node count. - */ - protected abstract int gridCount(); - - /** - * @return {@code True} if indexing is enabled. - */ - protected abstract boolean indexingEnabled(); - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - MemoryConfiguration dbCfg = new MemoryConfiguration(); - - if (isLargePage()) { - dbCfg.setConcurrencyLevel(Runtime.getRuntime().availableProcessors() * 4); - - dbCfg.setPageSize(16 * 1024); - - dbCfg.setPageCacheSize(200 * 1024 * 1024); - } - else { - dbCfg.setConcurrencyLevel(Runtime.getRuntime().availableProcessors() * 4); - - dbCfg.setPageSize(1024); - - dbCfg.setPageCacheSize(200 * 1024 * 1024); - } - - cfg.setMemoryConfiguration(dbCfg); - - CacheConfiguration ccfg = new CacheConfiguration(); - - if (indexingEnabled()) - ccfg.setIndexedTypes(Integer.class, DbValue.class); - - ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); - ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - ccfg.setRebalanceMode(CacheRebalanceMode.SYNC); - ccfg.setAffinity(new RendezvousAffinityFunction(false, 32)); - - CacheConfiguration ccfg2 = new CacheConfiguration("non-primitive"); - - if (indexingEnabled()) - ccfg2.setIndexedTypes(DbKey.class, DbValue.class); - - ccfg2.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); - ccfg2.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - ccfg2.setRebalanceMode(CacheRebalanceMode.SYNC); - ccfg2.setAffinity(new RendezvousAffinityFunction(false, 32)); - - CacheConfiguration ccfg3 = new CacheConfiguration("large"); - - if (indexingEnabled()) - ccfg3.setIndexedTypes(Integer.class, LargeDbValue.class); - - ccfg3.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); - ccfg3.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - ccfg3.setRebalanceMode(CacheRebalanceMode.SYNC); - ccfg3.setAffinity(new RendezvousAffinityFunction(false, 32)); - - CacheConfiguration ccfg4 = new CacheConfiguration("tiny"); - - ccfg4.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); - ccfg4.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - ccfg4.setRebalanceMode(CacheRebalanceMode.SYNC); - ccfg4.setAffinity(new RendezvousAffinityFunction(false, 32)); - - final AffinityFunction aff = new RendezvousAffinityFunction(1, null); - - ccfg4.setAffinity(aff); - - cfg.setCacheConfiguration(ccfg, ccfg2, ccfg3, ccfg4); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(discoSpi); - cfg.setMarshaller(null); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false)); - - long seed = 1464583813940L; // System.currentTimeMillis(); - - info("Seed: " + seed + "L"); - - BPlusTree.rnd = new Random(seed); - - startGrids(gridCount()); - - awaitPartitionMapExchange(); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - BPlusTree.rnd = null; - - stopAllGrids(); - - deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false)); - } - - /** - * @return {@code True} if use large page. - */ - protected boolean isLargePage() { - return false; - }; - +public abstract class IgniteDbPutGetAbstractTest extends IgniteDbAbstractTest { /** * */ @@ -196,9 +58,7 @@ public abstract class IgniteDbPutGetAbstractTest extends GridCommonAbstractTest final int cnt = 100_000; - Random rnd = BPlusTree.rnd; - - assert rnd != null; + Random rnd = new Random(); Map<Integer, DbValue> map = new HashMap<>(); @@ -1349,191 +1209,4 @@ public abstract class IgniteDbPutGetAbstractTest extends GridCommonAbstractTest assertNull(internalCache.peekEx(key)); } - - /** - * - */ - private static class DbKey implements Serializable { - /** */ - private int val; - - /** - * @param val Value. - */ - private DbKey(int val) { - this.val = val; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || !(o instanceof DbKey)) - return false; - - DbKey key = (DbKey)o; - - return val == key.val; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return val; - } - } - - /** - * - */ - private static class LargeDbKey implements Serializable { - /** */ - private int val; - - /** */ - private byte[] data; - - /** - * @param val Value. - * @param size Key payload size. - */ - private LargeDbKey(int val, int size) { - this.val = val; - - data = new byte[size]; - - Arrays.fill(data, (byte)val); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || !(o instanceof LargeDbKey)) - return false; - - LargeDbKey key = (LargeDbKey)o; - - return val == key.val && Arrays.equals(data, key.data); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return val + Arrays.hashCode(data); - } - } - - /** - * - */ - private static class DbValue implements Serializable { - /** */ - @QuerySqlField(index = true) - private int iVal; - - /** */ - @QuerySqlField(index = true) - private String sVal; - - /** */ - @QuerySqlField - private long lVal; - - /** - * @param iVal Integer value. - * @param sVal String value. - * @param lVal Long value. - */ - public DbValue(int iVal, String sVal, long lVal) { - this.iVal = iVal; - this.sVal = sVal; - this.lVal = lVal; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - DbValue dbVal = (DbValue)o; - - return iVal == dbVal.iVal && lVal == dbVal.lVal && - !(sVal != null ? !sVal.equals(dbVal.sVal) : dbVal.sVal != null); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = iVal; - - res = 31 * res + (sVal != null ? sVal.hashCode() : 0); - res = 31 * res + (int)(lVal ^ (lVal >>> 32)); - - return res; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(DbValue.class, this); - } - } - - /** - * - */ - private static class LargeDbValue { - /** */ - @QuerySqlField(index = true) - private String str1; - - /** */ - @QuerySqlField(index = true) - private String str2; - - /** */ - private int[] arr; - - /** - * @param str1 String 1. - * @param str2 String 2. - * @param arr Big array. - */ - public LargeDbValue(final String str1, final String str2, final int[] arr) { - this.str1 = str1; - this.str2 = str2; - this.arr = arr; - } - - /** {@inheritDoc} */ - @Override public boolean equals(final Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - final LargeDbValue that = (LargeDbValue) o; - - if (str1 != null ? !str1.equals(that.str1) : that.str1 != null) return false; - if (str2 != null ? !str2.equals(that.str2) : that.str2 != null) return false; - - return Arrays.equals(arr, that.arr); - - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = str1 != null ? str1.hashCode() : 0; - - res = 31 * res + (str2 != null ? str2.hashCode() : 0); - res = 31 * res + Arrays.hashCode(arr); - - return res; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(LargeDbValue.class, this); - } - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/3cba3206/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteDbMemoryLeakTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteDbMemoryLeakTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteDbMemoryLeakTestSuite.java new file mode 100644 index 0000000..f271bd8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteDbMemoryLeakTestSuite.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.testsuites; + +import junit.framework.TestSuite; +import org.apache.ignite.cache.LargeEntryUpdateTest; +import org.apache.ignite.internal.processors.database.IgniteDbMemoryLeakLargeObjectsTest; +import org.apache.ignite.internal.processors.database.IgniteDbMemoryLeakLargePagesTest; +import org.apache.ignite.internal.processors.database.IgniteDbMemoryLeakNonTransactionalTest; +import org.apache.ignite.internal.processors.database.IgniteDbMemoryLeakTest; +import org.apache.ignite.internal.processors.database.IgniteDbMemoryLeakWithExpirationTest; + +/** + * Page memory leaks tests. + */ +public class IgniteDbMemoryLeakTestSuite extends TestSuite { + /** + * @return Test suite. + * @throws Exception Thrown in case of the failure. + */ + public static TestSuite suite() throws Exception { + TestSuite suite = new TestSuite("Ignite Db Memory Leaks Test Suite"); + + suite.addTestSuite(IgniteDbMemoryLeakTest.class); + suite.addTestSuite(IgniteDbMemoryLeakWithExpirationTest.class); + suite.addTestSuite(IgniteDbMemoryLeakLargePagesTest.class); + suite.addTestSuite(IgniteDbMemoryLeakLargeObjectsTest.class); + suite.addTestSuite(IgniteDbMemoryLeakNonTransactionalTest.class); + + suite.addTestSuite(LargeEntryUpdateTest.class); + + return suite; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3cba3206/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakIndexedTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakIndexedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakIndexedTest.java new file mode 100644 index 0000000..f6a06c9 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakIndexedTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.database; + +/** + * + */ +public class IgniteDbMemoryLeakIndexedTest extends IgniteDbMemoryLeakTest { + /** {@inheritDoc} */ + @Override protected boolean indexingEnabled() { + return true; + } + + /** {@inheritDoc} */ + @Override protected long pagesMax() { + return 24_000; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3cba3206/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakSqlQueryTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakSqlQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakSqlQueryTest.java new file mode 100644 index 0000000..57f9fb5 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakSqlQueryTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.database; + +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.jetbrains.annotations.NotNull; + +/** + * + */ +public class IgniteDbMemoryLeakSqlQueryTest extends IgniteDbMemoryLeakTest { + /** {@inheritDoc} */ + @Override protected boolean indexingEnabled() { + return true; + } + + /** {@inheritDoc} */ + @Override protected long pagesMax() { + return 24_000; + } + + /** {@inheritDoc} */ + @Override protected void operation(IgniteCache<Object, Object> cache) { + Object key = key(); + Object val = value(key); + + switch (nextInt(4)) { + case 0: + cache.getAndPut(key, val); + + break; + + case 1: + cache.get(key); + + break; + + case 2: + cache.getAndRemove(key); + + break; + + case 3: + cache.query(sqlQuery(cache)).getAll(); + } + } + + /** + * @param cache IgniteCache. + * @return SqlFieldsQuery. + */ + @NotNull private SqlFieldsQuery sqlQuery(IgniteCache<Object, Object> cache) { + String qry = String.format("select _key from \"%s\".DbValue where iVal=?", cache.getName()); + + SqlFieldsQuery sqlQry = new SqlFieldsQuery(qry); + sqlQry.setArgs(nextInt(200_000)); + + return sqlQry; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3cba3206/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteDbMemoryLeakWithIndexingTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteDbMemoryLeakWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteDbMemoryLeakWithIndexingTestSuite.java new file mode 100644 index 0000000..36cd101 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteDbMemoryLeakWithIndexingTestSuite.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.testsuites; + +import junit.framework.TestSuite; +import org.apache.ignite.internal.processors.database.IgniteDbMemoryLeakIndexedTest; +import org.apache.ignite.internal.processors.database.IgniteDbMemoryLeakSqlQueryTest; + +/** + * Page memory leaks tests using indexing. + */ +public class IgniteDbMemoryLeakWithIndexingTestSuite extends TestSuite { + /** + * @return Test suite. + * @throws Exception Thrown in case of the failure. + */ + public static TestSuite suite() throws Exception { + TestSuite suite = new TestSuite("Ignite Db Memory Leaks With Indexing Test Suite"); + + suite.addTestSuite(IgniteDbMemoryLeakSqlQueryTest.class); + suite.addTestSuite(IgniteDbMemoryLeakIndexedTest.class); + + return suite; + } +}
