Repository: ignite Updated Branches: refs/heads/ignite-2.0 712398e12 -> 5ef610c07
IGNITE-4523 Allow distributed SQL query execution over explicit set of partitions - Fixes #1858. Signed-off-by: Sergi Vladykin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5ef610c0 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5ef610c0 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5ef610c0 Branch: refs/heads/ignite-2.0 Commit: 5ef610c07c947c7cf4884b946ef1649e5ce4da34 Parents: 712398e Author: ascherbakoff <[email protected]> Authored: Tue Apr 25 14:01:33 2017 +0300 Committer: Sergi Vladykin <[email protected]> Committed: Tue Apr 25 14:01:33 2017 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/cache/query/Query.java | 48 ++ .../ignite/cache/query/SqlFieldsQuery.java | 26 + .../org/apache/ignite/cache/query/SqlQuery.java | 26 + .../processors/cache/IgniteCacheProxy.java | 14 + .../processors/query/GridQueryProcessor.java | 4 +- .../ignite/internal/util/GridIntIterator.java | 33 + .../ignite/internal/util/GridIntList.java | 21 +- .../ignite/internal/util/IgniteUtils.java | 21 +- .../processors/query/h2/IgniteH2Indexing.java | 14 +- .../query/h2/twostep/GridMapQueryExecutor.java | 5 +- .../h2/twostep/GridReduceQueryExecutor.java | 222 ++++++- .../h2/twostep/msg/GridH2QueryRequest.java | 64 +- ...stributedPartitionQueryAbstractSelfTest.java | 655 +++++++++++++++++++ ...utedPartitionQueryConfigurationSelfTest.java | 92 +++ ...butedPartitionQueryNodeRestartsSelfTest.java | 114 ++++ ...eCacheDistributedPartitionQuerySelfTest.java | 90 +++ .../IgniteCacheQueryNodeRestartSelfTest2.java | 8 + .../IgniteCacheQuerySelfTestSuite.java | 6 + 18 files changed, 1419 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java b/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java index 71161e7..c9ed464 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java @@ -18,7 +18,10 @@ package org.apache.ignite.cache.query; import java.io.Serializable; +import java.util.Arrays; import org.apache.ignite.IgniteCache; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; /** @@ -93,6 +96,51 @@ public abstract class Query<R> implements Serializable { return this; } + /** + * Prepares the partitions. + * + * @param parts Partitions. + */ + protected int[] prepare(int[] parts) { + if (parts == null) + return null; + + A.notEmpty(parts, "Partitions"); + + boolean sorted = true; + + // Try to do validation in one pass, if array is already sorted. + for (int i = 0; i < parts.length; i++) { + if (i < parts.length - 1) + if (parts[i] > parts[i + 1]) + sorted = false; + else if (sorted) + validateDups(parts[i], parts[i + 1]); + + A.ensure(0 <= parts[i] && parts[i] < CacheConfiguration.MAX_PARTITIONS_COUNT, "Illegal partition"); + } + + // Sort and validate again. + if (!sorted) { + Arrays.sort(parts); + + for (int i = 0; i < parts.length; i++) { + if (i < parts.length - 1) + validateDups(parts[i], parts[i + 1]); + } + } + + return parts; + } + + /** + * @param p1 Part 1. + * @param p2 Part 2. 
+ */ + private void validateDups(int p1, int p2) { + A.ensure(p1 != p2, "Partition duplicates are not allowed: " + p1); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(Query.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java index 8c3a4fe..9a7211b 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java @@ -24,6 +24,7 @@ import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; /** * SQL Fields query. This query can return specific fields of data based @@ -70,6 +71,9 @@ public class SqlFieldsQuery extends Query<List<?>> { /** */ private boolean replicatedOnly; + /** Partitions for query */ + private int[] parts; + /** * Constructs SQL fields query. * @@ -261,6 +265,28 @@ public class SqlFieldsQuery extends Query<List<?>> { return replicatedOnly; } + /** + * Gets partitions for query, in ascending order. + */ + @Nullable public int[] getPartitions() { + return parts; + } + + /** + * Sets partitions for a query. + * The query will be executed only on nodes which are primary for specified partitions. + * <p> + * Note that the passed array will be sorted in place for performance reasons if it is not already sorted. + * + * @param parts Partitions. + * @return {@code this} for chaining. + */ + public SqlFieldsQuery setPartitions(@Nullable int...
parts) { + this.parts = prepare(parts); + + return this; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(SqlFieldsQuery.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java index 944c70e..a5994b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java @@ -24,6 +24,7 @@ import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; /** * SQL Query. @@ -56,6 +57,9 @@ public final class SqlQuery<K, V> extends Query<Cache.Entry<K, V>> { /** */ private boolean replicatedOnly; + /** Partitions for query */ + private int[] parts; + /** * Constructs query for the given type name and SQL query. * @@ -250,6 +254,28 @@ public final class SqlQuery<K, V> extends Query<Cache.Entry<K, V>> { return replicatedOnly; } + /** + * Gets partitions for query, in ascending order. + */ + @Nullable public int[] getPartitions() { + return parts; + } + + /** + * Sets partitions for a query. + * The query will be executed only on nodes which are primary for specified partitions. + * <p> + * Note that the passed array will be sorted in place for performance reasons if it is not already sorted. + * + * @param parts Partitions. + * @return {@code this} for chaining. + */ + public SqlQuery setPartitions(@Nullable int...
parts) { + this.parts = prepare(parts); + + return this; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(SqlQuery.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index b38520d..dfe817e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -776,6 +776,13 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V if (qry instanceof SqlQuery) { final SqlQuery p = (SqlQuery)qry; + if (p.isReplicatedOnly() && p.getPartitions() != null) + throw new CacheException("Partitions are not supported in replicated only mode."); + + if (p.isDistributedJoins() && p.getPartitions() != null) + throw new CacheException( + "Using both partitions and distributed JOINs is not supported for the same query"); + if ((p.isReplicatedOnly() && isReplicatedDataNode()) || ctx.isLocal() || qry.isLocal()) return (QueryCursor<R>)ctx.kernalContext().query().queryLocal(ctx, p, opCtxCall != null && opCtxCall.isKeepBinary()); @@ -786,6 +793,13 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V if (qry instanceof SqlFieldsQuery) { SqlFieldsQuery p = (SqlFieldsQuery)qry; + if (p.isReplicatedOnly() && p.getPartitions() != null) + throw new CacheException("Partitions are not supported in replicated only mode."); + + if (p.isDistributedJoins() && p.getPartitions() != null) + throw new CacheException( + "Using both partitions and distributed JOINs is not supported for the same 
query"); + if ((p.isReplicatedOnly() && isReplicatedDataNode()) || ctx.isLocal() || qry.isLocal()) return (QueryCursor<R>)ctx.kernalContext().query().queryLocalFields(ctx, p); http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 015646d..448639b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -1754,7 +1754,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { qry.getArgs(), cctx.name()); - return idx.queryLocalSql(cctx, qry, idx.backupFilter(requestTopVer.get(), null), keepBinary); + return idx.queryLocalSql(cctx, qry, idx.backupFilter(requestTopVer.get(), qry.getPartitions()), keepBinary); } }, true); } @@ -1938,7 +1938,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { GridQueryCancel cancel = new GridQueryCancel(); final QueryCursor<List<?>> cursor = idx.queryLocalSqlFields(cctx, qry, - idx.backupFilter(requestTopVer.get(), null), cancel); + idx.backupFilter(requestTopVer.get(), qry.getPartitions()), cancel); return new QueryCursorImpl<List<?>>(new Iterable<List<?>>() { @Override public Iterator<List<?>> iterator() { http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/core/src/main/java/org/apache/ignite/internal/util/GridIntIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridIntIterator.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/GridIntIterator.java new file mode 100644 index 0000000..ea863e7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridIntIterator.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util; + +/** + * Iterator over integer primitives. + */ +public interface GridIntIterator { + /** + * @return {@code true} if the iteration has more elements. + */ + public boolean hasNext(); + + /** + * @return Next int. 
+ */ + public int next(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/core/src/main/java/org/apache/ignite/internal/util/GridIntList.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridIntList.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridIntList.java index 968b88e..e5b7b1b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridIntList.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridIntList.java @@ -582,5 +582,22 @@ public class GridIntList implements Message, Externalizable { /** {@inheritDoc} */ @Override public byte fieldsCount() { return 2; - } -} + } + + /** + * @return Iterator. + */ + public GridIntIterator iterator() { + return new GridIntIterator() { + int c = 0; + + @Override public boolean hasNext() { + return c < idx; + } + + @Override public int next() { + return arr[c++]; + } + }; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 7d7d071..59d334a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -10094,4 +10094,23 @@ public abstract class IgniteUtils { throw new IgniteCheckedException(e); } } -} + + /** + * Returns {@link GridIntIterator} for range of primitive integers. + * @param start Start. + * @param cnt Count. 
+ */ + public static GridIntIterator forRange(final int start, final int cnt) { + return new GridIntIterator() { + int c = 0; + + @Override public boolean hasNext() { + return c < cnt; + } + + @Override public int next() { + return start + c++; + } + }; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 798ca9b..361b55b 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -1471,6 +1471,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { * @param qry Query. * @param keepCacheObj Flag to keep cache object. * @param enforceJoinOrder Enforce join order of tables. + * @param parts Partitions. * @return Iterable result. 
*/ private Iterable<List<?>> runQueryTwoStep( @@ -1480,11 +1481,12 @@ public class IgniteH2Indexing implements GridQueryIndexing { final boolean enforceJoinOrder, final int timeoutMillis, final GridQueryCancel cancel, - final Object[] params + final Object[] params, + final int[] parts ) { return new Iterable<List<?>>() { @Override public Iterator<List<?>> iterator() { - return rdcQryExec.query(cctx, qry, keepCacheObj, enforceJoinOrder, timeoutMillis, cancel, params); + return rdcQryExec.query(cctx, qry, keepCacheObj, enforceJoinOrder, timeoutMillis, cancel, params, parts); } }; } @@ -1515,6 +1517,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { fqry.setArgs(qry.getArgs()); fqry.setPageSize(qry.getPageSize()); fqry.setDistributedJoins(qry.isDistributedJoins()); + fqry.setPartitions(qry.getPartitions()); fqry.setLocal(qry.isLocal()); if (qry.getTimeout() > 0) @@ -1730,7 +1733,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { cancel = new GridQueryCancel(); QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>( - runQueryTwoStep(cctx, twoStepQry, cctx.keepBinary(), enforceJoinOrder, qry.getTimeout(), cancel, qry.getArgs()), + runQueryTwoStep(cctx, twoStepQry, cctx.keepBinary(), enforceJoinOrder, qry.getTimeout(), cancel, + qry.getArgs(), qry.getPartitions()), cancel); cursor.fieldsMeta(meta); @@ -1750,12 +1754,12 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (caches.isEmpty()) return; // Nothing to check - GridCacheSharedContext sharedContext = ctx.cache().context(); + GridCacheSharedContext sharedCtx = ctx.cache().context(); int expectedParallelism = 0; for (int i = 0; i < caches.size(); i++) { - GridCacheContext cctx = sharedContext.cacheContext(caches.get(i)); + GridCacheContext cctx = sharedCtx.cacheContext(caches.get(i)); assert cctx != null; 
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index e4347b5..45d8f50 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -426,8 +426,11 @@ public class GridMapQueryExecutor { * @param req Query request. */ private void onQueryRequest(final ClusterNode node, final GridH2QueryRequest req) throws IgniteCheckedException { + int[] qryParts = req.queryPartitions(); + final Map<UUID,int[]> partsMap = req.partitions(); - final int[] parts = partsMap == null ? null : partsMap.get(ctx.localNodeId()); + + final int[] parts = qryParts == null ? partsMap == null ? 
null : partsMap.get(ctx.localNodeId()) : qryParts; assert !F.isEmpty(req.caches()); http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index d307c00..3d81cb5 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.Arrays; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -73,6 +74,8 @@ import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQuery import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest; +import org.apache.ignite.internal.util.GridIntIterator; +import org.apache.ignite.internal.util.GridIntList; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.typedef.CIX2; import org.apache.ignite.internal.util.typedef.F; @@ -80,6 +83,7 @@ import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiClosure; import org.apache.ignite.lang.IgniteFuture; +import 
org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.plugin.extensions.communication.Message; import org.h2.command.ddl.CreateTableData; import org.h2.engine.Session; @@ -113,6 +117,9 @@ public class GridReduceQueryExecutor { private static final String MERGE_INDEX_SORTED = "merge_sorted"; /** */ + private static final Set<ClusterNode> UNMAPPED_PARTS = Collections.emptySet(); + + /** */ private GridKernalContext ctx; /** */ @@ -376,21 +383,78 @@ public class GridReduceQueryExecutor { } /** + * @param topVer Topology version. + * @param cctx Cache context. + * @param parts Partitions. + */ + private Map<ClusterNode, IntArray> stableDataNodesMap(AffinityTopologyVersion topVer, + final GridCacheContext<?, ?> cctx, @Nullable final int[] parts) { + + Map<ClusterNode, IntArray> mapping = new HashMap<>(); + + // Explicit partitions mapping is not applicable to replicated cache. + if (cctx.isReplicated()) { + for (ClusterNode clusterNode : cctx.affinity().assignment(topVer).primaryPartitionNodes()) + mapping.put(clusterNode, null); + + return mapping; + } + + List<List<ClusterNode>> assignment = cctx.affinity().assignment(topVer).assignment(); + + boolean needPartsFilter = parts != null; + + GridIntIterator iter = needPartsFilter ? new GridIntList(parts).iterator() : + U.forRange(0, cctx.affinity().partitions()); + + while(iter.hasNext()) { + int partId = iter.next(); + + List<ClusterNode> partNodes = assignment.get(partId); + + if (partNodes.size() > 0) { + ClusterNode prim = partNodes.get(0); + + if (!needPartsFilter) { + mapping.put(prim, null); + + continue; + } + + IntArray partIds = mapping.get(prim); + + if (partIds == null) { + partIds = new IntArray(); + + mapping.put(prim, partIds); + } + + partIds.add(partId); + } + } + + return mapping; + } + + /** * @param isReplicatedOnly If we must only have replicated caches. * @param topVer Topology version. * @param cctx Cache context for main space. * @param extraSpaces Extra spaces. 
+ * @param parts Partitions. * @return Data nodes or {@code null} if repartitioning started and we need to retry. */ - private Collection<ClusterNode> stableDataNodes( - boolean isReplicatedOnly, - AffinityTopologyVersion topVer, - final GridCacheContext<?, ?> cctx, - List<Integer> extraSpaces - ) { - Set<ClusterNode> nodes = new HashSet<>(cctx.affinity().assignment(topVer).primaryPartitionNodes()); + private Map<ClusterNode, IntArray> stableDataNodes( + boolean isReplicatedOnly, + AffinityTopologyVersion topVer, + final GridCacheContext<?, ?> cctx, + List<Integer> extraSpaces, + int[] parts) { + Map<ClusterNode, IntArray> map = stableDataNodesMap(topVer, cctx, parts); - if (F.isEmpty(nodes)) + Set<ClusterNode> nodes = map.keySet(); + + if (F.isEmpty(map)) throw new CacheException("Failed to find data nodes for cache: " + cctx.name()); if (!F.isEmpty(extraSpaces)) { @@ -406,7 +470,7 @@ public class GridReduceQueryExecutor { throw new CacheException("Queries running on replicated cache should not contain JOINs " + "with partitioned tables [rCache=" + cctx.name() + ", pCache=" + extraSpace + "]"); - Collection<ClusterNode> extraNodes = extraCctx.affinity().assignment(topVer).primaryPartitionNodes(); + Set<ClusterNode> extraNodes = stableDataNodesMap(topVer, extraCctx, parts).keySet(); if (F.isEmpty(extraNodes)) throw new CacheException("Failed to find data nodes for cache: " + extraSpace); @@ -414,7 +478,7 @@ public class GridReduceQueryExecutor { if (isReplicatedOnly && extraCctx.isReplicated()) { nodes.retainAll(extraNodes); - if (nodes.isEmpty()) { + if (map.isEmpty()) { if (isPreloadingActive(cctx, extraSpaces)) return null; // Retry. else @@ -431,7 +495,7 @@ public class GridReduceQueryExecutor { ", cache2=" + extraSpace + "]"); } else if (!isReplicatedOnly && !extraCctx.isReplicated()) { - if (extraNodes.size() != nodes.size() || !nodes.containsAll(extraNodes)) + if (!extraNodes.equals(nodes)) if (isPreloadingActive(cctx, extraSpaces)) return null; // Retry. 
else @@ -443,7 +507,7 @@ public class GridReduceQueryExecutor { } } - return nodes; + return map; } /** @@ -454,6 +518,7 @@ public class GridReduceQueryExecutor { * @param timeoutMillis Timeout in milliseconds. * @param cancel Query cancel. * @param params Query parameters. + * @param parts Partitions. * @return Rows iterator. */ public Iterator<List<?>> query( @@ -463,13 +528,17 @@ public class GridReduceQueryExecutor { boolean enforceJoinOrder, int timeoutMillis, GridQueryCancel cancel, - Object[] params + Object[] params, + final int[] parts ) { if (F.isEmpty(params)) params = EMPTY_PARAMS; final boolean isReplicatedOnly = qry.isReplicatedOnly(); + // Fail if all caches are replicated and explicit partitions are set. + + for (int attempt = 0;; attempt++) { if (attempt != 0) { try { @@ -494,11 +563,30 @@ public class GridReduceQueryExecutor { List<Integer> extraSpaces = qry.extraCaches(); - Collection<ClusterNode> nodes; + Collection<ClusterNode> nodes = null; // Explicit partition mapping for unstable topology. Map<ClusterNode, IntArray> partsMap = null; + // Explicit partitions mapping for query. + Map<ClusterNode, IntArray> qryMap = null; + + // Partitions are not supported for queries over all replicated caches. + if (cctx.isReplicated() && parts != null) { + boolean failIfReplicatedOnly = true; + + for (Integer cacheId : extraSpaces) { + if (!cacheContext(cacheId).isReplicated()) { + failIfReplicatedOnly = false; + + break; + } + } + + if (failIfReplicatedOnly) + throw new CacheException("Partitions are not supported for replicated caches"); + } + if (qry.isLocal()) nodes = singletonList(ctx.discovery().localNode()); else { @@ -508,11 +596,18 @@ public class GridReduceQueryExecutor { else { partsMap = partitionedUnstableDataNodes(cctx, extraSpaces); - nodes = partsMap == null ? null : partsMap.keySet(); + if (partsMap != null) { + qryMap = narrowForQuery(partsMap, parts); + + nodes = qryMap == null ? 
null : qryMap.keySet(); + } } + } else { + qryMap = stableDataNodes(isReplicatedOnly, topVer, cctx, extraSpaces, parts); + + if (qryMap != null) + nodes = qryMap.keySet(); } - else - nodes = stableDataNodes(isReplicatedOnly, topVer, cctx, extraSpaces); if (nodes == null) continue; // Retry. @@ -633,19 +728,18 @@ public class GridReduceQueryExecutor { if (send(nodes, new GridH2QueryRequest() - .requestId(qryReqId) - .topologyVersion(topVer) - .pageSize(r.pageSize) - .caches(qry.caches()) - .tables(distributedJoins ? qry.tables() : null) - .partitions(convert(partsMap)) - .queries(mapQrys) - .parameters(params) - .flags(flags) - .timeout(timeoutMillis), - null, - false)) { - + .requestId(qryReqId) + .topologyVersion(topVer) + .pageSize(r.pageSize) + .caches(qry.caches()) + .tables(distributedJoins ? qry.tables() : null) + .partitions(convert(partsMap)) + .queries(mapQrys) + .parameters(params) + .flags(flags) + .timeout(timeoutMillis), + parts == null ? null : new ExplicitPartitionsSpecializer(qryMap), + false)) { awaitAllReplies(r, nodes, cancel); Object state = r.state.get(); @@ -1034,7 +1128,13 @@ public class GridReduceQueryExecutor { List<ClusterNode> owners = cctx.topology().owners(p); if (F.isEmpty(owners)) { - if (!F.isEmpty(dataNodes(cctx.name(), NONE))) + // Handle special case: no mapping is configured for a partition. + if (F.isEmpty(cctx.affinity().assignment(NONE).get(p))) { + partLocs[p] = UNMAPPED_PARTS; // Mark unmapped partition. + + continue; + } + else if (!F.isEmpty(dataNodes(cctx.name(), NONE))) return null; // Retry. throw new CacheException("Failed to find data nodes [cache=" + cctx.name() + ", part=" + p + "]"); @@ -1059,6 +1159,9 @@ public class GridReduceQueryExecutor { for (int p = 0, parts = extraCctx.affinity().partitions(); p < parts; p++) { List<ClusterNode> owners = extraCctx.topology().owners(p); + if (partLocs[p] == UNMAPPED_PARTS) + continue; // Skip unmapped partitions. 
+ if (F.isEmpty(owners)) { if (!F.isEmpty(dataNodes(extraCctx.name(), NONE))) return null; // Retry. @@ -1090,6 +1193,9 @@ public class GridReduceQueryExecutor { return null; // Retry. for (Set<ClusterNode> partLoc : partLocs) { + if (partLoc == UNMAPPED_PARTS) + continue; // Skip unmapped partition. + partLoc.retainAll(dataNodes); if (partLoc.isEmpty()) @@ -1105,6 +1211,10 @@ public class GridReduceQueryExecutor { for (int p = 0; p < partLocs.length; p++) { Set<ClusterNode> pl = partLocs[p]; + // Skip unmapped partitions. + if (pl == UNMAPPED_PARTS) + continue; + assert !F.isEmpty(pl) : pl; ClusterNode n = pl.size() == 1 ? F.first(pl) : F.rand(pl); @@ -1429,4 +1539,52 @@ public class GridReduceQueryExecutor { state(e, null); } } -} + + /** */ + private Map<ClusterNode, IntArray> narrowForQuery(Map<ClusterNode, IntArray> partsMap, int[] parts) { + if (parts == null) + return partsMap; + + Map<ClusterNode, IntArray> cp = U.newHashMap(partsMap.size()); + + for (Map.Entry<ClusterNode, IntArray> entry : partsMap.entrySet()) { + IntArray filtered = new IntArray(parts.length); + + IntArray orig = entry.getValue(); + + for (int i = 0; i < orig.size(); i++) { + int p = orig.get(i); + + if (Arrays.binarySearch(parts, p) >= 0) + filtered.add(p); + } + + if (filtered.size() > 0) + cp.put(entry.getKey(), filtered); + } + + return cp.isEmpty() ? null : cp; + } + + /** */ + private static class ExplicitPartitionsSpecializer implements IgniteBiClosure<ClusterNode, Message, Message> { + /** Partitions map. */ + private final Map<ClusterNode, IntArray> partsMap; + + /** + * @param partsMap Partitions map. 
+ */ + public ExplicitPartitionsSpecializer(Map<ClusterNode, IntArray> partsMap) { + this.partsMap = partsMap; + } + + /** {@inheritDoc} */ + @Override public Message apply(ClusterNode node, Message msg) { + GridH2QueryRequest rq = new GridH2QueryRequest((GridH2QueryRequest)msg); + + rq.queryPartitions(toArray(partsMap.get(node))); + + return rq; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java index 9e7dcbf..6741d89 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.query.h2.twostep.msg; +import java.io.Externalizable; import java.nio.ByteBuffer; import java.util.Collection; import java.util.List; @@ -92,6 +93,10 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { @GridDirectMap(keyType = UUID.class, valueType = int[].class) private Map<UUID, int[]> parts; + /** Query partitions. */ + @GridToStringInclude + private int[] qryParts; + /** */ private int pageSize; @@ -120,6 +125,32 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { private byte[] paramsBytes; /** + * Required by {@link Externalizable} + */ + public GridH2QueryRequest() { + // No-op. + } + + /** + * Copy constructor. + * @param req Request to copy.
+ */ + public GridH2QueryRequest(GridH2QueryRequest req) { + this.reqId = req.reqId; + this.caches = req.caches; + this.topVer = req.topVer; + this.parts = req.parts; + this.qryParts = req.qryParts; + this.pageSize = req.pageSize; + this.qrys = req.qrys; + this.flags = req.flags; + this.tbls = req.tbls; + this.timeout = req.timeout; + this.params = req.params; + this.paramsBytes = req.paramsBytes; + } + + /** * @return Parameters. */ public Object[] parameters() { @@ -225,6 +256,23 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { } /** + * @return Query partitions. + */ + public int[] queryPartitions() { + return qryParts; + } + + /** + * @param qryParts Query partitions. + * @return {@code this}. + */ + public GridH2QueryRequest queryPartitions(int[] qryParts) { + this.qryParts = qryParts; + + return this; + } + + /** * @param pageSize Page size. * @return {@code this}. */ @@ -403,6 +451,12 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { writer.incrementState(); + + case 10: + if (!writer.writeIntArray("qryParts", qryParts)) + return false; + + writer.incrementState(); } return true; @@ -496,6 +550,14 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { reader.incrementState(); + + case 10: + qryParts = reader.readIntArray("qryParts"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); } return reader.afterMessageRead(GridH2QueryRequest.class); @@ -508,7 +570,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 10; + return 11; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryAbstractSelfTest.java new file mode 100644 index 0000000..708fb1d --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryAbstractSelfTest.java @@ -0,0 +1,655 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.UUID; +import javax.cache.Cache; +import javax.cache.CacheException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteException; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.AffinityFunction; +import org.apache.ignite.cache.affinity.AffinityFunctionContext; +import org.apache.ignite.cache.affinity.AffinityKeyMapped; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.MemoryConfiguration; +import org.apache.ignite.configuration.MemoryPolicyConfiguration; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.util.GridRandom; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.util.AttributeNodeFilter; +import org.jsr166.ThreadLocalRandom8; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import 
static org.apache.ignite.cache.CacheRebalanceMode.SYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * Abstract test for queries over explicit partitions. + */ +public abstract class IgniteCacheDistributedPartitionQueryAbstractSelfTest extends GridCommonAbstractTest { + /** Join query for test. */ + private static final String JOIN_QRY = "select cl._KEY, de.depositId, de.regionId from " + + "\"cl\".Client cl, \"de\".Deposit de, \"re\".Region re where cl.clientId=de.clientId and de.regionId=re._KEY"; + + /** Region node attribute name. */ + private static final String REGION_ATTR_NAME = "reg"; + + /** Grids count. */ + protected static final int GRIDS_COUNT = 10; + + /** IP finder. */ + private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Partitions per region distribution. */ + protected static final int[] PARTS_PER_REGION = new int[] {10, 20, 30, 40, 24}; + + /** Unmapped region id. */ + protected static final int UNMAPPED_REGION = PARTS_PER_REGION.length; + + /** Clients per partition. */ + protected static final int CLIENTS_PER_PARTITION = 1; + + /** Total clients. */ + private static final int TOTAL_CLIENTS; + + /** Affinity function to use on partitioned caches. */ + private static final AffinityFunction AFFINITY = new RegionAwareAffinityFunction(); + + /** Partitions count. */ + private static final int PARTS_COUNT; + + /** Regions to partitions mapping. */ + protected static final NavigableMap<Integer, List<Integer>> REGION_TO_PART_MAP = new TreeMap<>(); + + /** Query threads count. */ + protected static final int QUERY_THREADS_CNT = 4; + + /** Restarting threads count. */ + protected static final int RESTART_THREADS_CNT = 2; + + /** Node stop time. 
*/ + protected static final int NODE_RESTART_TIME = 1_000; + + static { + int total = 0, parts = 0, p = 0, regionId = 1; + + for (int regCnt : PARTS_PER_REGION) { + total += regCnt * CLIENTS_PER_PARTITION; + + parts += regCnt; + + REGION_TO_PART_MAP.put(regionId++, Arrays.asList(p, regCnt)); + + p += regCnt; + } + + /** Last region was left empty intentionally, see {@link #UNMAPPED_REGION} */ + TOTAL_CLIENTS = total - PARTS_PER_REGION[PARTS_PER_REGION.length - 1] * CLIENTS_PER_PARTITION; + + PARTS_COUNT = parts; + } + + /** Deposits per client. */ + public static final int DEPOSITS_PER_CLIENT = 10; + + /** Rnd. */ + protected GridRandom rnd = new GridRandom(); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + MemoryConfiguration memCfg = new MemoryConfiguration(); + memCfg.setDefaultMemoryPolicyName("default"); + memCfg.setMemoryPolicies(new MemoryPolicyConfiguration().setName("default").setSize(20 * 1024 * 1024)); + + cfg.setMemoryConfiguration(memCfg); + + TcpDiscoverySpi spi = (TcpDiscoverySpi)cfg.getDiscoverySpi(); + spi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(spi); + + /** Clients cache */ + CacheConfiguration<ClientKey, Client> clientCfg = new CacheConfiguration<>(); + clientCfg.setName("cl"); + clientCfg.setWriteSynchronizationMode(FULL_SYNC); + clientCfg.setAtomicityMode(TRANSACTIONAL); + clientCfg.setRebalanceMode(SYNC); + clientCfg.setBackups(2); + clientCfg.setAffinity(AFFINITY); + clientCfg.setIndexedTypes(ClientKey.class, Client.class); + + /** Deposits cache */ + CacheConfiguration<DepositKey, Deposit> depoCfg = new CacheConfiguration<>(); + depoCfg.setName("de"); + depoCfg.setWriteSynchronizationMode(FULL_SYNC); + depoCfg.setAtomicityMode(TRANSACTIONAL); + depoCfg.setRebalanceMode(SYNC); + depoCfg.setBackups(2); + depoCfg.setAffinity(AFFINITY); + depoCfg.setIndexedTypes(DepositKey.class, Deposit.class); + + 
/** Regions cache. Uses default affinity. */ + CacheConfiguration<Integer, Region> regionCfg = new CacheConfiguration<>(); + regionCfg.setName("re"); + regionCfg.setWriteSynchronizationMode(FULL_SYNC); + regionCfg.setAtomicityMode(TRANSACTIONAL); + regionCfg.setRebalanceMode(SYNC); + regionCfg.setCacheMode(CacheMode.REPLICATED); + regionCfg.setIndexedTypes(Integer.class, Region.class); + + cfg.setCacheConfiguration(clientCfg, depoCfg, regionCfg); + + if ("client".equals(gridName)) + cfg.setClientMode(true); + else { + Integer reg = regionForGrid(gridName); + + cfg.setUserAttributes(F.asMap(REGION_ATTR_NAME, reg)); + + log().info("Assigned region " + reg + " to grid " + gridName); + } + + return cfg; + } + + /** */ + private static final class RegionAwareAffinityFunction implements AffinityFunction { + /** {@inheritDoc} */ + @Override public void reset() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public int partitions() { + return PARTS_COUNT; + } + + /** {@inheritDoc} */ + @Override public int partition(Object key) { + Integer regionId; + + if (key instanceof RegionKey) + regionId = ((RegionKey)key).regionId; + else if (key instanceof BinaryObject) { + BinaryObject bo = (BinaryObject)key; + + regionId = bo.field("regionId"); + } + else + throw new IgniteException("Unsupported key for region aware affinity"); + + List<Integer> range = REGION_TO_PART_MAP.get(regionId); + + Integer cnt = range.get(1); + + return U.safeAbs(key.hashCode() % cnt) + range.get(0); // Assign partition in region's range. + } + + /** {@inheritDoc} */ + @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) { + List<ClusterNode> nodes = affCtx.currentTopologySnapshot(); + + List<List<ClusterNode>> assignment = new ArrayList<>(PARTS_COUNT); + + for (int p = 0; p < PARTS_COUNT; p++) { + // Get region for partition. + int regionId = regionForPart(p); + + // Filter all nodes for region. 
+ AttributeNodeFilter f = new AttributeNodeFilter(REGION_ATTR_NAME, regionId); + + List<ClusterNode> regionNodes = new ArrayList<>(); + + for (ClusterNode node : nodes) + if (f.apply(node)) + regionNodes.add(node); + + final int cp = p; + + Collections.sort(regionNodes, new Comparator<ClusterNode>() { + @Override public int compare(ClusterNode o1, ClusterNode o2) { + return Long.compare(hash(cp, o1), hash(cp, o2)); + } + }); + + assignment.add(regionNodes); + } + + return assignment; + } + + /** {@inheritDoc} */ + @Override public void removeNode(UUID nodeId) { + // No-op. + } + + /** + * @param part Partition. + */ + protected int regionForPart(int part) { + for (Map.Entry<Integer, List<Integer>> entry : REGION_TO_PART_MAP.entrySet()) { + List<Integer> range = entry.getValue(); + + if (range.get(0) <= part && part < range.get(0) + range.get(1)) + return entry.getKey(); + } + + throw new IgniteException("Failed to find zone for partition"); + } + + /** + * @param part Partition. + * @param obj Object. + */ + private long hash(int part, Object obj) { + long x = ((long)part << 32) | obj.hashCode(); + x ^= x >>> 12; + x ^= x << 25; + x ^= x >>> 27; + return x * 2685821657736338717L; + } + } + + /** + * Assigns a region to grid part. + * + * @param gridName Grid name. + */ + protected Integer regionForGrid(String gridName) { + char c = gridName.charAt(gridName.length() - 1); + switch (c) { + case '0': + return 1; + case '1': + case '2': + return 2; + case '3': + case '4': + case '5': + return 3; + default: + return 4; + } + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + int sum1 = 0; + for (List<Integer> range : REGION_TO_PART_MAP.values()) + sum1 += range.get(1); + + assertEquals("Illegal partition per region distribution", PARTS_COUNT, sum1); + + startGridsMultiThreaded(GRIDS_COUNT); + + startGrid("client"); + + // Fill caches. 
+ int clientId = 1; + int depositId = 1; + int regionId = 1; + int p = 1; // Percents counter. Log message will be printed 10 times. + + try (IgniteDataStreamer<ClientKey, Client> clStr = grid(0).dataStreamer("cl"); + IgniteDataStreamer<DepositKey, Deposit> depStr = grid(0).dataStreamer("de")) { + for (int cnt : PARTS_PER_REGION) { + // Last region was left empty intentionally. + if (regionId < PARTS_PER_REGION.length) { + for (int i = 0; i < cnt * CLIENTS_PER_PARTITION; i++) { + ClientKey ck = new ClientKey(clientId, regionId); + + Client cl = new Client(); + cl.firstName = "First_Name_" + clientId; + cl.lastName = "Last_Name_" + clientId; + cl.passport = clientId * 1_000; + + clStr.addData(ck, cl); + + for (int j = 0; j < DEPOSITS_PER_CLIENT; j++) { + DepositKey dk = new DepositKey(depositId++, new ClientKey(clientId, regionId)); + + Deposit depo = new Deposit(); + depo.amount = ThreadLocalRandom8.current().nextLong(1_000_001); + depStr.addData(dk, depo); + } + + if (clientId / (float)TOTAL_CLIENTS >= p / 10f) { + log().info("Loaded " + clientId + " of " + TOTAL_CLIENTS); + + p++; + } + + clientId++; + } + } + + Region region = new Region(); + region.name = "Region_" + regionId; + region.code = regionId * 10; + + grid(0).cache("re").put(regionId, region); + + regionId++; + } + } + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** + * @param orig Originator. + */ + protected void doTestRegionQuery(Ignite orig) { + IgniteCache<ClientKey, Client> cl = orig.cache("cl"); + + for (int regionId = 1; regionId <= PARTS_PER_REGION.length; regionId++) { + SqlQuery<ClientKey, Client> qry1 = new SqlQuery<>(Client.class, "regionId=?"); + qry1.setArgs(regionId); + + List<Cache.Entry<ClientKey, Client>> clients1 = cl.query(qry1).getAll(); + + int expRegionCnt = regionId == 5 ? 
0 : PARTS_PER_REGION[regionId - 1] * CLIENTS_PER_PARTITION; + + assertEquals("Region " + regionId + " count", expRegionCnt, clients1.size()); + + validateClients(regionId, clients1); + + // Repeat the same query with partition set condition. + List<Integer> range = REGION_TO_PART_MAP.get(regionId); + + SqlQuery<ClientKey, Client> qry2 = new SqlQuery<>(Client.class, "1=1"); + qry2.setPartitions(createRange(range.get(0), range.get(1))); + + try { + List<Cache.Entry<ClientKey, Client>> clients2 = cl.query(qry2).getAll(); + + assertEquals("Region " + regionId + " count with partition set", expRegionCnt, clients2.size()); + + // Query must produce only results from single region. + validateClients(regionId, clients2); + + if (regionId == UNMAPPED_REGION) + fail(); + } catch (CacheException ignored) { + if (regionId != UNMAPPED_REGION) + fail(); + } + } + } + + /** */ + protected int[] createRange(int start, int cnt) { + int[] vals = new int[cnt]; + + for (int i = 0; i < cnt; i++) + vals[i] = start + i; + + return vals; + } + + /** + * @param orig Originator. + */ + protected void doTestPartitionsQuery(Ignite orig) { + IgniteCache<ClientKey, Client> cl = orig.cache("cl"); + + for (int regionId = 1; regionId <= PARTS_PER_REGION.length; regionId++) { + log().info("Running test queries for region " + regionId); + + List<Integer> range = REGION_TO_PART_MAP.get(regionId); + + int[] parts = createRange(range.get(0), range.get(1)); + + int off = rnd.nextInt(parts.length); + + int p1 = parts[off], p2 = parts[(off + (1 + rnd.nextInt(parts.length-1))) % parts.length]; + + log().info("Parts: " + p1 + " " + p2); + + SqlQuery<ClientKey, Client> qry = new SqlQuery<>(Client.class, "1=1"); + + qry.setPartitions(p1, p2); + + try { + List<Cache.Entry<ClientKey, Client>> clients = cl.query(qry).getAll(); + + // Query must produce only results from two partitions. 
+ for (Cache.Entry<ClientKey, Client> client : clients) { + int p = orig.affinity("cl").partition(client.getKey()); + + assertTrue("Incorrect partition for key", p == p1 || p == p2); + } + + if (regionId == UNMAPPED_REGION) + fail(); + } catch (CacheException ignored) { + if (regionId != UNMAPPED_REGION) + fail(); + } + } + } + + /** + * @param orig Query originator. + * @param regionIds Region ids. + */ + protected void doTestJoinQuery(Ignite orig, int... regionIds) { + IgniteCache<ClientKey, Client> cl = orig.cache("cl"); + + if (regionIds == null) { + regionIds = new int[PARTS_PER_REGION.length]; + + for (int i = 0; i < regionIds.length; i++) + regionIds[i] = i + 1; + } + + for (int regionId : regionIds) { + List<Integer> range = REGION_TO_PART_MAP.get(regionId); + + SqlFieldsQuery qry = new SqlFieldsQuery(JOIN_QRY); + + int[] pSet = createRange(range.get(0), 1 + rnd.nextInt(range.get(1) - 1)); + + qry.setPartitions(pSet); + + try { + List<List<?>> rows = cl.query(qry).getAll(); + + for (List<?> row : rows) { + ClientKey key = (ClientKey)row.get(0); + + int p = orig.affinity("cl").partition(key); + + assertTrue(Arrays.binarySearch(pSet, p) >= 0); + } + + // Query must produce only results from single region. + for (List<?> row : rows) + assertEquals("Region id", regionId, ((Integer)row.get(2)).intValue()); + + if (regionId == UNMAPPED_REGION) + fail(); + } + catch (CacheException ignored) { + if (X.hasCause(ignored, InterruptedException.class, IgniteInterruptedCheckedException.class)) + return; // Allow interruptions. + + if (regionId != UNMAPPED_REGION) + fail(); + } + } + } + + /** + * @param regionId Region id. + * @param clients Clients. 
+ */ + protected void validateClients(int regionId, List<Cache.Entry<ClientKey, Client>> clients) { + for (Cache.Entry<ClientKey, Client> entry : clients) { + List<Integer> range = REGION_TO_PART_MAP.get(regionId); + + int start = range.get(0) * CLIENTS_PER_PARTITION; + int end = start + range.get(1) * CLIENTS_PER_PARTITION; + + int clientId = entry.getKey().clientId; + + assertTrue("Client id in range", start < clientId && start <= end); + } + } + + /** */ + protected static class ClientKey extends RegionKey { + /** Client id. */ + @QuerySqlField(index = true) + protected int clientId; + + /** + * @param clientId Client id. + * @param regionId Region id. + */ + public ClientKey(int clientId, int regionId) { + this.clientId = clientId; + this.regionId = regionId; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + ClientKey clientKey = (ClientKey)o; + + return clientId == clientKey.clientId; + + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return clientId; + } + } + + /** */ + protected static class DepositKey extends RegionKey { + @QuerySqlField(index = true) + protected int depositId; + + @QuerySqlField(index = true) + protected int clientId; + + /** Client key. */ + @AffinityKeyMapped + protected ClientKey clientKey; + + /** + * @param depositId Deposit id. + * @param clientKey Client key. 
+ */ + public DepositKey(int depositId, ClientKey clientKey) { + this.depositId = depositId; + this.clientId = clientKey.clientId; + this.regionId = clientKey.regionId; + this.clientKey = clientKey; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + DepositKey that = (DepositKey)o; + + return depositId == that.depositId; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return depositId; + } + } + + /** */ + protected static class RegionKey implements Serializable { + /** Region id. */ + @QuerySqlField(index = true) + protected int regionId; + } + + /** */ + protected static class Client { + @QuerySqlField + protected String firstName; + + @QuerySqlField + protected String lastName; + + @QuerySqlField(index = true) + protected int passport; + } + + /** */ + protected static class Deposit { + @QuerySqlField + protected long amount; + } + + /** */ + protected static class Region { + @QuerySqlField + protected String name; + + @QuerySqlField + protected int code; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryConfigurationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryConfigurationSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryConfigurationSelfTest.java new file mode 100644 index 0000000..0253fe8 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryConfigurationSelfTest.java @@ -0,0 +1,92 @@ 
+/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import java.util.Arrays; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Tests cache query configuration. + */ +public class IgniteCacheDistributedPartitionQueryConfigurationSelfTest extends GridCommonAbstractTest { + /** Tests partition validation. */ + public void testPartitions() { + final SqlFieldsQuery qry = new SqlFieldsQuery("select 1"); + + // Empty set is not allowed. + failIfNotThrown(new Runnable() { + @Override public void run() { + qry.setPartitions(); + } + }); + + // Duplicates are not allowed. + failIfNotThrown(new Runnable() { + @Override public void run() { + qry.setPartitions(0, 1, 2, 1); + } + }); + + // Values out of range are not allowed. + failIfNotThrown(new Runnable() { + @Override public void run() { + qry.setPartitions(-1, 0, 1); + } + }); + + // Duplicates with unordered input are not allowed. + failIfNotThrown(new Runnable() { + @Override public void run() { + qry.setPartitions(3, 2, 2); + } + }); + + // Values out of range are not allowed. 
+ failIfNotThrown(new Runnable() { + @Override public void run() { + qry.setPartitions(-1, 0, 1); + } + }); + + // Expecting ordered set. + int[] tmp = new int[] {6, 2 ,3}; + qry.setPartitions(tmp); + + assertTrue(Arrays.equals(new int[]{2, 3, 6}, tmp)); + + // If already ordered expecting same instance. + qry.setPartitions((tmp = new int[] {0, 1, 2})); + + assertTrue(tmp == qry.getPartitions()); + } + + /** + * @param r Runnable. + */ + private void failIfNotThrown(Runnable r) { + try { + r.run(); + + fail(); + } + catch (Exception ignored) { + // No-op. + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest.java new file mode 100644 index 0000000..68f9842 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerArray; + +import org.apache.ignite.Ignite; +import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; + +/** + * Tests distributed queries over set of partitions on unstable topology. + */ +public class IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest + extends IgniteCacheDistributedPartitionQueryAbstractSelfTest { + /** + * Tests join query within region on unstable topology. 
+ */ + public void testJoinQueryUnstableTopology() throws Exception { + final AtomicBoolean stop = new AtomicBoolean(); + + final AtomicIntegerArray states = new AtomicIntegerArray(GRIDS_COUNT); + + final Ignite client = grid("client"); + + final AtomicInteger cnt = new AtomicInteger(); + + IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() { + @Override + public void run() { + while (!stop.get()) { + doTestJoinQuery(client, rnd.nextInt(PARTS_PER_REGION.length) + 1); + + int cur = cnt.incrementAndGet(); + + if (cur % 100 == 0) + log().info("Queries count: " + cur); + } + } + }, QUERY_THREADS_CNT); + + final AtomicIntegerArray restartStats = new AtomicIntegerArray(GRIDS_COUNT); + + IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable<Void>() { + @Override + public Void call() throws Exception { + while (!stop.get()) { + int grid = rnd.nextInt(GRIDS_COUNT); + + String name = getTestIgniteInstanceName(grid); + + Integer regionId = regionForGrid(name); + + // Restart nodes only from region with enough number of nodes. 
+ if (regionId != 3 && regionId != 4) + continue; + + if (states.compareAndSet(grid, 0, 1)) { + restartStats.incrementAndGet(grid); + + try { + stopGrid(grid); + + Thread.sleep(rnd.nextInt(NODE_RESTART_TIME)); + + startGrid(grid); + + Thread.sleep(rnd.nextInt(NODE_RESTART_TIME)); + } finally { + states.set(grid, 0); + } + } + } + + return null; + } + }, RESTART_THREADS_CNT); + + try { + fut2.get(60, TimeUnit.SECONDS); + } catch (IgniteFutureTimeoutCheckedException ignored) { + stop.set(true); + } + + try { + fut.get(); + } finally { + log().info("Queries count: " + cnt.get()); + + for (int i = 0; i < GRIDS_COUNT; i++) + log().info("Grid [name = " + getTestIgniteInstanceName(i) + ", idx=" + i + " ] restarts count: " + + restartStats.get(i)); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQuerySelfTest.java new file mode 100644 index 0000000..00c3848 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQuerySelfTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import java.util.Arrays; +import java.util.List; +import javax.cache.Cache; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.SqlQuery; + +/** + * Tests distributed queries over set of partitions on stable topology. + */ +public class IgniteCacheDistributedPartitionQuerySelfTest extends IgniteCacheDistributedPartitionQueryAbstractSelfTest { + /** Tests query within region. */ + public void testRegionQuery() { + doTestRegionQuery(grid(0)); + } + + /** Tests query within region (client). */ + public void testRegionQueryClient() throws Exception { + doTestRegionQuery(grid("client")); + } + + /** Tests query within partitions. */ + public void testPartitionsQuery() { + doTestPartitionsQuery(grid(0)); + } + + /** Tests query within partitions (client). */ + public void testPartitionsQueryClient() throws Exception { + doTestPartitionsQuery(grid("client")); + } + + /** Tests join query within region. */ + public void testJoinQuery() { + doTestJoinQuery(grid(0)); + } + + /** Tests join query within region (client). */ + public void testJoinQueryClient() throws Exception { + doTestJoinQuery(grid("client")); + } + + /** Tests local query over partitions. 
*/ + public void testLocalQuery() { + Affinity<Object> affinity = grid(0).affinity("cl"); + + int[] parts = affinity.primaryPartitions(grid(0).localNode()); + + Arrays.sort(parts); + + IgniteCache<ClientKey, Client> cl = grid(0).cache("cl"); + + SqlQuery<ClientKey, Client> qry1 = new SqlQuery<>(Client.class, "1=1"); + qry1.setLocal(true); + qry1.setPartitions(parts[0]); + + List<Cache.Entry<ClientKey, Client>> clients = cl.query(qry1).getAll(); + + for (Cache.Entry<ClientKey, Client> client : clients) + assertEquals("Incorrect partition", parts[0], affinity.partition(client.getKey())); + + SqlFieldsQuery qry2 = new SqlFieldsQuery("select cl._KEY, cl._VAL from \"cl\".Client cl"); + qry2.setLocal(true); + qry2.setPartitions(parts[0]); + + List<List<?>> rows = cl.query(qry2).getAll(); + + for (List<?> row : rows) + assertEquals("Incorrect partition", parts[0], affinity.partition(row.get(0))); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java index 6fc9c39..001f40b 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java @@ -33,6 +33,8 @@ import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.configuration.CacheConfiguration; import 
org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.MemoryConfiguration; +import org.apache.ignite.configuration.MemoryPolicyConfiguration; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; @@ -89,6 +91,12 @@ public class IgniteCacheQueryNodeRestartSelfTest2 extends GridCommonAbstractTest @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration c = super.getConfiguration(igniteInstanceName); + MemoryConfiguration memCfg = new MemoryConfiguration(); + memCfg.setDefaultMemoryPolicyName("default"); + memCfg.setMemoryPolicies(new MemoryPolicyConfiguration().setName("default").setSize(50 * 1024 * 1024)); + + c.setMemoryConfiguration(memCfg); + TcpDiscoverySpi disco = new TcpDiscoverySpi(); disco.setIpFinder(ipFinder); http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index 862d1a2..032e544 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -68,6 +68,8 @@ import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheA import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheAtomicNearEnabledFieldsQuerySelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheAtomicNearEnabledQuerySelfTest; import 
org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheAtomicQuerySelfTest; +import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheDistributedPartitionQuerySelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheDistributedQueryCancelSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedFieldsQueryP2PEnabledSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedFieldsQuerySelfTest; @@ -75,6 +77,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheP import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedQuerySelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedSnapshotEnabledQuerySelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryAbstractDistributedJoinSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheDistributedPartitionQueryConfigurationSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNoRebalanceSelfTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQueryP2PEnabledSelfTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQueryROSelfTest; @@ -277,6 +280,9 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(IgniteQueryDedicatedPoolTest.class); suite.addTestSuite(IgniteSqlEntryCacheModeAgnosticTest.class); suite.addTestSuite(QueryEntityCaseMismatchTest.class); + suite.addTestSuite(IgniteCacheDistributedPartitionQuerySelfTest.class); + 
suite.addTestSuite(IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest.class); + suite.addTestSuite(IgniteCacheDistributedPartitionQueryConfigurationSelfTest.class); return suite; }
