IGNITE-3867: Fixed ScanQuery ignores pageSize property. This closes #1406.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d0c0bcec Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d0c0bcec Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d0c0bcec Branch: refs/heads/master Commit: d0c0bcece7d8e9d373aaf13a210f6d890e5ad48b Parents: a922ac9 Author: Andrey V. Mashenkov <[email protected]> Authored: Tue Jan 17 16:19:02 2017 +0300 Committer: Andrey V. Mashenkov <[email protected]> Committed: Tue Jan 17 16:19:02 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/IgniteCacheProxy.java | 3 + .../IgniteCachePartitionedQuerySelfTest.java | 87 ++++++++++++++++++++ 2 files changed, 90 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d0c0bcec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index b9737c6..873c822 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -491,6 +491,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V qry = ctx.queries().createScanQuery(p, transformer, scanQry.getPartition(), isKeepBinary); + if (scanQry.getPageSize() > 0) + qry.pageSize(scanQry.getPageSize()); + if (grp != null) qry.projection(grp); http://git-wip-us.apache.org/repos/asf/ignite/blob/d0c0bcec/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQuerySelfTest.java index 78fd914..b9f21da 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQuerySelfTest.java @@ -20,15 +20,28 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import java.util.Collection; import java.util.List; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import javax.cache.Cache; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.processors.cache.IgniteCacheAbstractQuerySelfTest; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.communication.CommunicationSpi; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CachePeekMode.ALL; @@ -47,6 +60,11 @@ public class IgniteCachePartitionedQuerySelfTest extends IgniteCacheAbstractQuer return PARTITIONED; } + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + return super.getConfiguration(gridName).setCommunicationSpi(new TestTcpCommunicationSpi()); + } + /** * @throws Exception If failed. */ @@ -135,4 +153,73 @@ public class IgniteCachePartitionedQuerySelfTest extends IgniteCacheAbstractQuer assert F.asList(persons).contains(entry.getValue()); } } + + /** + * @throws Exception If failed. + */ + public void testScanQueryPagination() throws Exception { + final int pageSize = 5; + + final AtomicInteger pages = new AtomicInteger(0); + + IgniteCache<Integer, Integer> cache = ignite().cache(null); + + for (int i = 0; i < 50; i++) + cache.put(i, i); + + CommunicationSpi spi = ignite().configuration().getCommunicationSpi(); + + assert spi instanceof TestTcpCommunicationSpi; + + TestTcpCommunicationSpi commSpi = (TestTcpCommunicationSpi)spi; + + commSpi.filter = new IgniteInClosure<Message>() { + @Override public void apply(Message msg) { + if (!(msg instanceof GridIoMessage)) + return; + + Message msg0 = ((GridIoMessage)msg).message(); + + if (msg0 instanceof GridCacheQueryRequest) { + assertEquals(pageSize, ((GridCacheQueryRequest)msg0).pageSize()); + + pages.incrementAndGet(); + } + else if (msg0 instanceof GridCacheQueryResponse) { + assertTrue(((GridCacheQueryResponse)msg0).data().size() <= pageSize); + } + } + }; + + try { + ScanQuery<Integer, Integer> qry = new ScanQuery<Integer, Integer>(); + + qry.setPageSize(pageSize); + + List<Cache.Entry<Integer, Integer>> all = cache.query(qry).getAll(); + + assertTrue(pages.get() > ignite().cluster().forDataNodes(null).nodes().size()); + + assertEquals(50, all.size()); + } + finally { + commSpi.filter = null; + } + } + + /** + * + */ + private static class TestTcpCommunicationSpi extends TcpCommunicationSpi { + volatile IgniteInClosure<Message> filter; + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) + throws IgniteSpiException { + if(filter != null) + filter.apply(msg); + + super.sendMessage(node, msg, ackClosure); + } + } } \ No newline at end of file
