Repository: ignite Updated Branches: refs/heads/master c7aaee7ed -> e8aa03883
IGNITE-8286: ScanQuery ignore setLocal with non local partition. - Fixes #3871. Signed-off-by: shroman <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e8aa0388 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e8aa0388 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e8aa0388 Branch: refs/heads/master Commit: e8aa038835c235b76bb22580ca479dc679ba5365 Parents: c7aaee7 Author: shroman <[email protected]> Authored: Tue Sep 4 12:32:32 2018 +0900 Committer: shroman <[email protected]> Committed: Tue Sep 4 12:32:32 2018 +0900 ---------------------------------------------------------------------- .../processors/cache/IgniteCacheProxyImpl.java | 3 +- .../cache/query/GridCacheQueryAdapter.java | 30 ++++++++-- .../cache/query/GridCacheQueryManager.java | 8 ++- ...CacheScanPartitionQueryFallbackSelfTest.java | 62 ++++++++++++++++++++ 4 files changed, 95 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e8aa0388/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java index c21ad0b..69ea562 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java @@ -379,7 +379,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< IgniteBiPredicate<K, V> p = scanQry.getFilter(); - final CacheQuery<R> qry = ctx.queries().createScanQuery(p, transformer, scanQry.getPartition(), isKeepBinary); + final CacheQuery<R> qry = ctx.queries().createScanQuery( + p, transformer, scanQry.getPartition(), isKeepBinary, scanQry.isLocal()); if (scanQry.getPageSize() > 0) qry.pageSize(scanQry.getPageSize()); http://git-wip-us.apache.org/repos/asf/ignite/blob/e8aa0388/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java index 07aea4c..0e3ab43 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java @@ -119,6 +119,9 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { /** */ private volatile boolean incBackups; + /** Local query. */ + private boolean forceLocal; + /** */ private volatile boolean dedup; @@ -143,13 +146,15 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { * @param filter Scan filter. * @param part Partition. * @param keepBinary Keep binary flag. + * @param forceLocal Flag to force local query. */ public GridCacheQueryAdapter(GridCacheContext<?, ?> cctx, GridCacheQueryType type, @Nullable IgniteBiPredicate<Object, Object> filter, @Nullable IgniteClosure<Map.Entry, Object> transform, @Nullable Integer part, - boolean keepBinary) { + boolean keepBinary, + boolean forceLocal) { assert cctx != null; assert type != null; assert part == null || part >= 0; @@ -160,6 +165,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { this.transform = transform; this.part = part; this.keepBinary = keepBinary; + this.forceLocal = forceLocal; log = cctx.logger(getClass()); @@ -313,6 +319,13 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { } /** + * @return {@code True} if the query is forced local. + */ + public boolean forceLocal() { + return forceLocal; + } + + /** * @return Security subject ID. */ public UUID subjectId() { @@ -516,15 +529,20 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { /** {@inheritDoc} */ @SuppressWarnings({"IfMayBeConditional", "unchecked"}) @Override public GridCloseableIterator executeScanQuery() throws IgniteCheckedException { - assert type == SCAN : "Wrong processing of qyery: " + type; + assert type == SCAN : "Wrong processing of query: " + type; // Affinity nodes snapshot. Collection<ClusterNode> nodes = new ArrayList<>(nodes()); cctx.checkSecurity(SecurityPermission.CACHE_READ); - if (nodes.isEmpty() && part == null) + if (nodes.isEmpty()) { + if (part != null && forceLocal) + throw new IgniteCheckedException("No queryable nodes for partition " + part + + " [forced local query=" + this + "]"); + return new GridEmptyCloseableIterator(); + } if (log.isDebugEnabled()) log.debug("Executing query [query=" + this + ", nodes=" + nodes + ']'); @@ -612,9 +630,10 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { * @param cctx Cache context. * @param prj Projection (optional). * @return Collection of data nodes in provided projection (if any). + * @throws IgniteCheckedException If partition number is invalid. */ private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> cctx, - @Nullable final ClusterGroup prj, @Nullable final Integer part) { + @Nullable final ClusterGroup prj, @Nullable final Integer part) throws IgniteCheckedException { assert cctx != null; final AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion(); @@ -624,6 +643,9 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { if (prj == null && part == null) return affNodes; + if (part != null && part >= cctx.affinity().partitions()) + throw new IgniteCheckedException("Invalid partition number: " + part); + final Set<ClusterNode> owners = part == null ? Collections.<ClusterNode>emptySet() : new HashSet<>(cctx.topology().owners(part, topVer)); http://git-wip-us.apache.org/repos/asf/ignite/blob/e8aa0388/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index c209602..281400e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -2723,7 +2723,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte */ public <R> CacheQuery<R> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter, @Nullable Integer part, boolean keepBinary) { - return createScanQuery(filter, null, part, keepBinary); + return createScanQuery(filter, null, part, keepBinary, false); } /** @@ -2733,18 +2733,20 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @param trans Transformer. * @param part Partition. * @param keepBinary Keep binary flag. + * @param forceLocal Flag to force local scan. * @return Created query. */ public <T, R> CacheQuery<R> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter, @Nullable IgniteClosure<T, R> trans, - @Nullable Integer part, boolean keepBinary) { + @Nullable Integer part, boolean keepBinary, boolean forceLocal) { return new GridCacheQueryAdapter(cctx, SCAN, filter, trans, part, - keepBinary); + keepBinary, + forceLocal); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/e8aa0388/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java index 999b1ad..3afcad8 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.cache.Cache; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; @@ -57,6 +58,7 @@ import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; /** @@ -149,6 +151,66 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT } /** + * Scan (with explicit {@code setLocal(true)}) should perform on the local node. + * + * @throws Exception If failed. + */ + public void testScanLocalExplicit() throws Exception { + cacheMode = CacheMode.PARTITIONED; + backups = 0; + commSpiFactory = new TestLocalCommunicationSpiFactory(); + + try { + Ignite ignite = startGrids(GRID_CNT); + + IgniteCacheProxy<Integer, Integer> cache = fillCache(ignite); + + int part = anyLocalPartition(cache.context()); + + QueryCursor<Cache.Entry<Integer, Integer>> qry = + cache.query(new ScanQuery<Integer, Integer>().setPartition(part).setLocal(true)); + + doTestScanQuery(qry, part); + + GridTestUtils.assertThrows(log, (Callable<Void>)() -> { + int remPart = remotePartition(cache.context()).getKey(); + + cache.query(new ScanQuery<Integer, Integer>().setPartition(remPart).setLocal(true)); + + return null; + }, IgniteCheckedException.class, null); + } + finally { + stopAllGrids(); + } + } + + /** + * Scan (with explicit {@code setLocal(true)}, no partition specified) should perform on the local node. + * + * @throws Exception If failed. + */ + public void testScanLocalExplicitNoPart() throws Exception { + cacheMode = CacheMode.PARTITIONED; + backups = 0; + commSpiFactory = new TestLocalCommunicationSpiFactory(); + + try { + Ignite ignite = startGrids(GRID_CNT); + + IgniteCacheProxy<Integer, Integer> cache = fillCache(ignite); + + QueryCursor<Cache.Entry<Integer, Integer>> qry = + cache.query(new ScanQuery<Integer, Integer>().setLocal(true)); + + assertFalse(qry.getAll().isEmpty()); + } + finally { + stopAllGrids(); + } + } + + /** * Scan should perform on the remote node. * * @throws Exception If failed.
