This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 11fca4aaaa1 IGNITE-23593 Use CacheQuery instead of
GridCacheQueryAdapter (#11639)
11fca4aaaa1 is described below
commit 11fca4aaaa11aecf8827d9313f17cfd1db6b170d
Author: Nikolay <[email protected]>
AuthorDate: Sat Nov 2 19:25:26 2024 +0300
IGNITE-23593 Use CacheQuery instead of GridCacheQueryAdapter (#11639)
---
.../processors/cache/IgniteCacheProxyImpl.java | 3 +-
.../processors/cache/query/CacheQuery.java | 845 ++++++++++++++++++-
.../query/GridCacheDistributedQueryManager.java | 6 +-
.../cache/query/GridCacheQueryAdapter.java | 928 ---------------------
.../processors/cache/query/GridCacheQueryBean.java | 8 +-
.../processors/cache/query/GridCacheQueryInfo.java | 6 +-
.../cache/query/GridCacheQueryManager.java | 34 +-
.../cache/query/GridCacheQueryRequest.java | 4 +-
.../datastructures/GridCacheSetImpl.java | 5 +-
.../main/resources/META-INF/classnames.properties | 4 +-
10 files changed, 868 insertions(+), 975 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
index 2d2533a29a2..dba67c35100 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
@@ -81,7 +81,6 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.query.CacheQuery;
import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
-import org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
@@ -562,7 +561,7 @@ public class IgniteCacheProxyImpl<K, V> extends
AsyncSupportAdapter<IgniteCache<
final GridCloseableIterator iter =
ctx.kernalContext().query().executeQuery(GridCacheQueryType.INDEX,
cacheName, ctx, new
IgniteOutClosureX<GridCloseableIterator>() {
@Override public GridCloseableIterator applyx() throws
IgniteCheckedException {
- return
ctx.queries().indexQueryLocal((GridCacheQueryAdapter)qry);
+ return ctx.queries().indexQueryLocal(qry);
}
}, true);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
index c610c143915..2d9474993d0 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
@@ -17,17 +17,63 @@
package org.apache.ignite.internal.processors.cache.query;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Set;
+import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.AffinityKey;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
import org.apache.ignite.cache.query.annotations.QueryTextField;
import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnreservedPartitionException;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
+import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteReducer;
+import org.apache.ignite.plugin.security.SecurityPermission;
import org.jetbrains.annotations.Nullable;
+import static
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.INDEX;
+import static
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SCAN;
+import static
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SET;
+import static
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SPI;
+import static
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
+
/**
* Main API for configuring and executing cache queries.
* <p>
@@ -185,7 +231,269 @@ import org.jetbrains.annotations.Nullable;
* .get();
* </pre>
*/
-public interface CacheQuery<T> {
+public class CacheQuery<T> {
+ /** */
+ private final GridCacheContext<?, ?> cctx;
+
+ /** */
+ private final GridCacheQueryType type;
+
+ /** */
+ private final IgniteLogger log;
+
+ /** Class name in case of binary query. */
+ private final String clsName;
+
+ /** */
+ @GridToStringInclude(sensitive = true)
+ private final String clause;
+
+ /** Description of IndexQuery. */
+ private final IndexQueryDesc idxQryDesc;
+
+ /** */
+ private final IgniteBiPredicate<Object, Object> filter;
+
+ /** Limits returned records quantity. */
+ private int limit;
+
+ /** Transformer. */
+ private IgniteClosure<?, ?> transform;
+
+ /** Partition. */
+ private Integer part;
+
+ /** */
+ private final boolean incMeta;
+
+ /** */
+ private volatile int pageSize = Query.DFLT_PAGE_SIZE;
+
+ /** */
+ private volatile long timeout;
+
+ /** */
+ private volatile boolean incBackups;
+
+ /** Local query. */
+ private boolean forceLocal;
+
+ /** */
+ private volatile boolean dedup;
+
+ /** */
+ private volatile ClusterGroup prj;
+
+ /** */
+ private boolean keepBinary;
+
+ /** */
+ private int taskHash;
+
+ /** */
+ private Boolean dataPageScanEnabled;
+
+ /**
+ * Cache query adapter for SCAN query.
+ *
+ * @param cctx Context.
+ * @param type Query type.
+ * @param filter Scan filter.
+ * @param part Partition.
+ * @param keepBinary Keep binary flag.
+ * @param forceLocal Flag to force local query.
+ * @param dataPageScanEnabled Flag to enable data page scan.
+ */
+ public CacheQuery(
+ GridCacheContext<?, ?> cctx,
+ GridCacheQueryType type,
+ @Nullable IgniteBiPredicate<Object, Object> filter,
+ @Nullable IgniteClosure<Map.Entry, Object> transform,
+ @Nullable Integer part,
+ boolean keepBinary,
+ boolean forceLocal,
+ Boolean dataPageScanEnabled
+ ) {
+ this(cctx, type, null, null, filter, part, false, keepBinary,
dataPageScanEnabled, null);
+
+ this.transform = transform;
+ this.forceLocal = forceLocal;
+ }
+
+ /**
+ * Cache query adapter for SET, SPI, TEXT queries.
+ *
+ * @param cctx Context.
+ * @param type Query type.
+ * @param clsName Class name.
+ * @param clause Clause.
+ * @param filter Scan filter.
+ * @param part Partition.
+ * @param incMeta Include metadata flag.
+ * @param keepBinary Keep binary flag.
+ * @param dataPageScanEnabled Flag to enable data page scan.
+ */
+ public CacheQuery(
+ GridCacheContext<?, ?> cctx,
+ GridCacheQueryType type,
+ @Nullable String clsName,
+ @Nullable String clause,
+ @Nullable IgniteBiPredicate<Object, Object> filter,
+ @Nullable Integer part,
+ boolean incMeta,
+ boolean keepBinary,
+ Boolean dataPageScanEnabled,
+ IndexQueryDesc idxQryDesc
+ ) {
+ assert cctx != null;
+ assert type != null;
+ assert part == null || part >= 0;
+
+ this.cctx = cctx;
+ this.type = type;
+ this.clsName = clsName;
+ this.clause = clause;
+ this.filter = filter;
+ this.part = part;
+ this.incMeta = incMeta;
+ this.keepBinary = keepBinary;
+ this.dataPageScanEnabled = dataPageScanEnabled;
+ this.idxQryDesc = idxQryDesc;
+
+ log = cctx.logger(getClass());
+ }
+
+ /**
+ * Cache query adapter for local query processing.
+ *
+ * @param cctx Context.
+ * @param type Query type.
+ * @param log Logger.
+ * @param pageSize Page size.
+ * @param timeout Timeout.
+ * @param incBackups Include backups flag.
+ * @param dedup Enable dedup flag.
+ * @param prj Grid projection.
+ * @param filter Key-value filter.
+ * @param part Partition.
+ * @param clsName Class name.
+ * @param clause Clause.
+ * @param limit Response limit. Set to 0 for no limits.
+ * @param incMeta Include metadata flag.
+ * @param keepBinary Keep binary flag.
+ * @param taskHash Task hash.
+ * @param dataPageScanEnabled Flag to enable data page scan.
+ */
+ public CacheQuery(
+ GridCacheContext<?, ?> cctx,
+ GridCacheQueryType type,
+ IgniteLogger log,
+ int pageSize,
+ long timeout,
+ boolean incBackups,
+ boolean dedup,
+ ClusterGroup prj,
+ IgniteBiPredicate<Object, Object> filter,
+ @Nullable Integer part,
+ @Nullable String clsName,
+ String clause,
+ IndexQueryDesc idxQryDesc,
+ int limit,
+ boolean incMeta,
+ boolean keepBinary,
+ int taskHash,
+ Boolean dataPageScanEnabled
+ ) {
+ this.cctx = cctx;
+ this.type = type;
+ this.log = log;
+ this.pageSize = pageSize;
+ this.timeout = timeout;
+ this.incBackups = incBackups;
+ this.dedup = dedup;
+ this.prj = prj;
+ this.filter = filter;
+ this.part = part;
+ this.clsName = clsName;
+ this.clause = clause;
+ this.idxQryDesc = idxQryDesc;
+ this.limit = limit;
+ this.incMeta = incMeta;
+ this.keepBinary = keepBinary;
+ this.taskHash = taskHash;
+ this.dataPageScanEnabled = dataPageScanEnabled;
+ }
+
+ /**
+ * Cache query adapter for INDEX query.
+ *
+ * @param cctx Context.
+ * @param type Query type.
+ * @param idxQryDesc Index query descriptor.
+ * @param part Partition number to iterate over.
+ * @param clsName Class name.
+ * @param filter Index query remote filter.
+ */
+ public CacheQuery(
+ GridCacheContext<?, ?> cctx,
+ GridCacheQueryType type,
+ IndexQueryDesc idxQryDesc,
+ @Nullable Integer part,
+ @Nullable String clsName,
+ @Nullable IgniteBiPredicate<Object, Object> filter
+ ) {
+ this(cctx, type, clsName, null, filter, part, false, false, null,
idxQryDesc);
+ }
+
+ /** @return Flag to enable data page scan. */
+ public Boolean isDataPageScanEnabled() {
+ return dataPageScanEnabled;
+ }
+
+ /** @return Type. */
+ public GridCacheQueryType type() {
+ return type;
+ }
+
+ /** @return Class name. */
+ @Nullable public String queryClassName() {
+ return clsName;
+ }
+
+ /** @return Clause. */
+ @Nullable public String clause() {
+ return clause;
+ }
+
+ /** @return Include metadata flag. */
+ public boolean includeMetadata() {
+ return incMeta;
+ }
+
+ /** @return {@code True} if binary should not be deserialized. */
+ public boolean keepBinary() {
+ return keepBinary;
+ }
+
+ /**
+ * Forces query to keep binary object representation even if query was
created on plain projection.
+ *
+ * @param keepBinary Keep binary flag.
+ */
+ public void keepBinary(boolean keepBinary) {
+ this.keepBinary = keepBinary;
+ }
+
+ /** @return {@code True} if the query is forced local. */
+ public boolean forceLocal() {
+ return forceLocal;
+ }
+
+ /** @return Task hash. */
+ public int taskHash() {
+ return taskHash;
+ }
+
/**
* Sets result page size. If not provided, {@link Query#DFLT_PAGE_SIZE}
will be used.
* Results are returned from queried nodes one page at a time.
@@ -193,7 +501,18 @@ public interface CacheQuery<T> {
* @param pageSize Page size.
* @return {@code this} query instance for chaining.
*/
- public CacheQuery<T> pageSize(int pageSize);
+ public CacheQuery<T> pageSize(int pageSize) {
+ A.ensure(pageSize > 0, "pageSize > 0");
+
+ this.pageSize = pageSize;
+
+ return this;
+ }
+
+ /** @return Page size. */
+ public int pageSize() {
+ return pageSize;
+ }
/**
* Sets query timeout. {@code 0} means there is no timeout (this
@@ -202,7 +521,18 @@ public interface CacheQuery<T> {
* @param timeout Query timeout.
* @return {@code this} query instance for chaining.
*/
- public CacheQuery<T> timeout(long timeout);
+ public CacheQuery<T> timeout(long timeout) {
+ A.ensure(timeout >= 0, "timeout >= 0");
+
+ this.timeout = timeout;
+
+ return this;
+ }
+
+ /** @return Response limit. Returns 0 for no limits. */
+ public int limit() {
+ return limit;
+ }
/**
* Sets limit of returned records. {@code 0} means there is no limit
@@ -210,7 +540,16 @@ public interface CacheQuery<T> {
* @param limit Records limit.
* @return {@code this} query instance for chaining.
*/
- public CacheQuery<T> limit(int limit);
+ public CacheQuery<T> limit(int limit) {
+ this.limit = limit;
+
+ return this;
+ }
+
+ /** @return Timeout. */
+ public long timeout() {
+ return timeout;
+ }
/**
* Sets whether or not to include backup entries into query result. This
flag
@@ -219,7 +558,16 @@ public interface CacheQuery<T> {
* @param incBackups Query {@code includeBackups} flag.
* @return {@code this} query instance for chaining.
*/
- public CacheQuery<T> includeBackups(boolean incBackups);
+ public CacheQuery<T> includeBackups(boolean incBackups) {
+ this.incBackups = incBackups;
+
+ return this;
+ }
+
+ /** @return Include backups. */
+ public boolean includeBackups() {
+ return incBackups;
+ }
/**
* Sets whether or not to deduplicate query result set. If this flag is
{@code true}
@@ -229,7 +577,16 @@ public interface CacheQuery<T> {
* @param dedup Query {@code enableDedup} flag.
* @return {@code this} query instance for chaining.
*/
- public CacheQuery<T> enableDedup(boolean dedup);
+ public CacheQuery<T> enableDedup(boolean dedup) {
+ this.dedup = dedup;
+
+ return this;
+ }
+
+ /** @return Enable dedup flag. */
+ public boolean enableDedup() {
+ return dedup;
+ }
/**
* Sets optional grid projection to execute this query on.
@@ -237,7 +594,43 @@ public interface CacheQuery<T> {
* @param prj Projection.
* @return {@code this} query instance for chaining.
*/
- public CacheQuery<T> projection(ClusterGroup prj);
+ public CacheQuery<T> projection(ClusterGroup prj) {
+ this.prj = prj;
+
+ return this;
+ }
+
+ /** @return Grid projection. */
+ public ClusterGroup projection() {
+ return prj;
+ }
+
+ /** @return Key-value filter. */
+ @Nullable public <K, V> IgniteBiPredicate<K, V> scanFilter() {
+ return (IgniteBiPredicate<K, V>)filter;
+ }
+
+ /** @return Transformer. */
+ @Nullable public <K, V> IgniteClosure<Map.Entry<K, V>, Object> transform()
{
+ return (IgniteClosure<Map.Entry<K, V>, Object>)transform;
+ }
+
+ /** @return Partition. */
+ @Nullable public Integer partition() {
+ return part;
+ }
+
+ /** @return Index query description. */
+ @Nullable public IndexQueryDesc idxQryDesc() {
+ return idxQryDesc;
+ }
+
+ /** @throws IgniteCheckedException If query is invalid. */
+ public void validate() throws IgniteCheckedException {
+ if ((type != SCAN && type != SET && type != SPI && type != INDEX)
+ && !QueryUtils.isEnabled(cctx.config()))
+ throw new IgniteCheckedException("Indexing is disabled for cache:
" + cctx.cache().name());
+ }
/**
* Executes the query and returns the query future. Caller may decide to
iterate
@@ -254,7 +647,9 @@ public interface CacheQuery<T> {
* @param args Optional arguments.
* @return Future for the query result.
*/
- public CacheQueryFuture<T> execute(@Nullable Object... args);
+ public CacheQueryFuture<T> execute(@Nullable Object... args) {
+ return execute0(null, args);
+ }
/**
* Executes the query the same way as {@link #execute(Object...)} method
but reduces result remotely.
@@ -263,10 +658,438 @@ public interface CacheQuery<T> {
* @param args Optional arguments.
* @return Future for the query result.
*/
- public <R> CacheQueryFuture<R> execute(IgniteReducer<T, R> rmtReducer,
@Nullable Object... args);
+ public <R> CacheQueryFuture<R> execute(IgniteReducer<T, R> rmtReducer,
@Nullable Object... args) {
+ return execute0(rmtReducer, args);
+ }
+
+ /**
+ * @param rmtReducer Optional reducer.
+ * @param args Arguments.
+ * @return Future.
+ */
+ @SuppressWarnings({"IfMayBeConditional"})
+ private <R> CacheQueryFuture<R> execute0(@Nullable IgniteReducer<T, R>
rmtReducer, @Nullable Object... args) {
+ assert type != SCAN : this;
+
+ Collection<ClusterNode> nodes;
+
+ try {
+ nodes = nodes();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), e);
+ }
+
+ cctx.checkSecurity(SecurityPermission.CACHE_READ);
+
+ if (nodes.isEmpty())
+ return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), new
ClusterGroupEmptyCheckedException());
+
+ if (log.isDebugEnabled())
+ log.debug("Executing query [query=" + this + ", nodes=" + nodes +
']');
+
+ if (cctx.deploymentEnabled()) {
+ try {
+ cctx.deploy().registerClasses(filter, rmtReducer);
+ cctx.deploy().registerClasses(args);
+ }
+ catch (IgniteCheckedException e) {
+ return new GridCacheQueryErrorFuture<>(cctx.kernalContext(),
e);
+ }
+ }
+
+ taskHash = cctx.kernalContext().job().currentTaskNameHash();
+
+ final GridCacheQueryBean bean = new GridCacheQueryBean(this,
(IgniteReducer<Object, Object>)rmtReducer,
+ null, args);
+
+ final GridCacheQueryManager qryMgr = cctx.queries();
+
+ boolean loc = nodes.size() == 1 &&
F.first(nodes).id().equals(cctx.localNodeId());
+
+ if (type == SQL_FIELDS || type == SPI)
+ return (CacheQueryFuture<R>)(loc ? qryMgr.queryFieldsLocal(bean) :
+ qryMgr.queryFieldsDistributed(bean, nodes));
+ else
+ return (CacheQueryFuture<R>)(loc ? qryMgr.queryLocal(bean) :
qryMgr.queryDistributed(bean, nodes));
+ }
+
+ /** @return Scan query iterator. */
+ public GridCloseableIterator executeScanQuery() throws
IgniteCheckedException {
+ assert type == SCAN : "Wrong processing of query: " + type;
+
+ GridDhtCacheAdapter<?, ?> cacheAdapter = cctx.isNear() ?
cctx.near().dht() : cctx.dht();
+
+ Set<Integer> lostParts = cacheAdapter.topology().lostPartitions();
+
+ if (!lostParts.isEmpty()) {
+ if (part == null || lostParts.contains(part)) {
+ throw new CacheException(new
CacheInvalidStateException("Failed to execute query because cache partition " +
+ "has been lostParts [cacheName=" + cctx.name() +
+ ", part=" + (part == null ? lostParts.iterator().next() :
part) + ']'));
+ }
+ }
+
+ // Affinity nodes snapshot.
+ Collection<ClusterNode> nodes = new ArrayList<>(nodes());
+
+ cctx.checkSecurity(SecurityPermission.CACHE_READ);
+
+ if (nodes.isEmpty()) {
+ if (part != null) {
+ if (forceLocal) {
+ throw new IgniteCheckedException("No queryable nodes for
partition " + part
+ + " [forced local query=" + this + "]");
+ }
+ }
+
+ return new GridEmptyCloseableIterator();
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Executing query [query=" + this + ", nodes=" + nodes +
']');
+
+ if (cctx.deploymentEnabled())
+ cctx.deploy().registerClasses(filter);
+
+ taskHash = cctx.kernalContext().job().currentTaskNameHash();
+
+ final GridCacheQueryManager qryMgr = cctx.queries();
+
+ boolean loc = nodes.size() == 1 &&
F.first(nodes).id().equals(cctx.localNodeId());
+
+ if (loc)
+ return qryMgr.scanQueryLocal(this, true);
+ else if (part != null)
+ return new ScanQueryFallbackClosableIterator(part, this, qryMgr,
cctx);
+ else
+ return qryMgr.scanQueryDistributed(this, nodes);
+ }
+
+ /** @return Nodes to execute on. */
+ private Collection<ClusterNode> nodes() throws IgniteCheckedException {
+ CacheMode cacheMode = cctx.config().getCacheMode();
+
+ Integer part = partition();
+
+ switch (cacheMode) {
+ case REPLICATED:
+ if (prj != null || part != null)
+ return nodes(cctx, prj, part);
+
+ GridDhtPartitionTopology top = cctx.topology();
+
+ if (cctx.affinityNode() &&
!top.localPartitionMap().hasMovingPartitions())
+ return Collections.singletonList(cctx.localNode());
+
+ top.readLock();
+
+ try {
+
+ Collection<ClusterNode> affNodes = nodes(cctx, null, null);
+
+ List<ClusterNode> nodes = new ArrayList<>(affNodes);
+
+ Collections.shuffle(nodes);
+
+ for (ClusterNode node : nodes) {
+ if (!top.partitions(node.id()).hasMovingPartitions())
+ return Collections.singletonList(node);
+ }
+
+ return affNodes;
+ }
+ finally {
+ top.readUnlock();
+ }
+
+ case PARTITIONED:
+ return nodes(cctx, prj, part);
+
+ default:
+ throw new IllegalStateException("Unknown cache distribution
mode: " + cacheMode);
+ }
+ }
/**
- * @return Scan query iterator.
+ * @param cctx Cache context.
+ * @param prj Projection (optional).
+ * @return Collection of data nodes in provided projection (if any).
+ * @throws IgniteCheckedException If partition number is invalid.
*/
- public GridCloseableIterator executeScanQuery() throws
IgniteCheckedException;
+ private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?>
cctx,
+ @Nullable final ClusterGroup prj, @Nullable final Integer part) throws
IgniteCheckedException {
+ assert cctx != null;
+
+ final AffinityTopologyVersion topVer =
cctx.affinity().affinityTopologyVersion();
+
+ Collection<ClusterNode> affNodes = CU.affinityNodes(cctx, topVer);
+
+ if (prj == null && part == null)
+ return affNodes;
+
+ if (part != null && part >= cctx.affinity().partitions())
+ throw new IgniteCheckedException("Invalid partition number: " +
part);
+
+ final Set<ClusterNode> owners =
+ part == null ? Collections.<ClusterNode>emptySet() : new
HashSet<>(cctx.topology().owners(part, topVer));
+
+ return F.view(affNodes, new P1<ClusterNode>() {
+ @Override public boolean apply(ClusterNode n) {
+ return cctx.discovery().cacheAffinityNode(n, cctx.name()) &&
+ (prj == null || prj.node(n.id()) != null) &&
+ (part == null || owners.contains(n));
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CacheQuery.class, this);
+ }
+
+ /** Wrapper for queries with fallback. */
+ private static class ScanQueryFallbackClosableIterator extends
GridCloseableIteratorAdapter {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Query future. */
+ private volatile T2<GridCloseableIterator<Object>,
GridCacheQueryFutureAdapter> tuple;
+
+ /** Backups. */
+ private volatile Queue<ClusterNode> nodes;
+
+ /** Topology version of the last detected {@link
GridDhtUnreservedPartitionException}. */
+ private volatile AffinityTopologyVersion unreservedTopVer;
+
+ /** Number of times to retry the query on the nodes failed with {@link
GridDhtUnreservedPartitionException}. */
+ private volatile int unreservedNodesRetryCnt = 5;
+
+ /** Query. */
+ private final CacheQuery qry;
+
+ /** Query manager. */
+ private final GridCacheQueryManager qryMgr;
+
+ /** Cache context. */
+ private final GridCacheContext cctx;
+
+ /** Partition. */
+ private final int part;
+
+ /** Flag indicating that a first item has been returned to a user. */
+ private boolean firstItemReturned;
+
+ /** */
+ private Object cur;
+
+ /**
+ * @param part Partition.
+ * @param qry Query.
+ * @param qryMgr Query manager.
+ * @param cctx Cache context.
+ */
+ private ScanQueryFallbackClosableIterator(int part, CacheQuery qry,
+ GridCacheQueryManager qryMgr, GridCacheContext cctx) {
+ this.qry = qry;
+ this.qryMgr = qryMgr;
+ this.cctx = cctx;
+ this.part = part;
+
+ nodes = fallbacks(cctx.shared().exchange().readyAffinityVersion());
+
+ if (F.isEmpty(nodes))
+ throw new ClusterTopologyException("Failed to execute the
query " +
+ "(all affinity nodes left the grid) [cache=" + cctx.name()
+
+ ", qry=" + qry +
+ ", curTopVer=" +
qryMgr.queryTopologyVersion().topologyVersion() + ']');
+
+ init();
+ }
+
+ /**
+ * @param topVer Topology version.
+ * @return Nodes for query execution.
+ */
+ private Queue<ClusterNode> fallbacks(AffinityTopologyVersion topVer) {
+ Deque<ClusterNode> fallbacks = new LinkedList<>();
+ Collection<ClusterNode> owners = new HashSet<>();
+
+ for (ClusterNode node : cctx.topology().owners(part, topVer)) {
+ if (node.isLocal())
+ fallbacks.addFirst(node);
+ else
+ fallbacks.add(node);
+
+ owners.add(node);
+ }
+
+ for (ClusterNode node : cctx.topology().moving(part)) {
+ if (!owners.contains(node))
+ fallbacks.add(node);
+ }
+
+ return fallbacks;
+ }
+
+ /** */
+ @SuppressWarnings("unchecked")
+ private void init() {
+ final ClusterNode node = nodes.poll();
+
+ if (node.isLocal()) {
+ try {
+ GridCloseableIterator it = qryMgr.scanQueryLocal(qry,
true);
+
+ tuple = new T2(it, null);
+ }
+ catch (IgniteClientDisconnectedCheckedException e) {
+ throw CU.convertToCacheException(e);
+ }
+ catch (IgniteCheckedException e) {
+ retryIfPossible(e);
+ }
+ }
+ else {
+ final GridCacheQueryBean bean = new GridCacheQueryBean(qry,
null, qry.transform, null);
+
+ GridCacheQueryFutureAdapter fut =
+ (GridCacheQueryFutureAdapter)qryMgr.queryDistributed(bean,
Collections.singleton(node));
+
+ tuple = new T2(null, fut);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Object onNext() throws IgniteCheckedException {
+ if (!onHasNext())
+ throw new NoSuchElementException();
+
+ assert cur != null;
+
+ Object e = cur;
+
+ cur = null;
+
+ return e;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean onHasNext() throws IgniteCheckedException {
+ while (true) {
+ if (cur != null)
+ return true;
+
+ T2<GridCloseableIterator<Object>, GridCacheQueryFutureAdapter>
t = tuple;
+
+ GridCloseableIterator<Object> iter = t.get1();
+
+ if (iter != null) {
+ boolean hasNext = iter.hasNext();
+
+ if (hasNext)
+ cur = iter.next();
+
+ return hasNext;
+ }
+ else {
+ GridCacheQueryFutureAdapter fut = t.get2();
+
+ assert fut != null;
+
+ if (firstItemReturned)
+ return (cur = convert(fut.next())) != null;
+
+ try {
+ fut.awaitFirstItemAvailable();
+
+ firstItemReturned = true;
+
+ return (cur = convert(fut.next())) != null;
+ }
+ catch (IgniteClientDisconnectedCheckedException e) {
+ throw CU.convertToCacheException(e);
+ }
+ catch (IgniteCheckedException e) {
+ retryIfPossible(e);
+ }
+ }
+ }
+ }
+
+ /**
+ * @param obj Entry to convert.
+ * @return Cache entry
+ */
+ private Object convert(Object obj) {
+ if (qry.transform() != null)
+ return obj;
+
+ Map.Entry e = (Map.Entry)obj;
+
+ return e == null ? null : new CacheQueryEntry(e.getKey(),
e.getValue());
+ }
+
+ /** @param e Exception for query run. */
+ private void retryIfPossible(IgniteCheckedException e) {
+ try {
+ IgniteInternalFuture<?> retryFut;
+
+ GridDhtUnreservedPartitionException partErr = X.cause(e,
GridDhtUnreservedPartitionException.class);
+
+ if (partErr != null) {
+ AffinityTopologyVersion waitVer =
partErr.topologyVersion();
+
+ assert waitVer != null;
+
+ retryFut =
cctx.shared().exchange().affinityReadyFuture(waitVer);
+ }
+ else if (e.hasCause(ClusterTopologyCheckedException.class)) {
+ ClusterTopologyCheckedException topEx = X.cause(e,
ClusterTopologyCheckedException.class);
+
+ retryFut = topEx.retryReadyFuture();
+ }
+ else if (e.hasCause(ClusterGroupEmptyCheckedException.class)) {
+ ClusterGroupEmptyCheckedException ex = X.cause(e,
ClusterGroupEmptyCheckedException.class);
+
+ retryFut = ex.retryReadyFuture();
+ }
+ else
+ throw CU.convertToCacheException(e);
+
+ if (F.isEmpty(nodes)) {
+ if (--unreservedNodesRetryCnt > 0) {
+ if (retryFut != null)
+ retryFut.get();
+
+ nodes = fallbacks(unreservedTopVer == null ?
cctx.shared().exchange().readyAffinityVersion() : unreservedTopVer);
+
+ unreservedTopVer = null;
+
+ init();
+ }
+ else
+ throw CU.convertToCacheException(e);
+ }
+ else
+ init();
+ }
+ catch (IgniteCheckedException ex) {
+ throw CU.convertToCacheException(ex);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void onClose() throws IgniteCheckedException {
+ super.onClose();
+
+ T2<GridCloseableIterator<Object>, GridCacheQueryFutureAdapter> t =
tuple;
+
+ if (t != null && t.get1() != null)
+ t.get1().close();
+
+ if (t != null && t.get2() != null)
+ t.get2().cancel();
+ }
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index 0bbb9ca219c..5e959eac1b4 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -244,8 +244,8 @@ public class GridCacheDistributedQueryManager<K, V> extends
GridCacheQueryManage
if (sndNode == null)
return null;
- GridCacheQueryAdapter<?> qry =
- new GridCacheQueryAdapter<>(
+ CacheQuery<?> qry =
+ new CacheQuery<>(
cctx,
req.type(),
log,
@@ -541,7 +541,7 @@ public class GridCacheDistributedQueryManager<K, V> extends
GridCacheQueryManage
/** {@inheritDoc} */
@SuppressWarnings({"unchecked"})
- @Override public GridCloseableIterator scanQueryDistributed(final
GridCacheQueryAdapter qry,
+ @Override public GridCloseableIterator scanQueryDistributed(final
CacheQuery qry,
Collection<ClusterNode> nodes) throws IgniteCheckedException {
assert qry.type() == GridCacheQueryType.SCAN : qry;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
deleted file mode 100644
index 3ffb78b2c01..00000000000
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ /dev/null
@@ -1,928 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.query;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Queue;
-import java.util.Set;
-import javax.cache.CacheException;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.query.Query;
-import org.apache.ignite.cluster.ClusterGroup;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.cluster.ClusterTopologyException;
-import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnreservedPartitionException;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
-import org.apache.ignite.internal.processors.query.QueryUtils;
-import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
-import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
-import org.apache.ignite.internal.util.lang.GridCloseableIterator;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.P1;
-import org.apache.ignite.internal.util.typedef.T2;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.lang.IgniteBiPredicate;
-import org.apache.ignite.lang.IgniteClosure;
-import org.apache.ignite.lang.IgniteReducer;
-import org.apache.ignite.plugin.security.SecurityPermission;
-import org.jetbrains.annotations.Nullable;
-
-import static
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.INDEX;
-import static
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SCAN;
-import static
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SET;
-import static
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SPI;
-import static
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
-
-/**
- * Query adapter.
- */
-public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
- /** */
- private final GridCacheContext<?, ?> cctx;
-
- /** */
- private final GridCacheQueryType type;
-
- /** */
- private final IgniteLogger log;
-
- /** Class name in case of binary query. */
- private final String clsName;
-
- /** */
- @GridToStringInclude(sensitive = true)
- private final String clause;
-
- /** Description of IndexQuery. */
- private final IndexQueryDesc idxQryDesc;
-
- /** */
- private final IgniteBiPredicate<Object, Object> filter;
-
- /** Limits returned records quantity. */
- private int limit;
-
- /** Transformer. */
- private IgniteClosure<?, ?> transform;
-
- /** Partition. */
- private Integer part;
-
- /** */
- private final boolean incMeta;
-
- /** */
- private volatile int pageSize = Query.DFLT_PAGE_SIZE;
-
- /** */
- private volatile long timeout;
-
- /** */
- private volatile boolean incBackups;
-
- /** Local query. */
- private boolean forceLocal;
-
- /** */
- private volatile boolean dedup;
-
- /** */
- private volatile ClusterGroup prj;
-
- /** */
- private boolean keepBinary;
-
- /** */
- private int taskHash;
-
- /** */
- private Boolean dataPageScanEnabled;
-
- /**
- * Cache query adapter for SCAN query.
- *
- * @param cctx Context.
- * @param type Query type.
- * @param filter Scan filter.
- * @param part Partition.
- * @param keepBinary Keep binary flag.
- * @param forceLocal Flag to force local query.
- * @param dataPageScanEnabled Flag to enable data page scan.
- */
- public GridCacheQueryAdapter(
- GridCacheContext<?, ?> cctx,
- GridCacheQueryType type,
- @Nullable IgniteBiPredicate<Object, Object> filter,
- @Nullable IgniteClosure<Map.Entry, Object> transform,
- @Nullable Integer part,
- boolean keepBinary,
- boolean forceLocal,
- Boolean dataPageScanEnabled
- ) {
- this(cctx, type, null, null, filter, part, false, keepBinary,
dataPageScanEnabled, null);
-
- this.transform = transform;
- this.forceLocal = forceLocal;
- }
-
- /**
- * Cache query adapter for SET, SPI, TEXT queries.
- *
- * @param cctx Context.
- * @param type Query type.
- * @param clsName Class name.
- * @param clause Clause.
- * @param filter Scan filter.
- * @param part Partition.
- * @param incMeta Include metadata flag.
- * @param keepBinary Keep binary flag.
- * @param dataPageScanEnabled Flag to enable data page scan.
- */
- public GridCacheQueryAdapter(
- GridCacheContext<?, ?> cctx,
- GridCacheQueryType type,
- @Nullable String clsName,
- @Nullable String clause,
- @Nullable IgniteBiPredicate<Object, Object> filter,
- @Nullable Integer part,
- boolean incMeta,
- boolean keepBinary,
- Boolean dataPageScanEnabled,
- IndexQueryDesc idxQryDesc
- ) {
- assert cctx != null;
- assert type != null;
- assert part == null || part >= 0;
-
- this.cctx = cctx;
- this.type = type;
- this.clsName = clsName;
- this.clause = clause;
- this.filter = filter;
- this.part = part;
- this.incMeta = incMeta;
- this.keepBinary = keepBinary;
- this.dataPageScanEnabled = dataPageScanEnabled;
- this.idxQryDesc = idxQryDesc;
-
- log = cctx.logger(getClass());
- }
-
- /**
- * Cache query adapter for local query processing.
- *
- * @param cctx Context.
- * @param type Query type.
- * @param log Logger.
- * @param pageSize Page size.
- * @param timeout Timeout.
- * @param incBackups Include backups flag.
- * @param dedup Enable dedup flag.
- * @param prj Grid projection.
- * @param filter Key-value filter.
- * @param part Partition.
- * @param clsName Class name.
- * @param clause Clause.
- * @param limit Response limit. Set to 0 for no limits.
- * @param incMeta Include metadata flag.
- * @param keepBinary Keep binary flag.
- * @param taskHash Task hash.
- * @param dataPageScanEnabled Flag to enable data page scan.
- */
- public GridCacheQueryAdapter(
- GridCacheContext<?, ?> cctx,
- GridCacheQueryType type,
- IgniteLogger log,
- int pageSize,
- long timeout,
- boolean incBackups,
- boolean dedup,
- ClusterGroup prj,
- IgniteBiPredicate<Object, Object> filter,
- @Nullable Integer part,
- @Nullable String clsName,
- String clause,
- IndexQueryDesc idxQryDesc,
- int limit,
- boolean incMeta,
- boolean keepBinary,
- int taskHash,
- Boolean dataPageScanEnabled
- ) {
- this.cctx = cctx;
- this.type = type;
- this.log = log;
- this.pageSize = pageSize;
- this.timeout = timeout;
- this.incBackups = incBackups;
- this.dedup = dedup;
- this.prj = prj;
- this.filter = filter;
- this.part = part;
- this.clsName = clsName;
- this.clause = clause;
- this.idxQryDesc = idxQryDesc;
- this.limit = limit;
- this.incMeta = incMeta;
- this.keepBinary = keepBinary;
- this.taskHash = taskHash;
- this.dataPageScanEnabled = dataPageScanEnabled;
- }
-
- /**
- * Cache query adapter for INDEX query.
- *
- * @param cctx Context.
- * @param type Query type.
- * @param idxQryDesc Index query descriptor.
- * @param part Partition number to iterate over.
- * @param clsName Class name.
- * @param filter Index query remote filter.
- */
- public GridCacheQueryAdapter(
- GridCacheContext<?, ?> cctx,
- GridCacheQueryType type,
- IndexQueryDesc idxQryDesc,
- @Nullable Integer part,
- @Nullable String clsName,
- @Nullable IgniteBiPredicate<Object, Object> filter
- ) {
- this(cctx, type, clsName, null, filter, part, false, false, null,
idxQryDesc);
- }
-
- /**
- * @return Flag to enable data page scan.
- */
- public Boolean isDataPageScanEnabled() {
- return dataPageScanEnabled;
- }
-
- /**
- * @return Type.
- */
- public GridCacheQueryType type() {
- return type;
- }
-
- /**
- * @return Class name.
- */
- @Nullable public String queryClassName() {
- return clsName;
- }
-
- /**
- * @return Clause.
- */
- @Nullable public String clause() {
- return clause;
- }
-
- /**
- * @return Include metadata flag.
- */
- public boolean includeMetadata() {
- return incMeta;
- }
-
- /**
- * @return {@code True} if binary should not be deserialized.
- */
- public boolean keepBinary() {
- return keepBinary;
- }
-
- /**
- * Forces query to keep binary object representation even if query was
created on plain projection.
- *
- * @param keepBinary Keep binary flag.
- */
- public void keepBinary(boolean keepBinary) {
- this.keepBinary = keepBinary;
- }
-
- /**
- * @return {@code True} if the query is forced local.
- */
- public boolean forceLocal() {
- return forceLocal;
- }
-
- /**
- * @return Task hash.
- */
- public int taskHash() {
- return taskHash;
- }
-
- /** {@inheritDoc} */
- @Override public CacheQuery<T> pageSize(int pageSize) {
- A.ensure(pageSize > 0, "pageSize > 0");
-
- this.pageSize = pageSize;
-
- return this;
- }
-
- /**
- * @return Page size.
- */
- public int pageSize() {
- return pageSize;
- }
-
- /** {@inheritDoc} */
- @Override public CacheQuery<T> timeout(long timeout) {
- A.ensure(timeout >= 0, "timeout >= 0");
-
- this.timeout = timeout;
-
- return this;
- }
-
- /**
- * @return Response limit. Returns 0 for no limits.
- **/
- public int limit() {
- return limit;
- }
-
- /** {@inheritDoc} */
- @Override public CacheQuery<T> limit(int limit) {
- this.limit = limit;
-
- return this;
- }
-
- /**
- * @return Timeout.
- */
- public long timeout() {
- return timeout;
- }
-
- /** {@inheritDoc} */
- @Override public CacheQuery<T> includeBackups(boolean incBackups) {
- this.incBackups = incBackups;
-
- return this;
- }
-
- /**
- * @return Include backups.
- */
- public boolean includeBackups() {
- return incBackups;
- }
-
- /** {@inheritDoc} */
- @Override public CacheQuery<T> enableDedup(boolean dedup) {
- this.dedup = dedup;
-
- return this;
- }
-
- /**
- * @return Enable dedup flag.
- */
- public boolean enableDedup() {
- return dedup;
- }
-
- /** {@inheritDoc} */
- @Override public CacheQuery<T> projection(ClusterGroup prj) {
- this.prj = prj;
-
- return this;
- }
-
- /**
- * @return Grid projection.
- */
- public ClusterGroup projection() {
- return prj;
- }
-
- /**
- * @return Key-value filter.
- */
- @Nullable public <K, V> IgniteBiPredicate<K, V> scanFilter() {
- return (IgniteBiPredicate<K, V>)filter;
- }
-
- /**
- * @return Transformer.
- */
- @Nullable public <K, V> IgniteClosure<Map.Entry<K, V>, Object> transform()
{
- return (IgniteClosure<Map.Entry<K, V>, Object>)transform;
- }
-
- /**
- * @return Partition.
- */
- @Nullable public Integer partition() {
- return part;
- }
-
- /**
- * @return Index query description.
- */
- @Nullable public IndexQueryDesc idxQryDesc() {
- return idxQryDesc;
- }
-
- /**
- * @throws IgniteCheckedException If query is invalid.
- */
- public void validate() throws IgniteCheckedException {
- if ((type != SCAN && type != SET && type != SPI && type != INDEX)
- && !QueryUtils.isEnabled(cctx.config()))
- throw new IgniteCheckedException("Indexing is disabled for cache:
" + cctx.cache().name());
- }
-
- /** {@inheritDoc} */
- @Override public CacheQueryFuture<T> execute(@Nullable Object... args) {
- return execute0(null, args);
- }
-
- /** {@inheritDoc} */
- @Override public <R> CacheQueryFuture<R> execute(IgniteReducer<T, R>
rmtReducer, @Nullable Object... args) {
- return execute0(rmtReducer, args);
- }
-
- /**
- * @param rmtReducer Optional reducer.
- * @param args Arguments.
- * @return Future.
- */
- @SuppressWarnings({"IfMayBeConditional"})
- private <R> CacheQueryFuture<R> execute0(@Nullable IgniteReducer<T, R>
rmtReducer, @Nullable Object... args) {
- assert type != SCAN : this;
-
- Collection<ClusterNode> nodes;
-
- try {
- nodes = nodes();
- }
- catch (IgniteCheckedException e) {
- return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), e);
- }
-
- cctx.checkSecurity(SecurityPermission.CACHE_READ);
-
- if (nodes.isEmpty())
- return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), new
ClusterGroupEmptyCheckedException());
-
- if (log.isDebugEnabled())
- log.debug("Executing query [query=" + this + ", nodes=" + nodes +
']');
-
- if (cctx.deploymentEnabled()) {
- try {
- cctx.deploy().registerClasses(filter, rmtReducer);
- cctx.deploy().registerClasses(args);
- }
- catch (IgniteCheckedException e) {
- return new GridCacheQueryErrorFuture<>(cctx.kernalContext(),
e);
- }
- }
-
- taskHash = cctx.kernalContext().job().currentTaskNameHash();
-
- final GridCacheQueryBean bean = new GridCacheQueryBean(this,
(IgniteReducer<Object, Object>)rmtReducer,
- null, args);
-
- final GridCacheQueryManager qryMgr = cctx.queries();
-
- boolean loc = nodes.size() == 1 &&
F.first(nodes).id().equals(cctx.localNodeId());
-
- if (type == SQL_FIELDS || type == SPI)
- return (CacheQueryFuture<R>)(loc ? qryMgr.queryFieldsLocal(bean) :
- qryMgr.queryFieldsDistributed(bean, nodes));
- else
- return (CacheQueryFuture<R>)(loc ? qryMgr.queryLocal(bean) :
qryMgr.queryDistributed(bean, nodes));
- }
-
- /** {@inheritDoc} */
- @Override public GridCloseableIterator executeScanQuery() throws
IgniteCheckedException {
- assert type == SCAN : "Wrong processing of query: " + type;
-
- GridDhtCacheAdapter<?, ?> cacheAdapter = cctx.isNear() ?
cctx.near().dht() : cctx.dht();
-
- Set<Integer> lostParts = cacheAdapter.topology().lostPartitions();
-
- if (!lostParts.isEmpty()) {
- if (part == null || lostParts.contains(part)) {
- throw new CacheException(new
CacheInvalidStateException("Failed to execute query because cache partition " +
- "has been lostParts [cacheName=" + cctx.name() +
- ", part=" + (part == null ? lostParts.iterator().next() :
part) + ']'));
- }
- }
-
- // Affinity nodes snapshot.
- Collection<ClusterNode> nodes = new ArrayList<>(nodes());
-
- cctx.checkSecurity(SecurityPermission.CACHE_READ);
-
- if (nodes.isEmpty()) {
- if (part != null) {
- if (forceLocal) {
- throw new IgniteCheckedException("No queryable nodes for
partition " + part
- + " [forced local query=" + this + "]");
- }
- }
-
- return new GridEmptyCloseableIterator();
- }
-
- if (log.isDebugEnabled())
- log.debug("Executing query [query=" + this + ", nodes=" + nodes +
']');
-
- if (cctx.deploymentEnabled())
- cctx.deploy().registerClasses(filter);
-
- taskHash = cctx.kernalContext().job().currentTaskNameHash();
-
- final GridCacheQueryManager qryMgr = cctx.queries();
-
- boolean loc = nodes.size() == 1 &&
F.first(nodes).id().equals(cctx.localNodeId());
-
- if (loc)
- return qryMgr.scanQueryLocal(this, true);
- else if (part != null)
- return new ScanQueryFallbackClosableIterator(part, this, qryMgr,
cctx);
- else
- return qryMgr.scanQueryDistributed(this, nodes);
- }
-
- /**
- * @return Nodes to execute on.
- */
- private Collection<ClusterNode> nodes() throws IgniteCheckedException {
- CacheMode cacheMode = cctx.config().getCacheMode();
-
- Integer part = partition();
-
- switch (cacheMode) {
- case REPLICATED:
- if (prj != null || part != null)
- return nodes(cctx, prj, part);
-
- GridDhtPartitionTopology top = cctx.topology();
-
- if (cctx.affinityNode() &&
!top.localPartitionMap().hasMovingPartitions())
- return Collections.singletonList(cctx.localNode());
-
- top.readLock();
-
- try {
-
- Collection<ClusterNode> affNodes = nodes(cctx, null, null);
-
- List<ClusterNode> nodes = new ArrayList<>(affNodes);
-
- Collections.shuffle(nodes);
-
- for (ClusterNode node : nodes) {
- if (!top.partitions(node.id()).hasMovingPartitions())
- return Collections.singletonList(node);
- }
-
- return affNodes;
- }
- finally {
- top.readUnlock();
- }
-
- case PARTITIONED:
- return nodes(cctx, prj, part);
-
- default:
- throw new IllegalStateException("Unknown cache distribution
mode: " + cacheMode);
- }
- }
-
- /**
- * @param cctx Cache context.
- * @param prj Projection (optional).
- * @return Collection of data nodes in provided projection (if any).
- * @throws IgniteCheckedException If partition number is invalid.
- */
- private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?>
cctx,
- @Nullable final ClusterGroup prj, @Nullable final Integer part) throws
IgniteCheckedException {
- assert cctx != null;
-
- final AffinityTopologyVersion topVer =
cctx.affinity().affinityTopologyVersion();
-
- Collection<ClusterNode> affNodes = CU.affinityNodes(cctx, topVer);
-
- if (prj == null && part == null)
- return affNodes;
-
- if (part != null && part >= cctx.affinity().partitions())
- throw new IgniteCheckedException("Invalid partition number: " +
part);
-
- final Set<ClusterNode> owners =
- part == null ? Collections.<ClusterNode>emptySet() : new
HashSet<>(cctx.topology().owners(part, topVer));
-
- return F.view(affNodes, new P1<ClusterNode>() {
- @Override public boolean apply(ClusterNode n) {
- return cctx.discovery().cacheAffinityNode(n, cctx.name()) &&
- (prj == null || prj.node(n.id()) != null) &&
- (part == null || owners.contains(n));
- }
- });
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridCacheQueryAdapter.class, this);
- }
-
- /**
- * Wrapper for queries with fallback.
- */
- private static class ScanQueryFallbackClosableIterator extends
GridCloseableIteratorAdapter {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Query future. */
- private volatile T2<GridCloseableIterator<Object>,
GridCacheQueryFutureAdapter> tuple;
-
- /** Backups. */
- private volatile Queue<ClusterNode> nodes;
-
- /** Topology version of the last detected {@link
GridDhtUnreservedPartitionException}. */
- private volatile AffinityTopologyVersion unreservedTopVer;
-
- /** Number of times to retry the query on the nodes failed with {@link
GridDhtUnreservedPartitionException}. */
- private volatile int unreservedNodesRetryCnt = 5;
-
- /** Bean. */
- private final GridCacheQueryAdapter qry;
-
- /** Query manager. */
- private final GridCacheQueryManager qryMgr;
-
- /** Cache context. */
- private final GridCacheContext cctx;
-
- /** Partition. */
- private final int part;
-
- /** Flag indicating that a first item has been returned to a user. */
- private boolean firstItemReturned;
-
- /** */
- private Object cur;
-
- /**
- * @param part Partition.
- * @param qry Query.
- * @param qryMgr Query manager.
- * @param cctx Cache context.
- */
- private ScanQueryFallbackClosableIterator(int part,
GridCacheQueryAdapter qry,
- GridCacheQueryManager qryMgr, GridCacheContext cctx) {
- this.qry = qry;
- this.qryMgr = qryMgr;
- this.cctx = cctx;
- this.part = part;
-
- nodes = fallbacks(cctx.shared().exchange().readyAffinityVersion());
-
- if (F.isEmpty(nodes))
- throw new ClusterTopologyException("Failed to execute the
query " +
- "(all affinity nodes left the grid) [cache=" + cctx.name()
+
- ", qry=" + qry +
- ", curTopVer=" +
qryMgr.queryTopologyVersion().topologyVersion() + ']');
-
- init();
- }
-
- /**
- * @param topVer Topology version.
- * @return Nodes for query execution.
- */
- private Queue<ClusterNode> fallbacks(AffinityTopologyVersion topVer) {
- Deque<ClusterNode> fallbacks = new LinkedList<>();
- Collection<ClusterNode> owners = new HashSet<>();
-
- for (ClusterNode node : cctx.topology().owners(part, topVer)) {
- if (node.isLocal())
- fallbacks.addFirst(node);
- else
- fallbacks.add(node);
-
- owners.add(node);
- }
-
- for (ClusterNode node : cctx.topology().moving(part)) {
- if (!owners.contains(node))
- fallbacks.add(node);
- }
-
- return fallbacks;
- }
-
- /**
- *
- */
- @SuppressWarnings("unchecked")
- private void init() {
- final ClusterNode node = nodes.poll();
-
- if (node.isLocal()) {
- try {
- GridCloseableIterator it = qryMgr.scanQueryLocal(qry,
true);
-
- tuple = new T2(it, null);
- }
- catch (IgniteClientDisconnectedCheckedException e) {
- throw CU.convertToCacheException(e);
- }
- catch (IgniteCheckedException e) {
- retryIfPossible(e);
- }
- }
- else {
- final GridCacheQueryBean bean = new GridCacheQueryBean(qry,
null, qry.transform, null);
-
- GridCacheQueryFutureAdapter fut =
- (GridCacheQueryFutureAdapter)qryMgr.queryDistributed(bean,
Collections.singleton(node));
-
- tuple = new T2(null, fut);
- }
- }
-
- /** {@inheritDoc} */
- @Override protected Object onNext() throws IgniteCheckedException {
- if (!onHasNext())
- throw new NoSuchElementException();
-
- assert cur != null;
-
- Object e = cur;
-
- cur = null;
-
- return e;
- }
-
- /** {@inheritDoc} */
- @Override protected boolean onHasNext() throws IgniteCheckedException {
- while (true) {
- if (cur != null)
- return true;
-
- T2<GridCloseableIterator<Object>, GridCacheQueryFutureAdapter>
t = tuple;
-
- GridCloseableIterator<Object> iter = t.get1();
-
- if (iter != null) {
- boolean hasNext = iter.hasNext();
-
- if (hasNext)
- cur = iter.next();
-
- return hasNext;
- }
- else {
- GridCacheQueryFutureAdapter fut = t.get2();
-
- assert fut != null;
-
- if (firstItemReturned)
- return (cur = convert(fut.next())) != null;
-
- try {
- fut.awaitFirstItemAvailable();
-
- firstItemReturned = true;
-
- return (cur = convert(fut.next())) != null;
- }
- catch (IgniteClientDisconnectedCheckedException e) {
- throw CU.convertToCacheException(e);
- }
- catch (IgniteCheckedException e) {
- retryIfPossible(e);
- }
- }
- }
- }
-
- /**
- * @param obj Entry to convert.
- * @return Cache entry
- */
- private Object convert(Object obj) {
- if (qry.transform() != null)
- return obj;
-
- Map.Entry e = (Map.Entry)obj;
-
- return e == null ? null : new CacheQueryEntry(e.getKey(),
e.getValue());
- }
-
- /**
- * @param e Exception for query run.
- */
- private void retryIfPossible(IgniteCheckedException e) {
- try {
- IgniteInternalFuture<?> retryFut;
-
- GridDhtUnreservedPartitionException partErr = X.cause(e,
GridDhtUnreservedPartitionException.class);
-
- if (partErr != null) {
- AffinityTopologyVersion waitVer =
partErr.topologyVersion();
-
- assert waitVer != null;
-
- retryFut =
cctx.shared().exchange().affinityReadyFuture(waitVer);
- }
- else if (e.hasCause(ClusterTopologyCheckedException.class)) {
- ClusterTopologyCheckedException topEx = X.cause(e,
ClusterTopologyCheckedException.class);
-
- retryFut = topEx.retryReadyFuture();
- }
- else if (e.hasCause(ClusterGroupEmptyCheckedException.class)) {
- ClusterGroupEmptyCheckedException ex = X.cause(e,
ClusterGroupEmptyCheckedException.class);
-
- retryFut = ex.retryReadyFuture();
- }
- else
- throw CU.convertToCacheException(e);
-
- if (F.isEmpty(nodes)) {
- if (--unreservedNodesRetryCnt > 0) {
- if (retryFut != null)
- retryFut.get();
-
- nodes = fallbacks(unreservedTopVer == null ?
cctx.shared().exchange().readyAffinityVersion() : unreservedTopVer);
-
- unreservedTopVer = null;
-
- init();
- }
- else
- throw CU.convertToCacheException(e);
- }
- else
- init();
- }
- catch (IgniteCheckedException ex) {
- throw CU.convertToCacheException(ex);
- }
- }
-
- /** {@inheritDoc} */
- @Override protected void onClose() throws IgniteCheckedException {
- super.onClose();
-
- T2<GridCloseableIterator<Object>, GridCacheQueryFutureAdapter> t =
tuple;
-
- if (t != null && t.get1() != null)
- t.get1().close();
-
- if (t != null && t.get2() != null)
- t.get2().cancel();
- }
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryBean.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryBean.java
index 4dcb8821f35..c32428ed026 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryBean.java
@@ -27,7 +27,7 @@ import org.jetbrains.annotations.Nullable;
*/
public class GridCacheQueryBean {
/** */
- private final GridCacheQueryAdapter<?> qry;
+ private final CacheQuery<?> qry;
/** */
private final IgniteReducer<Object, Object> rdc;
@@ -44,8 +44,8 @@ public class GridCacheQueryBean {
* @param trans Optional transformer.
* @param args Optional arguments.
*/
- public GridCacheQueryBean(GridCacheQueryAdapter<?> qry, @Nullable
IgniteReducer<Object, Object> rdc,
- @Nullable IgniteClosure<?, ?> trans, @Nullable Object[] args) {
+ public GridCacheQueryBean(CacheQuery<?> qry, @Nullable
IgniteReducer<Object, Object> rdc,
+ @Nullable IgniteClosure<?, ?> trans, @Nullable
Object[] args) {
assert qry != null;
this.qry = qry;
@@ -57,7 +57,7 @@ public class GridCacheQueryBean {
/**
* @return Query.
*/
- public GridCacheQueryAdapter<?> query() {
+ public CacheQuery<?> query() {
return qry;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryInfo.java
index 0a108d580ae..ffac23c8a2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryInfo.java
@@ -37,7 +37,7 @@ class GridCacheQueryInfo {
private IgniteReducer<Object, Object> rdc;
/** */
- private GridCacheQueryAdapter<?> qry;
+ private CacheQuery<?> qry;
/** */
private GridCacheLocalQueryFuture<?, ?, ?> locFut;
@@ -73,7 +73,7 @@ class GridCacheQueryInfo {
boolean loc,
IgniteClosure<?, ?> trans,
IgniteReducer<Object, Object> rdc,
- GridCacheQueryAdapter<?> qry,
+ CacheQuery<?> qry,
GridCacheLocalQueryFuture<?, ?, ?> locFut,
UUID sndId,
long reqId,
@@ -110,7 +110,7 @@ class GridCacheQueryInfo {
/**
* @return Query.
*/
- GridCacheQueryAdapter<?> query() {
+ CacheQuery<?> query() {
return qry;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 7dc28548194..9d2e65ac5b6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -503,7 +503,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @return Iterator.
* @throws IgniteCheckedException If failed.
*/
- public abstract GridCloseableIterator
scanQueryDistributed(GridCacheQueryAdapter qry,
+ public abstract GridCloseableIterator scanQueryDistributed(CacheQuery qry,
Collection<ClusterNode> nodes) throws IgniteCheckedException;
/**
@@ -541,7 +541,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @throws IgniteCheckedException In case of error.
*/
@SuppressWarnings("unchecked")
- private QueryResult<K, V> executeQuery(GridCacheQueryAdapter<?> qry,
+ private QueryResult<K, V> executeQuery(CacheQuery<?> qry,
IgniteClosure transformer, boolean loc, @Nullable String taskName,
Object rcpt)
throws IgniteCheckedException {
if (qry.type() == null) {
@@ -662,7 +662,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @return Collection of found keys.
* @throws IgniteCheckedException In case of error.
*/
- private FieldsResult executeFieldsQuery(GridCacheQueryAdapter<?> qry,
@Nullable Object[] args,
+ private FieldsResult executeFieldsQuery(CacheQuery<?> qry, @Nullable
Object[] args,
boolean loc, @Nullable String taskName, Object rcpt) throws
IgniteCheckedException {
assert qry != null;
@@ -758,13 +758,13 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @param qry Query.
* @return Cache set items iterator.
*/
- private GridCloseableIterator<IgniteBiTuple<K, V>>
sharedCacheSetIterator(GridCacheQueryAdapter<?> qry)
+ private GridCloseableIterator<IgniteBiTuple<K, V>>
sharedCacheSetIterator(CacheQuery<?> qry)
throws IgniteCheckedException {
final GridSetQueryPredicate filter =
(GridSetQueryPredicate)qry.scanFilter();
IgniteUuid id = filter.setId();
- GridCacheQueryAdapter<CacheEntry<K, ?>> qry0 = new
GridCacheQueryAdapter<>(cctx,
+ CacheQuery<CacheEntry<K, ?>> qry0 = new CacheQuery<>(cctx,
SCAN,
new IgniteBiPredicate<Object, Object>() {
@Override public boolean apply(Object k, Object v) {
@@ -792,7 +792,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @throws IgniteCheckedException If failed to get iterator.
*/
@SuppressWarnings({"unchecked"})
- private GridCloseableIterator scanIterator(final GridCacheQueryAdapter<?>
qry, IgniteClosure transformer,
+ private GridCloseableIterator scanIterator(final CacheQuery<?> qry,
IgniteClosure transformer,
boolean locNode)
throws IgniteCheckedException {
final InternalScanFilter<K, V> intFilter =
internalFilter(qry.scanFilter());
@@ -954,7 +954,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
// Preparing query closures.
final IgniteReducer<Object, Object> rdc =
injectResources((IgniteReducer<Object, Object>)qryInfo.reducer());
- GridCacheQueryAdapter<?> qry = qryInfo.query();
+ CacheQuery<?> qry = qryInfo.query();
int pageSize = qry.pageSize();
@@ -1146,7 +1146,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
boolean rmvIter = true;
- GridCacheQueryAdapter<?> qry = qryInfo.query();
+ CacheQuery<?> qry = qryInfo.query();
try {
IgniteReducer<Cache.Entry<K, V>, Object> rdc =
injectResources((IgniteReducer<Cache.Entry<K, V>, Object>)qryInfo.reducer());
@@ -1385,7 +1385,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @param updateStatistics Update statistics flag.
*/
@SuppressWarnings({"unchecked"})
- protected GridCloseableIterator scanQueryLocal(final GridCacheQueryAdapter
qry,
+ protected GridCloseableIterator scanQueryLocal(final CacheQuery qry,
boolean updateStatistics) throws IgniteCheckedException {
if (!enterBusy())
throw new IllegalStateException("Failed to process query request
(grid is stopping).");
@@ -1454,7 +1454,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @return GridCloseableIterator.
*/
@SuppressWarnings({"unchecked"})
- public GridCloseableIterator indexQueryLocal(final GridCacheQueryAdapter
qry) throws IgniteCheckedException {
+ public GridCloseableIterator indexQueryLocal(final CacheQuery qry) throws
IgniteCheckedException {
if (!enterBusy())
throw new IllegalStateException("Failed to process query request
(grid is stopping).");
@@ -2071,7 +2071,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @param qry Query.
* @return Filter.
*/
- private IndexingQueryFilter filter(GridCacheQueryAdapter<?> qry) {
+ private IndexingQueryFilter filter(CacheQuery<?> qry) {
return filter(qry, null, false);
}
@@ -2081,7 +2081,7 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
* @param treatReplicatedAsPartitioned If true, only primary partitions of
replicated caches will be used.
* @return Filter.
*/
- private IndexingQueryFilter filter(GridCacheQueryAdapter<?> qry, @Nullable
int[] partsArr, boolean treatReplicatedAsPartitioned) {
+ private IndexingQueryFilter filter(CacheQuery<?> qry, @Nullable int[]
partsArr, boolean treatReplicatedAsPartitioned) {
if (qry.includeBackups())
return null;
@@ -2879,7 +2879,7 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
* @return Query.
*/
public <R> CacheQuery<R> createSpiQuery(boolean keepBinary) {
- return new GridCacheQueryAdapter<>(cctx,
+ return new CacheQuery<>(cctx,
SPI,
null,
null,
@@ -2923,7 +2923,7 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
boolean forceLocal,
Boolean dataPageScanEnabled
) {
- return new GridCacheQueryAdapter(cctx,
+ return new CacheQuery(cctx,
SCAN,
filter,
trans,
@@ -2949,7 +2949,7 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
A.notNull("clsName", clsName);
A.notNull("search", search);
- return new GridCacheQueryAdapter<Map.Entry<K, V>>(cctx,
+ return new CacheQuery<Map.Entry<K, V>>(cctx,
TEXT,
clsName,
search,
@@ -2980,7 +2980,7 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
IndexQueryDesc desc = new IndexQueryDesc(qry.getCriteria(),
qry.getIndexName(), qry.getValueType());
- GridCacheQueryAdapter q = new GridCacheQueryAdapter<>(
+ CacheQuery q = new CacheQuery<>(
cctx, INDEX, desc, qry.getPartition(), qry.getValueType(),
qry.getFilter());
q.keepBinary(keepBinary);
@@ -3133,7 +3133,7 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
*/
ScanQueryIterator(
GridIterator<CacheDataRow> it,
- GridCacheQueryAdapter qry,
+ CacheQuery qry,
AffinityTopologyVersion topVer,
GridDhtLocalPartition locPart,
InternalScanFilter<K, V> intScanFilter,
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index 13934c13aa1..3015d335a06 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@ -166,7 +166,7 @@ public class GridCacheQueryRequest extends
GridCacheIdMessage implements GridCac
public static GridCacheQueryRequest startQueryRequest(GridCacheContext<?,
?> cctx, long reqId,
GridCacheDistributedQueryFuture<?, ?, ?> fut) {
GridCacheQueryBean bean = fut.query();
- GridCacheQueryAdapter<?> qry = bean.query();
+ CacheQuery<?> qry = bean.query();
boolean deployFilterOrTransformer = (qry.scanFilter() != null ||
qry.transform() != null)
&& cctx.gridDeploy().enabled();
@@ -203,7 +203,7 @@ public class GridCacheQueryRequest extends
GridCacheIdMessage implements GridCac
* @param reqId Request (cache query) ID.
*/
public static GridCacheQueryRequest pageRequest(GridCacheContext<?, ?>
cctx, long reqId,
- GridCacheQueryAdapter<?> qry, boolean fields) {
+ CacheQuery<?> qry, boolean fields) {
return new GridCacheQueryRequest(
cctx.cacheId(),
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
index 05ac4b38826..23a9bdea145 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
@@ -44,7 +44,6 @@ import
org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.query.CacheQuery;
import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
-import org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -165,7 +164,7 @@ public class GridCacheSetImpl<T> extends
AbstractCollection<T> implements Ignite
return cache.sizeAsync(new CachePeekMode[] {}).get() - 1;
}
- CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET,
+ CacheQuery qry = new CacheQuery<>(ctx, SET,
new GridSetQueryPredicate<>(id, collocated), null, collocated
? hdrPart : null,
false, false, null);
@@ -440,7 +439,7 @@ public class GridCacheSetImpl<T> extends
AbstractCollection<T> implements Ignite
*/
@SuppressWarnings("unchecked")
private WeakReferenceCloseableIterator<T> sharedCacheIterator(boolean
keepBinary) throws IgniteCheckedException {
- CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET,
+ CacheQuery qry = new CacheQuery<>(ctx, SET,
new GridSetQueryPredicate<>(id, collocated), null, collocated ?
hdrPart : null,
keepBinary, false, null);
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties
b/modules/core/src/main/resources/META-INF/classnames.properties
index 63d8fd49495..24dba9ea13e 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1208,8 +1208,8 @@
org.apache.ignite.internal.processors.cache.query.CacheQueryType
org.apache.ignite.internal.processors.cache.query.GridCacheDistributedQueryManager$1
org.apache.ignite.internal.processors.cache.query.GridCacheDistributedQueryManager$3
org.apache.ignite.internal.processors.cache.query.GridCacheDistributedQueryManager$4
-org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter$1
-org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter$ScanQueryFallbackClosableIterator
+org.apache.ignite.internal.processors.cache.query.CacheQuery$1
+org.apache.ignite.internal.processors.cache.query.CacheQuery$ScanQueryFallbackClosableIterator
org.apache.ignite.internal.processors.cache.query.GridCacheQueryDetailMetricsAdapter
org.apache.ignite.internal.processors.cache.query.GridCacheQueryFutureAdapter$1
org.apache.ignite.internal.processors.cache.query.GridCacheQueryFutureAdapter$2