IGNITE-6702: SQL: COUNT(*) now uses BPlusTree.size instead of cursor iteration. This closes #3037.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/08ab9af8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/08ab9af8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/08ab9af8 Branch: refs/heads/ignite-zk Commit: 08ab9af8a94c53c1cd58d5763f2b9d594f3ca58e Parents: 8292335 Author: gg-shq <[email protected]> Authored: Mon Dec 4 15:25:00 2017 +0300 Committer: devozerov <[email protected]> Committed: Mon Dec 4 15:25:00 2017 +0300 ---------------------------------------------------------------------- .../internal/jdbc2/JdbcLocalCachesSelfTest.java | 28 + .../cache/persistence/tree/BPlusTree.java | 141 +++- .../processors/database/BPlusTreeSelfTest.java | 819 ++++++++++++++++++- .../query/h2/database/H2TreeIndex.java | 92 ++- ...lexClientAtomicPartitionedNoBackupsTest.java | 34 + ...exingComplexClientAtomicPartitionedTest.java | 2 +- ...dexingComplexClientAtomicReplicatedTest.java | 2 +- ...ntTransactionalPartitionedNoBackupsTest.java | 34 + ...mplexClientTransactionalPartitionedTest.java | 2 +- ...omplexClientTransactionalReplicatedTest.java | 2 +- ...lexServerAtomicPartitionedNoBackupsTest.java | 34 + ...exingComplexServerAtomicPartitionedTest.java | 2 +- ...dexingComplexServerAtomicReplicatedTest.java | 2 +- ...erTransactionalPartitionedNoBackupsTest.java | 34 + ...mplexServerTransactionalPartitionedTest.java | 2 +- ...omplexServerTransactionalReplicatedTest.java | 2 +- .../index/H2DynamicIndexingComplexTest.java | 22 +- .../query/IgniteSqlSegmentedIndexSelfTest.java | 124 ++- 18 files changed, 1288 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcLocalCachesSelfTest.java ---------------------------------------------------------------------- diff --git 
a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcLocalCachesSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcLocalCachesSelfTest.java index f096e69..09ccbc9 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcLocalCachesSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcLocalCachesSelfTest.java @@ -129,6 +129,34 @@ public class JdbcLocalCachesSelfTest extends GridCommonAbstractTest { } /** + * Verifies that <code>select count(*)</code> behaves correctly in + * {@link org.apache.ignite.cache.CacheMode#LOCAL} mode. + * + * @throws Exception If failed. + */ + public void testCountAll() throws Exception { + Properties cfg = new Properties(); + + cfg.setProperty(PROP_NODE_ID, grid(0).localNode().id().toString()); + + Connection conn = null; + + try { + conn = DriverManager.getConnection(BASE_URL, cfg); + + ResultSet rs = conn.createStatement().executeQuery("select count(*) from Integer"); + + assertTrue(rs.next()); + + assertEquals(2L, rs.getLong(1)); + } + finally { + if (conn != null) + conn.close(); + } + } + + /** * @throws Exception If failed. 
*/ public void testCache2() throws Exception { http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java index 8e6e099..436a69d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java @@ -62,6 +62,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; +import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.Bool.DONE; import static org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.Bool.FALSE; @@ -1906,57 +1907,122 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements } /** - * !!! For debug only! May produce wrong results on concurrent access. + * Returns number of elements in the tree by scanning pages of the bottom (leaf) level. + * Since a concurrent access is permitted, there is no guarantee about + * momentary consistency: the method may miss updates made in already scanned pages. * - * @return Size. + * @return Number of elements in the tree. * @throws IgniteCheckedException If failed. */ @Override public final long size() throws IgniteCheckedException { + return size(null); + } + + /** + * Returns number of elements in the tree that match the filter by scanning through the pages of the leaf level. 
+ * Since a concurrent access to the tree is permitted, there is no guarantee about + * momentary consistency: the method may not see updates made in already scanned pages. + * + * @param filter The filter to use or null to count all elements. + * @return Number of either all elements in the tree or the elements that match the filter. + * @throws IgniteCheckedException If failed. + */ + public long size(@Nullable TreeRowClosure<L, T> filter) throws IgniteCheckedException { checkDestroyed(); - long pageId; + for (;;) { + long curPageId; - long metaPage = acquirePage(metaPageId); - try { - pageId = getFirstPageId(metaPageId, metaPage, 0); // Level 0 is always at the bottom. - } - finally { - releasePage(metaPageId, metaPage); - } + long metaPage = acquirePage(metaPageId); - BPlusIO<L> io = null; + try { + curPageId = getFirstPageId(metaPageId, metaPage, 0); // Level 0 is always at the bottom. + } + finally { + releasePage(metaPageId, metaPage); + } - long cnt = 0; + long cnt = 0; - while (pageId != 0) { - long curId = pageId; - long curPage = acquirePage(curId); + long curPage = acquirePage(curPageId); try { - long curAddr = readLock(curId, curPage); // No correctness guaranties. + long curPageAddr = readLock(curPageId, curPage); + + if (curPageAddr == 0) + continue; // The first page has gone: restart scan. 
try { - if (io == null) { - io = io(curAddr); + BPlusIO<L> io = io(curPageAddr); - assert io.isLeaf(); - } + assert io.isLeaf(); + + for (;;) { + int curPageSize = io.getCount(curPageAddr); + + if (filter == null) + cnt += curPageSize; + else { + for (int i = 0; i < curPageSize; ++i) { + if (filter.apply(this, io, curPageAddr, i)) + cnt++; + } + } + + long nextPageId = io.getForward(curPageAddr); + + if (nextPageId == 0) { + checkDestroyed(); + + return cnt; + } + + long nextPage = acquirePage(nextPageId); + + try { + long nextPageAddr = readLock(nextPageId, nextPage); + + // In the current implementation the next page can't change when the current page is locked. + assert nextPageAddr != 0 : nextPageAddr; + + try { + long pa = curPageAddr; + curPageAddr = 0; // Set to zero to avoid double unlocking in finalizer. + + readUnlock(curPageId, curPage, pa); - cnt += io.getCount(curAddr); + long p = curPage; + curPage = 0; // Set to zero to avoid double release in finalizer. - pageId = io.getForward(curAddr); + releasePage(curPageId, p); + + curPageId = nextPageId; + curPage = nextPage; + curPageAddr = nextPageAddr; + + nextPage = 0; + nextPageAddr = 0; + } + finally { + if (nextPageAddr != 0) + readUnlock(nextPageId, nextPage, nextPageAddr); + } + } + finally { + if (nextPage != 0) + releasePage(nextPageId, nextPage); + } + } } finally { - readUnlock(curId, curPage, curAddr); + if (curPageAddr != 0) + readUnlock(curPageId, curPage, curPageAddr); } } finally { - releasePage(curId, curPage); + if (curPage != 0) + releasePage(curPageId, curPage); } } - - checkDestroyed(); - - return cnt; } /** @@ -4803,4 +4869,23 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements /** */ DONE } + + /** + * A generic visitor-style interface for performing filtering/modifications/miscellaneous operations on the tree. 
+ */ + public interface TreeRowClosure<L, T extends L> { + /** + * Performs inspection or operation on a specified row and returns true if this row is + * required, or matches, or the operation is successful (depending on the context). + * + * @param tree The tree. + * @param io The tree IO object. + * @param pageAddr The page address. + * @param idx The item index. + * @return {@code True} if the item passes the predicate. + * @throws IgniteCheckedException If failed. + */ + public boolean apply(BPlusTree<L, T> tree, BPlusIO<L> io, long pageAddr, int idx) + throws IgniteCheckedException; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java index 7b4ca13..85d269f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java @@ -17,14 +17,22 @@ package org.apache.ignite.internal.processors.database; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Random; import java.util.TreeMap; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import 
java.util.concurrent.atomic.AtomicInteger; @@ -32,6 +40,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongArray; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; + +import com.google.common.base.Predicate; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; @@ -104,6 +114,9 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { /** */ private static int RMV_INC = 1; + /** Forces printing lock/unlock events on the test tree */ + private static boolean PRINT_LOCKS = false; + /** */ protected PageMemory pageMem; @@ -1077,10 +1090,6 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { for (long i = 15; i >= 0; i--) tree.put(i); - - - - } /** @@ -1156,6 +1165,790 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { } /** + * Verifies that {@link BPlusTree#size} and {@link BPlusTree#size} methods behave correctly + * on single-threaded addition and removal of elements in random order. + * + * @throws IgniteCheckedException If failed. 
+ */ + public void testSizeForPutRmvSequential() throws IgniteCheckedException { + MAX_PER_PAGE = 5; + + boolean DEBUG_PRINT = false; + + int itemCnt = (int) Math.pow(MAX_PER_PAGE, 5) + rnd.nextInt(MAX_PER_PAGE * MAX_PER_PAGE); + + Long[] items = new Long[itemCnt]; + for (int i = 0; i < itemCnt; ++i) + items[i] = (long) i; + + TestTree testTree = createTestTree(true); + TreeMap<Long,Long> goldenMap = new TreeMap<>(); + + assertEquals(0, testTree.size()); + assertEquals(0, goldenMap.size()); + + final Predicate<Long> rowMatcher = new Predicate<Long>() { + @Override public boolean apply(Long row) { + return row % 7 == 0; + } + }; + + final BPlusTree.TreeRowClosure<Long, Long> rowClosure = new BPlusTree.TreeRowClosure<Long, Long>() { + @Override public boolean apply(BPlusTree<Long, Long> tree, BPlusIO<Long> io, long pageAddr, int idx) + throws IgniteCheckedException { + return rowMatcher.apply(io.getLookupRow(tree, pageAddr, idx)); + } + }; + + int correctMatchingRows = 0; + + Collections.shuffle(Arrays.asList(items), rnd); + + for (Long row : items) { + if (DEBUG_PRINT) { + X.println(" --> put(" + row + ")"); + X.print(testTree.printTree()); + } + + assertEquals(goldenMap.put(row, row), testTree.put(row)); + assertEquals(row, testTree.findOne(row)); + + if (rowMatcher.apply(row)) + ++correctMatchingRows; + + assertEquals(correctMatchingRows, testTree.size(rowClosure)); + + long correctSize = goldenMap.size(); + + assertEquals(correctSize, testTree.size()); + assertEquals(correctSize, size(testTree.find(null, null))); + + assertNoLocks(); + } + + Collections.shuffle(Arrays.asList(items), rnd); + + for (Long row : items) { + if (DEBUG_PRINT) { + X.println(" --> rmv(" + row + ")"); + X.print(testTree.printTree()); + } + + assertEquals(row, goldenMap.remove(row)); + assertEquals(row, testTree.remove(row)); + assertNull(testTree.findOne(row)); + + if (rowMatcher.apply(row)) + --correctMatchingRows; + + assertEquals(correctMatchingRows, testTree.size(rowClosure)); + + long 
correctSize = goldenMap.size(); + + assertEquals(correctSize, testTree.size()); + assertEquals(correctSize, size(testTree.find(null, null))); + + assertNoLocks(); + } + } + + /** + * Verifies that {@link BPlusTree#size()} method behaves correctly when run concurrently with + * {@link BPlusTree#put}, {@link BPlusTree#remove} methods. Please see details in + * {@link #doTestSizeForRandomPutRmvMultithreaded}. + * + * @throws Exception If failed. + */ + public void testSizeForRandomPutRmvMultithreaded_5_4() throws Exception { + MAX_PER_PAGE = 5; + CNT = 10_000; + + doTestSizeForRandomPutRmvMultithreaded(4); + } + + public void testSizeForRandomPutRmvMultithreaded_3_256() throws Exception { + MAX_PER_PAGE = 3; + CNT = 10_000; + + doTestSizeForRandomPutRmvMultithreaded(256); + } + + /** + * Verifies that {@link BPlusTree#size()} method behaves correctly when run between series of + * concurrent {@link BPlusTree#put}, {@link BPlusTree#remove} methods. + * + * @param rmvPutSlidingWindowSize Sliding window size (distance between items being deleted and added). + * @throws Exception If failed. 
+ */ + private void doTestSizeForRandomPutRmvMultithreaded(final int rmvPutSlidingWindowSize) throws Exception { + final TestTree tree = createTestTree(false); + + final boolean DEBUG_PRINT = false; + + final AtomicLong curRmvKey = new AtomicLong(0); + final AtomicLong curPutKey = new AtomicLong(rmvPutSlidingWindowSize); + + for (long i = curRmvKey.get(); i < curPutKey.get(); ++i) + assertNull(tree.put(i)); + + final int putRmvThreadCnt = Math.min(Runtime.getRuntime().availableProcessors(), rmvPutSlidingWindowSize); + + final int loopCnt = CNT / putRmvThreadCnt; + + final CyclicBarrier putRmvOpBarrier = new CyclicBarrier(putRmvThreadCnt); + final CyclicBarrier sizeOpBarrier = new CyclicBarrier(putRmvThreadCnt); + + IgniteInternalFuture<?> putRmvFut = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + + for (int i = 0; i < loopCnt && !stop.get(); ++i) { + putRmvOpBarrier.await(); + + Long putVal = curPutKey.getAndIncrement(); + + if (DEBUG_PRINT || (i & 0x7ff) == 0) + X.println(" --> put(" + putVal + ")"); + + assertNull(tree.put(putVal)); + + assertNoLocks(); + + Long rmvVal = curRmvKey.getAndIncrement(); + + if (DEBUG_PRINT || (i & 0x7ff) == 0) + X.println(" --> rmv(" + rmvVal + ")"); + + assertEquals(rmvVal, tree.remove(rmvVal)); + assertNull(tree.remove(rmvVal)); + + assertNoLocks(); + + if (stop.get()) + break; + + sizeOpBarrier.await(); + + long correctSize = curPutKey.get() - curRmvKey.get(); + + if (DEBUG_PRINT || (i & 0x7ff) == 0) + X.println("====> correctSize=" + correctSize); + + assertEquals(correctSize, size(tree.find(null, null))); + assertEquals(correctSize, tree.size()); + } + + return null; + } + }, putRmvThreadCnt, "put-remove-size"); + + IgniteInternalFuture<?> lockPrintingFut = multithreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + while (!stop.get()) { + Thread.sleep(5000); + + X.println(TestTree.printLocks()); + } + + return null; + } + }, 1, "printLocks"); + 
+ asyncRunFut = new GridCompoundFuture<>(); + + asyncRunFut.add((IgniteInternalFuture) putRmvFut); + asyncRunFut.add((IgniteInternalFuture) lockPrintingFut); + + asyncRunFut.markInitialized(); + + try { + putRmvFut.get(getTestTimeout(), TimeUnit.MILLISECONDS); + } + finally { + stop.set(true); + putRmvOpBarrier.reset(); + sizeOpBarrier.reset(); + + asyncRunFut.get(); + } + + tree.validateTree(); + + assertNoLocks(); + } + + /** + * Verifies that concurrent running of {@link BPlusTree#put} + {@link BPlusTree#remove} sequence + * and {@link BPlusTree#size} methods results in correct calculation of tree size. + * + * @see #doTestSizeForRandomPutRmvMultithreadedAsync doTestSizeForRandomPutRmvMultithreadedAsync() for details. + */ + public void testSizeForRandomPutRmvMultithreadedAsync_16() throws Exception { + doTestSizeForRandomPutRmvMultithreadedAsync(16); + } + + /** + * Verifies that concurrent running of {@link BPlusTree#put} + {@link BPlusTree#remove} sequence + * and {@link BPlusTree#size} methods results in correct calculation of tree size. + * + * @see #doTestSizeForRandomPutRmvMultithreadedAsync doTestSizeForRandomPutRmvMultithreadedAsync() for details. + */ + public void testSizeForRandomPutRmvMultithreadedAsync_3() throws Exception { + doTestSizeForRandomPutRmvMultithreadedAsync(3); + } + + /** + * Verifies that concurrent running of {@link BPlusTree#put} + {@link BPlusTree#remove} sequence + * and {@link BPlusTree#size} methods results in correct calculation of tree size. + * + * Since in the presence of concurrent modifications the size may differ from the actual one, the test maintains + * sliding window of records in the tree, uses a barrier between concurrent runs to limit runaway delta in + * the calculated size, and checks that the measured size lies within certain bounds. + * + * NB: This test has to be changed with the integration of IGNITE-3478. 
+ * + */ + public void doTestSizeForRandomPutRmvMultithreadedAsync(final int rmvPutSlidingWindowSize) throws Exception { + MAX_PER_PAGE = 5; + + final boolean DEBUG_PRINT = false; + + final TestTree tree = createTestTree(false); + + final AtomicLong curRmvKey = new AtomicLong(0); + final AtomicLong curPutKey = new AtomicLong(rmvPutSlidingWindowSize); + + for (long i = curRmvKey.get(); i < curPutKey.get(); ++i) + assertNull(tree.put(i)); + + final int putRmvThreadCnt = Math.min(Runtime.getRuntime().availableProcessors(), rmvPutSlidingWindowSize); + final int sizeThreadCnt = putRmvThreadCnt; + + final CyclicBarrier putRmvOpBarrier = new CyclicBarrier(putRmvThreadCnt + sizeThreadCnt, new Runnable() { + @Override public void run() { + if (DEBUG_PRINT) { + try { + X.println("===BARRIER=== size=" + tree.size() + + "; contents=[" + tree.findFirst() + ".." + tree.findLast() + "]" + + "; rmvVal=" + curRmvKey.get() + "; putVal=" + curPutKey.get()); + + X.println(tree.printTree()); + } + catch (IgniteCheckedException e) { + // ignore + } + } + } + }); + + final int loopCnt = 500; + + IgniteInternalFuture<?> putRmvFut = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + for (int i = 0; i < loopCnt && !stop.get(); ++i) { + int order; + try { + order = putRmvOpBarrier.await(); + } catch (BrokenBarrierException e) { + break; + } + + Long putVal = curPutKey.getAndIncrement(); + + if (DEBUG_PRINT || (i & 0x3ff) == 0) + X.println(order + ": --> put(" + putVal + ")"); + + assertNull(tree.put(putVal)); + + Long rmvVal = curRmvKey.getAndIncrement(); + + if (DEBUG_PRINT || (i & 0x3ff) == 0) + X.println(order + ": --> rmv(" + rmvVal + ")"); + + assertEquals(rmvVal, tree.remove(rmvVal)); + assertNull(tree.findOne(rmvVal)); + } + + return null; + } + }, putRmvThreadCnt, "put-remove"); + + IgniteInternalFuture<?> sizeFut = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + + final List<Long> 
treeContents = new ArrayList<>(rmvPutSlidingWindowSize * 2); + + final BPlusTree.TreeRowClosure<Long, Long> rowDumper = new BPlusTree.TreeRowClosure<Long, Long>() { + @Override public boolean apply(BPlusTree<Long, Long> tree, BPlusIO<Long> io, long pageAddr, int idx) + throws IgniteCheckedException { + + treeContents.add(io.getLookupRow(tree, pageAddr, idx)); + return true; + } + }; + + for (long iter = 0; !stop.get(); ++iter) { + int order = 0; + + try { + order = putRmvOpBarrier.await(); + } catch (BrokenBarrierException e) { + break; + } + + long correctSize = curPutKey.get() - curRmvKey.get(); + + treeContents.clear(); + long treeSize = tree.size(rowDumper); + + long minBound = correctSize - putRmvThreadCnt; + long maxBound = correctSize + putRmvThreadCnt; + + if (DEBUG_PRINT || (iter & 0x3ff) == 0) + X.println(order + ": size=" + treeSize + "; bounds=[" + minBound + ".." + maxBound + + "]; contents=" + treeContents); + + if (treeSize < minBound || treeSize > maxBound) { + fail("Tree size is not in bounds [" + minBound + ".." 
+ maxBound + "]: " + treeSize + + "; Tree contents: " + treeContents); + } + } + + return null; + } + }, sizeThreadCnt, "size"); + + IgniteInternalFuture<?> lockPrintingFut = multithreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + while (!stop.get()) { + Thread.sleep(5000); + + X.println(TestTree.printLocks()); + } + + return null; + } + }, 1, "printLocks"); + + asyncRunFut = new GridCompoundFuture<>(); + + asyncRunFut.add((IgniteInternalFuture) putRmvFut); + asyncRunFut.add((IgniteInternalFuture) sizeFut); + asyncRunFut.add((IgniteInternalFuture) lockPrintingFut); + + asyncRunFut.markInitialized(); + + try { + putRmvFut.get(getTestTimeout(), TimeUnit.MILLISECONDS); + } + finally { + stop.set(true); + putRmvOpBarrier.reset(); + + asyncRunFut.get(); + } + + tree.validateTree(); + + assertNoLocks(); + } + + /** + * The test forces {@link BPlusTree#size} method to run into a livelock: during single run + * the method is picking up new pages which are concurrently added to the tree until the new pages are not added + * anymore. Test verifies that despite livelock condition a size from a valid range is returned. + * + * NB: This test has to be changed with the integration of IGNITE-3478. 
+ * + * @throws Exception if test failed + */ + public void testPutSizeLivelock() throws Exception { + MAX_PER_PAGE = 5; + CNT = 800; + + final int SLIDING_WINDOW_SIZE = 16; + final boolean DEBUG_PRINT = false; + + final TestTree tree = createTestTree(false); + + final AtomicLong curRmvKey = new AtomicLong(0); + final AtomicLong curPutKey = new AtomicLong(SLIDING_WINDOW_SIZE); + + for (long i = curRmvKey.get(); i < curPutKey.get(); ++i) + assertNull(tree.put(i)); + + final int hwThreads = Runtime.getRuntime().availableProcessors(); + final int putRmvThreadCnt = Math.max(1, hwThreads / 2); + final int sizeThreadCnt = hwThreads - putRmvThreadCnt; + + final CyclicBarrier putRmvOpBarrier = new CyclicBarrier(putRmvThreadCnt, new Runnable() { + @Override public void run() { + if (DEBUG_PRINT) { + try { + X.println("===BARRIER=== size=" + tree.size() + + " [" + tree.findFirst() + ".." + tree.findLast() + "]"); + } + catch (IgniteCheckedException e) { + // ignore + } + } + } + }); + + final int loopCnt = CNT / hwThreads; + + IgniteInternalFuture<?> putRmvFut = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + for (int i = 0; i < loopCnt && !stop.get(); ++i) { + int order; + try { + order = putRmvOpBarrier.await(); + } catch (BrokenBarrierException e) { + // barrier reset() has been called: terminate + break; + } + + Long putVal = curPutKey.getAndIncrement(); + + if ((i & 0xff) == 0) + X.println(order + ": --> put(" + putVal + ")"); + + assertNull(tree.put(putVal)); + + Long rmvVal = curRmvKey.getAndIncrement(); + + if ((i & 0xff) == 0) + X.println(order + ": --> rmv(" + rmvVal + ")"); + + assertEquals(rmvVal, tree.remove(rmvVal)); + assertNull(tree.findOne(rmvVal)); + } + + return null; + } + }, putRmvThreadCnt, "put-remove"); + + IgniteInternalFuture<?> sizeFut = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + + final List<Long> treeContents = new 
ArrayList<>(SLIDING_WINDOW_SIZE * 2); + + final BPlusTree.TreeRowClosure<Long, Long> rowDumper = new BPlusTree.TreeRowClosure<Long, Long>() { + @Override public boolean apply(BPlusTree<Long, Long> tree, BPlusIO<Long> io, long pageAddr, int idx) + throws IgniteCheckedException { + + treeContents.add(io.getLookupRow(tree, pageAddr, idx)); + + final long endMs = System.currentTimeMillis() + 10; + final long endPutKey = curPutKey.get() + MAX_PER_PAGE; + + while (System.currentTimeMillis() < endMs && curPutKey.get() < endPutKey) + Thread.yield(); + + return true; + } + }; + + while (!stop.get()) { + treeContents.clear(); + + long treeSize = tree.size(rowDumper); + long curPutVal = curPutKey.get(); + + X.println(" ======> size=" + treeSize + "; last-put-value=" + curPutVal); + + if (treeSize < SLIDING_WINDOW_SIZE || treeSize > curPutVal) + fail("Tree size is not in bounds [" + SLIDING_WINDOW_SIZE + ".." + curPutVal + "]:" + + treeSize + "; contents=" + treeContents); + } + + return null; + } + }, sizeThreadCnt, "size"); + + asyncRunFut = new GridCompoundFuture<>(); + + asyncRunFut.add((IgniteInternalFuture) putRmvFut); + asyncRunFut.add((IgniteInternalFuture) sizeFut); + + asyncRunFut.markInitialized(); + + try { + putRmvFut.get(getTestTimeout(), TimeUnit.MILLISECONDS); + } + finally { + stop.set(true); + putRmvOpBarrier.reset(); + + asyncRunFut.get(); + } + + tree.validateTree(); + + assertNoLocks(); + } + + /** + * Verifies that in case for threads concurrently calling put and remove + * on a tree with 1-3 pages, the size() method performs correctly. + * + * @throws Exception If failed. 
+ */ + public void testPutRmvSizeSinglePageContention() throws Exception { + MAX_PER_PAGE = 10; + CNT = 20_000; + final boolean DEBUG_PRINT = false; + final int SLIDING_WINDOWS_SIZE = MAX_PER_PAGE * 2; + + final TestTree tree = createTestTree(false); + + final AtomicLong curPutKey = new AtomicLong(0); + final BlockingQueue<Long> rowsToRemove = new ArrayBlockingQueue<>(MAX_PER_PAGE / 2); + + final int hwThreadCnt = Runtime.getRuntime().availableProcessors(); + final int putThreadCnt = Math.max(1, hwThreadCnt / 4); + final int rmvThreadCnt = Math.max(1, hwThreadCnt / 2 - putThreadCnt); + final int sizeThreadCnt = Math.max(1, hwThreadCnt - putThreadCnt - rmvThreadCnt); + + final AtomicInteger sizeInvokeCnt = new AtomicInteger(0); + + final int loopCnt = CNT; + + IgniteInternalFuture<?> sizeFut = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + int iter = 0; + while (!stop.get()) { + long size = tree.size(); + + if (DEBUG_PRINT || (++iter & 0xffff) == 0) + X.println(" --> size() = " + size); + + sizeInvokeCnt.incrementAndGet(); + } + + return null; + } + }, sizeThreadCnt, "size"); + + // Let the size threads ignite + while (sizeInvokeCnt.get() < sizeThreadCnt * 2) + Thread.yield(); + + IgniteInternalFuture<?> rmvFut = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + int iter = 0; + while(!stop.get()) { + Long rmvVal = rowsToRemove.poll(200, TimeUnit.MILLISECONDS); + if (rmvVal != null) + assertEquals(rmvVal, tree.remove(rmvVal)); + + if (DEBUG_PRINT || (++iter & 0x3ff) == 0) + X.println(" --> rmv(" + rmvVal + ")"); + } + + return null; + } + }, rmvThreadCnt, "rmv"); + + IgniteInternalFuture<?> putFut = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + for (int i = 0; i < loopCnt && !stop.get(); ++i) { + Long putVal = curPutKey.getAndIncrement(); + assertNull(tree.put(putVal)); + + while (rowsToRemove.size() > 
SLIDING_WINDOWS_SIZE && !stop.get()) + Thread.yield(); + + rowsToRemove.put(putVal); + + if (DEBUG_PRINT || (i & 0x3ff) == 0) + X.println(" --> put(" + putVal + ")"); + } + + return null; + } + }, putThreadCnt, "put"); + + IgniteInternalFuture<?> treePrintFut = multithreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + while (!stop.get()) { + Thread.sleep(1000); + + X.println(TestTree.printLocks()); + X.println(tree.printTree()); + } + + return null; + } + }, 1, "printTree"); + + asyncRunFut = new GridCompoundFuture<>(); + + asyncRunFut.add((IgniteInternalFuture) sizeFut); + asyncRunFut.add((IgniteInternalFuture) rmvFut); + asyncRunFut.add((IgniteInternalFuture) putFut); + asyncRunFut.add((IgniteInternalFuture) treePrintFut); + + asyncRunFut.markInitialized(); + + try { + putFut.get(getTestTimeout(), TimeUnit.MILLISECONDS); + } + finally { + stop.set(true); + + asyncRunFut.get(); + } + + tree.validateTree(); + + assertNoLocks(); + } + + /** + * The test verifies that {@link BPlusTree#put}, {@link BPlusTree#remove}, {@link BPlusTree#find}, and + * {@link BPlusTree#size} run concurrently, perform correctly and report correct values. + * + * A sliding window of numbers is maintainted in the tests. + * + * NB: This test has to be changed with the integration of IGNITE-3478. + * + * @throws Exception If failed. 
+ */ + public void testPutRmvFindSizeMultithreaded() throws Exception { + MAX_PER_PAGE = 5; + CNT = 60_000; + + final int SLIDING_WINDOW_SIZE = 100; + + final TestTree tree = createTestTree(false); + + final AtomicLong curPutKey = new AtomicLong(0); + final BlockingQueue<Long> rowsToRemove = new ArrayBlockingQueue<>(SLIDING_WINDOW_SIZE); + + final int hwThreadCnt = Runtime.getRuntime().availableProcessors(); + final int putThreadCnt = Math.max(1, hwThreadCnt / 4); + final int rmvThreadCnt = Math.max(1, hwThreadCnt / 4); + final int findThreadCnt = Math.max(1, hwThreadCnt / 4); + final int sizeThreadCnt = Math.max(1, hwThreadCnt - putThreadCnt - rmvThreadCnt - findThreadCnt); + + final AtomicInteger sizeInvokeCnt = new AtomicInteger(0); + + final int loopCnt = CNT; + + IgniteInternalFuture<?> sizeFut = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + int iter = 0; + while (!stop.get()) { + long size = tree.size(); + + if ((++iter & 0x3ff) == 0) + X.println(" --> size() = " + size); + + sizeInvokeCnt.incrementAndGet(); + } + + return null; + } + }, sizeThreadCnt, "size"); + + // Let the size threads start + while (sizeInvokeCnt.get() < sizeThreadCnt * 2) + Thread.yield(); + + IgniteInternalFuture<?> rmvFut = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + int iter = 0; + while(!stop.get()) { + Long rmvVal = rowsToRemove.poll(200, TimeUnit.MILLISECONDS); + if (rmvVal != null) + assertEquals(rmvVal, tree.remove(rmvVal)); + + if ((++iter & 0x3ff) == 0) + X.println(" --> rmv(" + rmvVal + ")"); + } + + return null; + } + }, rmvThreadCnt, "rmv"); + + IgniteInternalFuture<?> findFut = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + int iter = 0; + while(!stop.get()) { + Long findVal = curPutKey.get() + + SLIDING_WINDOW_SIZE / 2 + - rnd.nextInt(SLIDING_WINDOW_SIZE * 2); + + tree.findOne(findVal); + + if ((++iter & 0x3ff) == 0) + 
X.println(" --> fnd(" + findVal + ")"); + } + + return null; + } + }, findThreadCnt, "find"); + + IgniteInternalFuture<?> putFut = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + for (int i = 0; i < loopCnt && !stop.get(); ++i) { + Long putVal = curPutKey.getAndIncrement(); + assertNull(tree.put(putVal)); + + while (rowsToRemove.size() > SLIDING_WINDOW_SIZE) { + if (stop.get()) + return null; + + Thread.yield(); + } + + rowsToRemove.put(putVal); + + if ((i & 0x3ff) == 0) + X.println(" --> put(" + putVal + ")"); + } + + return null; + } + }, putThreadCnt, "put"); + + IgniteInternalFuture<?> lockPrintingFut = multithreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + while (!stop.get()) { + Thread.sleep(1000); + + X.println(TestTree.printLocks()); + } + + return null; + } + }, 1, "printLocks"); + + asyncRunFut = new GridCompoundFuture<>(); + + asyncRunFut.add((IgniteInternalFuture) sizeFut); + asyncRunFut.add((IgniteInternalFuture) rmvFut); + asyncRunFut.add((IgniteInternalFuture) findFut); + asyncRunFut.add((IgniteInternalFuture) putFut); + asyncRunFut.add((IgniteInternalFuture) lockPrintingFut); + + asyncRunFut.markInitialized(); + + try { + putFut.get(getTestTimeout(), TimeUnit.MILLISECONDS); + } + finally { + stop.set(true); + + asyncRunFut.get(); + } + + tree.validateTree(); + + assertNoLocks(); + } + + /** * @throws Exception If failed. 
*/ public void testTestRandomPutRemoveMultithreaded_1_30_0() throws Exception { @@ -1620,7 +2413,8 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override public void onBeforeReadLock(int cacheId, long pageId, long page) { -// X.println(" onBeforeReadLock: " + U.hexLong(page.id())); + if (PRINT_LOCKS) + X.println(" onBeforeReadLock: " + U.hexLong(pageId)); // // U.dumpStack(); @@ -1629,7 +2423,8 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override public void onReadLock(int cacheId, long pageId, long page, long pageAddr) { -// X.println(" onReadLock: " + U.hexLong(page.id())); + if (PRINT_LOCKS) + X.println(" onReadLock: " + U.hexLong(pageId)); if (pageAddr != 0L) { long actual = PageIO.getPageId(pageAddr); @@ -1644,7 +2439,8 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override public void onReadUnlock(int cacheId, long pageId, long page, long pageAddr) { -// X.println(" onReadUnlock: " + U.hexLong(page.id())); + if (PRINT_LOCKS) + X.println(" onReadUnlock: " + U.hexLong(pageId)); checkPageId(pageId, pageAddr); @@ -1655,14 +2451,16 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override public void onBeforeWriteLock(int cacheId, long pageId, long page) { -// X.println(" onBeforeWriteLock: " + U.hexLong(page.id())); + if (PRINT_LOCKS) + X.println(" onBeforeWriteLock: " + U.hexLong(pageId)); assertNull(beforeWriteLock.put(threadId(), pageId)); } /** {@inheritDoc} */ @Override public void onWriteLock(int cacheId, long pageId, long page, long pageAddr) { -// X.println(" onWriteLock: " + U.hexLong(page.id())); + if (PRINT_LOCKS) + X.println(" onWriteLock: " + U.hexLong(pageId)); // // U.dumpStack(); @@ -1682,7 +2480,8 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override public void onWriteUnlock(int cacheId, long pageId, long page, long pageAddr) { -// 
X.println(" onWriteUnlock: " + U.hexLong(page.id())); + if (PRINT_LOCKS) + X.println(" onWriteUnlock: " + U.hexLong(pageId)); assertEquals(effectivePageId(pageId), effectivePageId(PageIO.getPageId(pageAddr))); http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java index 4ebac88..5a336c5 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java @@ -23,11 +23,14 @@ import java.util.List; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.persistence.RootPage; import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.processors.query.h2.H2Cursor; +import org.apache.ignite.internal.processors.query.h2.database.io.H2RowLinkIO; import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; @@ -165,20 +168,13 @@ public class H2TreeIndex extends GridH2IndexBase { /** {@inheritDoc} */ 
@Override public Cursor find(Session ses, SearchRow lower, SearchRow upper) { try { - IndexingQueryFilter f = threadLocalFilter(); - IndexingQueryCacheFilter p = null; - - if (f != null) { - String cacheName = getTable().cacheName(); - - p = f.forCache(cacheName); - } + IndexingQueryCacheFilter filter = partitionFilter(threadLocalFilter()); int seg = threadLocalSegment(); H2Tree tree = treeForRead(seg); - return new H2Cursor(tree.find(lower, upper, p)); + return new H2Cursor(tree.find(lower, upper, filter)); } catch (IgniteCheckedException e) { throw DbException.convert(e); @@ -274,14 +270,59 @@ public class H2TreeIndex extends GridH2IndexBase { /** {@inheritDoc} */ @Override public long getRowCount(Session ses) { - Cursor cursor = find(ses, null, null); + try { + int seg = threadLocalSegment(); - long res = 0; + H2Tree tree = treeForRead(seg); - while (cursor.next()) - res++; + BPlusTree.TreeRowClosure<SearchRow, GridH2Row> filter = filter(); - return res; + return tree.size(filter); + } + catch (IgniteCheckedException e) { + throw DbException.convert(e); + } + } + + /** + * An adapter from {@link IndexingQueryCacheFilter} to {@link BPlusTree.TreeRowClosure} to + * filter entries that belong to the current partition. + */ + private static class PartitionFilterTreeRowClosure implements BPlusTree.TreeRowClosure<SearchRow, GridH2Row> { + private final IndexingQueryCacheFilter filter; + + /** + * Creates a {@link BPlusTree.TreeRowClosure} adapter based on the given partition filter. + * + * @param filter The partition filter. 
+ */ + public PartitionFilterTreeRowClosure(IndexingQueryCacheFilter filter) { + this.filter = filter; + } + + /** {@inheritDoc} */ + @Override public boolean apply(BPlusTree<SearchRow, GridH2Row> tree, + BPlusIO<SearchRow> io, long pageAddr, int idx) throws IgniteCheckedException { + + H2RowLinkIO h2io = (H2RowLinkIO)io; + + return filter.applyPartition( + PageIdUtils.partId( + PageIdUtils.pageId( + h2io.getLink(pageAddr, idx)))); + } + } + + /** + * Returns a filter to apply to rows in the current index to obtain only the + * ones owned by this cache. + * + * @return The filter, which returns true for rows owned by this cache. + */ + @Nullable private BPlusTree.TreeRowClosure<SearchRow, GridH2Row> filter() { + final IndexingQueryCacheFilter filter = partitionFilter(threadLocalFilter()); + + return filter != null ? new PartitionFilterTreeRowClosure(filter) : null; } /** {@inheritDoc} */ @@ -344,13 +385,7 @@ public class H2TreeIndex extends GridH2IndexBase { @Nullable SearchRow last, IndexingQueryFilter filter) { try { - IndexingQueryCacheFilter p = null; - - if (filter != null) { - String cacheName = getTable().cacheName(); - - p = filter.forCache(cacheName); - } + IndexingQueryCacheFilter p = partitionFilter(filter); GridCursor<GridH2Row> range = t.find(first, last, p); @@ -365,6 +400,21 @@ public class H2TreeIndex extends GridH2IndexBase { } /** + * Filter which returns true for entries belonging to a particular partition. + * + * @param qryFilter Factory that creates a predicate for filtering entries for a particular cache. + * @return The filter or null if the filter is not needed (e.g., if the cache is not partitioned). + */ + @Nullable private IndexingQueryCacheFilter partitionFilter(@Nullable IndexingQueryFilter qryFilter) { + if (qryFilter == null) + return null; + + String cacheName = getTable().cacheName(); + + return qryFilter.forCache(cacheName); + } + + /** * @param inlineIdxs Inline index helpers. 
* @param cfgInlineSize Inline size from cache config. * @return Inline size. http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientAtomicPartitionedNoBackupsTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientAtomicPartitionedNoBackupsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientAtomicPartitionedNoBackupsTest.java new file mode 100644 index 0000000..25be1ed --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientAtomicPartitionedNoBackupsTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.index; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +/** + * Test to check work of DML+DDL operations of atomic partitioned cache without backups + * with queries initiated from client node. + */ +public class H2DynamicIndexingComplexClientAtomicPartitionedNoBackupsTest extends H2DynamicIndexingComplexTest { + /** + * Constructor. + */ + public H2DynamicIndexingComplexClientAtomicPartitionedNoBackupsTest() { + super(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC, 0, CLIENT_IDX); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientAtomicPartitionedTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientAtomicPartitionedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientAtomicPartitionedTest.java index 78eddbf..a05390f 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientAtomicPartitionedTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientAtomicPartitionedTest.java @@ -28,6 +28,6 @@ public class H2DynamicIndexingComplexClientAtomicPartitionedTest extends H2Dynam * Constructor. 
*/ public H2DynamicIndexingComplexClientAtomicPartitionedTest() { - super(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC, CLIENT_IDX); + super(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC, 1, CLIENT_IDX); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientAtomicReplicatedTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientAtomicReplicatedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientAtomicReplicatedTest.java index 0e1004c..6962eff 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientAtomicReplicatedTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientAtomicReplicatedTest.java @@ -28,6 +28,6 @@ public class H2DynamicIndexingComplexClientAtomicReplicatedTest extends H2Dynami * Constructor. 
*/ public H2DynamicIndexingComplexClientAtomicReplicatedTest() { - super(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC, CLIENT_IDX); + super(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC, 1, CLIENT_IDX); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientTransactionalPartitionedNoBackupsTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientTransactionalPartitionedNoBackupsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientTransactionalPartitionedNoBackupsTest.java new file mode 100644 index 0000000..bccb38e --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientTransactionalPartitionedNoBackupsTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.index; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +/** + * Test to check work of DML+DDL operations of transactional partitioned cache without backups + * with queries initiated from client node. + */ +public class H2DynamicIndexingComplexClientTransactionalPartitionedNoBackupsTest extends H2DynamicIndexingComplexTest { + /** + * Constructor. + */ + public H2DynamicIndexingComplexClientTransactionalPartitionedNoBackupsTest() { + super(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL, 0, CLIENT_IDX); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientTransactionalPartitionedTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientTransactionalPartitionedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientTransactionalPartitionedTest.java index 6dead30..8ec73cf 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientTransactionalPartitionedTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientTransactionalPartitionedTest.java @@ -28,6 +28,6 @@ public class H2DynamicIndexingComplexClientTransactionalPartitionedTest extends * Constructor. 
*/ public H2DynamicIndexingComplexClientTransactionalPartitionedTest() { - super(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL, CLIENT_IDX); + super(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL, 1, CLIENT_IDX); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientTransactionalReplicatedTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientTransactionalReplicatedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientTransactionalReplicatedTest.java index 3c73d2c..6000277 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientTransactionalReplicatedTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientTransactionalReplicatedTest.java @@ -28,6 +28,6 @@ public class H2DynamicIndexingComplexClientTransactionalReplicatedTest extends H * Constructor. 
*/ public H2DynamicIndexingComplexClientTransactionalReplicatedTest() { - super(CacheMode.REPLICATED, CacheAtomicityMode.TRANSACTIONAL, CLIENT_IDX); + super(CacheMode.REPLICATED, CacheAtomicityMode.TRANSACTIONAL, 1, CLIENT_IDX); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerAtomicPartitionedNoBackupsTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerAtomicPartitionedNoBackupsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerAtomicPartitionedNoBackupsTest.java new file mode 100644 index 0000000..6e806f9 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerAtomicPartitionedNoBackupsTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.index; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +/** + * Test to check work of DML+DDL operations of atomic partitioned cache without backups + * with queries initiated from server node. + */ +public class H2DynamicIndexingComplexServerAtomicPartitionedNoBackupsTest extends H2DynamicIndexingComplexTest { + /** + * Constructor. + */ + public H2DynamicIndexingComplexServerAtomicPartitionedNoBackupsTest() { + super(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC, 0, SRV_IDX); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerAtomicPartitionedTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerAtomicPartitionedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerAtomicPartitionedTest.java index ff0c1cb..18f4456 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerAtomicPartitionedTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerAtomicPartitionedTest.java @@ -28,6 +28,6 @@ public class H2DynamicIndexingComplexServerAtomicPartitionedTest extends H2Dynam * Constructor. 
*/ public H2DynamicIndexingComplexServerAtomicPartitionedTest() { - super(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC, SRV_IDX); + super(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC, 1, SRV_IDX); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerAtomicReplicatedTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerAtomicReplicatedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerAtomicReplicatedTest.java index 3d7ee18..2bfe678 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerAtomicReplicatedTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerAtomicReplicatedTest.java @@ -28,6 +28,6 @@ public class H2DynamicIndexingComplexServerAtomicReplicatedTest extends H2Dynami * Constructor. 
*/ public H2DynamicIndexingComplexServerAtomicReplicatedTest() { - super(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC, SRV_IDX); + super(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC, 1, SRV_IDX); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerTransactionalPartitionedNoBackupsTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerTransactionalPartitionedNoBackupsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerTransactionalPartitionedNoBackupsTest.java new file mode 100644 index 0000000..37b4489 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerTransactionalPartitionedNoBackupsTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.index; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +/** + * Test to check work of DML+DDL operations of transactional partitioned cache without backups + * with queries initiated from server node. + */ +public class H2DynamicIndexingComplexServerTransactionalPartitionedNoBackupsTest extends H2DynamicIndexingComplexTest { + /** + * Constructor. + */ + public H2DynamicIndexingComplexServerTransactionalPartitionedNoBackupsTest() { + super(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL, 0, SRV_IDX); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerTransactionalPartitionedTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerTransactionalPartitionedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerTransactionalPartitionedTest.java index aeb3839..85a58c1 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerTransactionalPartitionedTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerTransactionalPartitionedTest.java @@ -28,6 +28,6 @@ public class H2DynamicIndexingComplexServerTransactionalPartitionedTest extends * Constructor. 
*/ public H2DynamicIndexingComplexServerTransactionalPartitionedTest() { - super(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL, SRV_IDX); + super(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL, 1, SRV_IDX); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerTransactionalReplicatedTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerTransactionalReplicatedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerTransactionalReplicatedTest.java index 4266161..54329b1 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerTransactionalReplicatedTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerTransactionalReplicatedTest.java @@ -28,6 +28,6 @@ public class H2DynamicIndexingComplexServerTransactionalReplicatedTest extends H * Constructor. 
*/ public H2DynamicIndexingComplexServerTransactionalReplicatedTest() { - super(CacheMode.REPLICATED, CacheAtomicityMode.TRANSACTIONAL, SRV_IDX); + super(CacheMode.REPLICATED, CacheAtomicityMode.TRANSACTIONAL, 1, SRV_IDX); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexTest.java index f9d3408..68df58b 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexTest.java @@ -45,6 +45,9 @@ public abstract class H2DynamicIndexingComplexTest extends DynamicIndexAbstractS /** Node index to initiate operations from. */ private final int nodeIdx; + /** Backups to configure */ + private final int backups; + /** Names of companies to use. */ private final static List<String> COMPANIES = Arrays.asList("ASF", "GNU", "BSD"); @@ -61,11 +64,13 @@ public abstract class H2DynamicIndexingComplexTest extends DynamicIndexAbstractS * Constructor. * @param cacheMode Cache mode. * @param atomicityMode Cache atomicity mode. + * @param backups Number of backups. * @param nodeIdx Node index. 
*/ - H2DynamicIndexingComplexTest(CacheMode cacheMode, CacheAtomicityMode atomicityMode, int nodeIdx) { + H2DynamicIndexingComplexTest(CacheMode cacheMode, CacheAtomicityMode atomicityMode, int backups, int nodeIdx) { this.cacheMode = cacheMode; this.atomicityMode = atomicityMode; + this.backups = backups; this.nodeIdx = nodeIdx; } @@ -94,12 +99,13 @@ public abstract class H2DynamicIndexingComplexTest extends DynamicIndexAbstractS public void testOperations() { executeSql("CREATE TABLE person (id int, name varchar, age int, company varchar, city varchar, " + "primary key (id, name, city)) WITH \"template=" + cacheMode.name() + ",atomicity=" + atomicityMode.name() + - ",affinity_key=city\""); + ",backups=" + backups + ",affinity_key=city\""); executeSql("CREATE INDEX idx on person (city asc, name asc)"); executeSql("CREATE TABLE city (name varchar, population int, primary key (name)) WITH " + - "\"template=" + cacheMode.name() + ",atomicity=" + atomicityMode.name() + ",affinity_key=name\""); + "\"template=" + cacheMode.name() + ",atomicity=" + atomicityMode.name() + + ",backups=" + backups + ",affinity_key=name\""); executeSql("INSERT INTO city (name, population) values(?, ?), (?, ?), (?, ?)", "St. 
Petersburg", 6000000, @@ -107,7 +113,9 @@ public abstract class H2DynamicIndexingComplexTest extends DynamicIndexAbstractS "London", 8000000 ); - for (int i = 0; i < 100; i++) + final long PERSON_COUNT = 100; + + for (int i = 0; i < PERSON_COUNT; i++) executeSql("INSERT INTO person (id, name, age, company, city) values (?, ?, ?, ?, ?)", i, "Person " + i, @@ -121,7 +129,11 @@ public abstract class H2DynamicIndexingComplexTest extends DynamicIndexAbstractS } }); - long r = (Long)executeSqlSingle("SELECT COUNT(*) from Person p inner join City c on p.city = c.name"); + long r = (Long)executeSqlSingle("SELECT COUNT(*) from Person"); + + assertEquals(PERSON_COUNT, r); + + r = (Long)executeSqlSingle("SELECT COUNT(*) from Person p inner join City c on p.city = c.name"); // Berkeley is not present in City table, although 25 people have it specified as their city. assertEquals(75L, r); http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java index 03c3f1e..dd03274 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java @@ -132,6 +132,8 @@ public class IgniteSqlSegmentedIndexSelfTest extends GridCommonAbstractTest { checkDistributedQueryWithSegmentedIndex(); checkLocalQueryWithSegmentedIndex(); + + checkLocalSizeQueryWithSegmentedIndex(); } /** @@ -144,18 +146,43 @@ public class IgniteSqlSegmentedIndexSelfTest extends GridCommonAbstractTest { IgniteCache<Object, Object> cache 
= ignite(0).cache(ORG_CACHE_NAME); // Unequal entries distribution among partitions. - int expectedSize = nodesCount() * QRY_PARALLELISM_LVL * 3 / 2; + int expSize = nodesCount() * QRY_PARALLELISM_LVL * 3 / 2; - for (int i = 0; i < expectedSize; i++) + for (int i = 0; i < expSize; i++) cache.put(i, new Organization("org-" + i)); String select0 = "select * from \"org\".Organization o"; // Check for stable results. for(int i = 0; i < 10; i++) { - List<List<?>> result = cache.query(new SqlFieldsQuery(select0)).getAll(); + List<List<?>> res = cache.query(new SqlFieldsQuery(select0)).getAll(); - assertEquals(expectedSize, result.size()); + assertEquals(expSize, res.size()); + } + } + + /** + * Checks correct <code>select count(*)</code> result with segmented indices. + * @throws Exception If failed. + */ + public void testSegmentedIndexSizeReproducableResults() throws Exception { + ignite(0).createCache(cacheConfig(ORG_CACHE_NAME, true, Integer.class, Organization.class)); + + IgniteCache<Object, Object> cache = ignite(0).cache(ORG_CACHE_NAME); + + // Unequal entries distribution among partitions. + long expSize = nodesCount() * QRY_PARALLELISM_LVL * 3 / 2; + + for (int i = 0; i < expSize; i++) + cache.put(i, new Organization("org-" + i)); + + String select0 = "select count(*) from \"org\".Organization o"; + + // Check for stable results. 
+ for(int i = 0; i < 10; i++) { + List<List<?>> res = cache.query(new SqlFieldsQuery(select0)).getAll(); + + assertEquals(expSize, res.get(0).get(0)); } } @@ -170,14 +197,39 @@ public class IgniteSqlSegmentedIndexSelfTest extends GridCommonAbstractTest { .setEvictionPolicy(new FifoEvictionPolicy(10)) .setOnheapCacheEnabled(true)); - for (int i = 0; i < 20; i++) + final long SIZE = 20; + + for (int i = 0; i < SIZE; i++) cache.put(i, new Organization("org-" + i)); String select0 = "select name from \"org\".Organization"; - List<List<?>> result = cache.query(new SqlFieldsQuery(select0)).getAll(); + List<List<?>> res = cache.query(new SqlFieldsQuery(select0)).getAll(); - assertEquals(20, result.size()); + assertEquals(SIZE, res.size()); + } + + /** + * Verifies that <code>select count(*)</code> return valid result on a single-node grid. + * + * @throws Exception If failed. + */ + public void testSizeOnSegmentedIndexWithEvictionPolicy() throws Exception { + final IgniteCache<Object, Object> cache = ignite(0).createCache( + cacheConfig(ORG_CACHE_NAME, true, Integer.class, Organization.class) + .setEvictionPolicy(new FifoEvictionPolicy(10)) + .setOnheapCacheEnabled(true)); + + final long SIZE = 20; + + for (int i = 0; i < SIZE; i++) + cache.put(i, new Organization("org-" + i)); + + String select0 = "select count(*) from \"org\".Organization"; + + List<List<?>> res = cache.query(new SqlFieldsQuery(select0)).getAll(); + + assertEquals(SIZE, res.get(0).get(0)); } /** @@ -194,6 +246,8 @@ public class IgniteSqlSegmentedIndexSelfTest extends GridCommonAbstractTest { checkDistributedQueryWithSegmentedIndex(); checkLocalQueryWithSegmentedIndex(); + + checkLocalSizeQueryWithSegmentedIndex(); } /** @@ -205,21 +259,21 @@ public class IgniteSqlSegmentedIndexSelfTest extends GridCommonAbstractTest { for (int i = 0; i < nodesCount(); i++) { IgniteCache<Integer, Person> c1 = ignite(i).cache(PERSON_CAHE_NAME); - int expectedPersons = 0; + long expPersons = 0; for (Cache.Entry<Integer, 
Person> e : c1) { final Integer orgId = e.getValue().orgId; // We have as orphan ORG rows as orphan PERSON rows. if (ORPHAN_ROWS <= orgId && orgId < 500) - expectedPersons++; + expPersons++; } String select0 = "select o.name n1, p.name n2 from \"pers\".Person p, \"org\".Organization o where p.orgId = o._key"; - List<List<?>> result = c1.query(new SqlFieldsQuery(select0).setDistributedJoins(true)).getAll(); + List<List<?>> res = c1.query(new SqlFieldsQuery(select0).setDistributedJoins(true)).getAll(); - assertEquals(expectedPersons, result.size()); + assertEquals(expPersons, res.size()); } } @@ -235,25 +289,59 @@ public class IgniteSqlSegmentedIndexSelfTest extends GridCommonAbstractTest { IgniteCache<Integer, Person> c1 = node.cache(PERSON_CAHE_NAME); IgniteCache<Integer, Organization> c2 = node.cache(ORG_CACHE_NAME); - Set<Integer> localOrgIds = new HashSet<>(); + Set<Integer> locOrgIds = new HashSet<>(); for (Cache.Entry<Integer, Organization> e : c2.localEntries()) - localOrgIds.add(e.getKey()); + locOrgIds.add(e.getKey()); - int expectedPersons = 0; + long expPersons = 0; for (Cache.Entry<Integer, Person> e : c1.localEntries()) { final Integer orgId = e.getValue().orgId; - if (localOrgIds.contains(orgId)) - expectedPersons++; + if (locOrgIds.contains(orgId)) + expPersons++; } String select0 = "select o.name n1, p.name n2 from \"pers\".Person p, \"org\".Organization o where p.orgId = o._key"; - List<List<?>> result = c1.query(new SqlFieldsQuery(select0).setLocal(true)).getAll(); + List<List<?>> res = c1.query(new SqlFieldsQuery(select0).setLocal(true)).getAll(); + + assertEquals(expPersons, res.size()); + } + } + + /** + * Verifies that local <code>select count(*)</code> query returns a correct result. + * + * @throws Exception If failed. 
+ */ + public void checkLocalSizeQueryWithSegmentedIndex() throws Exception { + for (int i = 0; i < nodesCount(); i++) { + final Ignite node = ignite(i); + + IgniteCache<Integer, Person> c1 = node.cache(PERSON_CAHE_NAME); + IgniteCache<Integer, Organization> c2 = node.cache(ORG_CACHE_NAME); + + Set<Integer> locOrgIds = new HashSet<>(); + + for (Cache.Entry<Integer, Organization> e : c2.localEntries()) + locOrgIds.add(e.getKey()); + + int expPersons = 0; + + for (Cache.Entry<Integer, Person> e : c1.localEntries()) { + final Integer orgId = e.getValue().orgId; + + if (locOrgIds.contains(orgId)) + expPersons++; + } + + String select0 = "select count(*) from \"pers\".Person p, \"org\".Organization o where p.orgId = o._key"; + + List<List<?>> res = c1.query(new SqlFieldsQuery(select0).setLocal(true)).getAll(); - assertEquals(expectedPersons, result.size()); + assertEquals((long) expPersons, res.get(0).get(0)); } }
