This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 11fca4aaaa1 IGNITE-23593 Use CacheQuery instead of 
GridCacheQueryAdapter (#11639)
11fca4aaaa1 is described below

commit 11fca4aaaa11aecf8827d9313f17cfd1db6b170d
Author: Nikolay <[email protected]>
AuthorDate: Sat Nov 2 19:25:26 2024 +0300

    IGNITE-23593 Use CacheQuery instead of GridCacheQueryAdapter (#11639)
---
 .../processors/cache/IgniteCacheProxyImpl.java     |   3 +-
 .../processors/cache/query/CacheQuery.java         | 845 ++++++++++++++++++-
 .../query/GridCacheDistributedQueryManager.java    |   6 +-
 .../cache/query/GridCacheQueryAdapter.java         | 928 ---------------------
 .../processors/cache/query/GridCacheQueryBean.java |   8 +-
 .../processors/cache/query/GridCacheQueryInfo.java |   6 +-
 .../cache/query/GridCacheQueryManager.java         |  34 +-
 .../cache/query/GridCacheQueryRequest.java         |   4 +-
 .../datastructures/GridCacheSetImpl.java           |   5 +-
 .../main/resources/META-INF/classnames.properties  |   4 +-
 10 files changed, 868 insertions(+), 975 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
index 2d2533a29a2..dba67c35100 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
@@ -81,7 +81,6 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.query.CacheQuery;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
-import org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
@@ -562,7 +561,7 @@ public class IgniteCacheProxyImpl<K, V> extends 
AsyncSupportAdapter<IgniteCache<
                 final GridCloseableIterator iter = 
ctx.kernalContext().query().executeQuery(GridCacheQueryType.INDEX,
                     cacheName, ctx, new 
IgniteOutClosureX<GridCloseableIterator>() {
                         @Override public GridCloseableIterator applyx() throws 
IgniteCheckedException {
-                            return 
ctx.queries().indexQueryLocal((GridCacheQueryAdapter)qry);
+                            return ctx.queries().indexQueryLocal(qry);
                         }
                     }, true);
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
index c610c143915..2d9474993d0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
@@ -17,17 +17,63 @@
 
 package org.apache.ignite.internal.processors.cache.query;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Set;
+import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.affinity.AffinityKey;
 import org.apache.ignite.cache.query.Query;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
 import org.apache.ignite.cache.query.annotations.QueryTextField;
 import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnreservedPartitionException;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
+import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteReducer;
+import org.apache.ignite.plugin.security.SecurityPermission;
 import org.jetbrains.annotations.Nullable;
 
+import static 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.INDEX;
+import static 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SCAN;
+import static 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SET;
+import static 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SPI;
+import static 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
+
 /**
  * Main API for configuring and executing cache queries.
  * <p>
@@ -185,7 +231,269 @@ import org.jetbrains.annotations.Nullable;
  *     .get();
  * </pre>
  */
-public interface CacheQuery<T> {
+public class CacheQuery<T> {
+    /** */
+    private final GridCacheContext<?, ?> cctx;
+
+    /** */
+    private final GridCacheQueryType type;
+
+    /** */
+    private final IgniteLogger log;
+
+    /** Class name in case of binary query. */
+    private final String clsName;
+
+    /** */
+    @GridToStringInclude(sensitive = true)
+    private final String clause;
+
+    /** Description of IndexQuery. */
+    private final IndexQueryDesc idxQryDesc;
+
+    /** */
+    private final IgniteBiPredicate<Object, Object> filter;
+
+    /** Limits returned records quantity. */
+    private int limit;
+
+    /** Transformer. */
+    private IgniteClosure<?, ?> transform;
+
+    /** Partition. */
+    private Integer part;
+
+    /** */
+    private final boolean incMeta;
+
+    /** */
+    private volatile int pageSize = Query.DFLT_PAGE_SIZE;
+
+    /** */
+    private volatile long timeout;
+
+    /** */
+    private volatile boolean incBackups;
+
+    /** Local query. */
+    private boolean forceLocal;
+
+    /** */
+    private volatile boolean dedup;
+
+    /** */
+    private volatile ClusterGroup prj;
+
+    /** */
+    private boolean keepBinary;
+
+    /** */
+    private int taskHash;
+
+    /** */
+    private Boolean dataPageScanEnabled;
+
+    /**
+     * Cache query adapter for SCAN query.
+     *
+     * @param cctx Context.
+     * @param type Query type.
+     * @param filter Scan filter.
+     * @param part Partition.
+     * @param keepBinary Keep binary flag.
+     * @param forceLocal Flag to force local query.
+     * @param dataPageScanEnabled Flag to enable data page scan.
+     */
+    public CacheQuery(
+        GridCacheContext<?, ?> cctx,
+        GridCacheQueryType type,
+        @Nullable IgniteBiPredicate<Object, Object> filter,
+        @Nullable IgniteClosure<Map.Entry, Object> transform,
+        @Nullable Integer part,
+        boolean keepBinary,
+        boolean forceLocal,
+        Boolean dataPageScanEnabled
+    ) {
+        this(cctx, type, null, null, filter, part, false, keepBinary, 
dataPageScanEnabled, null);
+
+        this.transform = transform;
+        this.forceLocal = forceLocal;
+    }
+
+    /**
+     * Cache query adapter for SET, SPI, TEXT queries.
+     *
+     * @param cctx Context.
+     * @param type Query type.
+     * @param clsName Class name.
+     * @param clause Clause.
+     * @param filter Scan filter.
+     * @param part Partition.
+     * @param incMeta Include metadata flag.
+     * @param keepBinary Keep binary flag.
+     * @param dataPageScanEnabled Flag to enable data page scan.
+     */
+    public CacheQuery(
+        GridCacheContext<?, ?> cctx,
+        GridCacheQueryType type,
+        @Nullable String clsName,
+        @Nullable String clause,
+        @Nullable IgniteBiPredicate<Object, Object> filter,
+        @Nullable Integer part,
+        boolean incMeta,
+        boolean keepBinary,
+        Boolean dataPageScanEnabled,
+        IndexQueryDesc idxQryDesc
+    ) {
+        assert cctx != null;
+        assert type != null;
+        assert part == null || part >= 0;
+
+        this.cctx = cctx;
+        this.type = type;
+        this.clsName = clsName;
+        this.clause = clause;
+        this.filter = filter;
+        this.part = part;
+        this.incMeta = incMeta;
+        this.keepBinary = keepBinary;
+        this.dataPageScanEnabled = dataPageScanEnabled;
+        this.idxQryDesc = idxQryDesc;
+
+        log = cctx.logger(getClass());
+    }
+
+    /**
+     * Cache query adapter for local query processing.
+     *
+     * @param cctx Context.
+     * @param type Query type.
+     * @param log Logger.
+     * @param pageSize Page size.
+     * @param timeout Timeout.
+     * @param incBackups Include backups flag.
+     * @param dedup Enable dedup flag.
+     * @param prj Grid projection.
+     * @param filter Key-value filter.
+     * @param part Partition.
+     * @param clsName Class name.
+     * @param clause Clause.
+     * @param limit Response limit. Set to 0 for no limits.
+     * @param incMeta Include metadata flag.
+     * @param keepBinary Keep binary flag.
+     * @param taskHash Task hash.
+     * @param dataPageScanEnabled Flag to enable data page scan.
+     */
+    public CacheQuery(
+        GridCacheContext<?, ?> cctx,
+        GridCacheQueryType type,
+        IgniteLogger log,
+        int pageSize,
+        long timeout,
+        boolean incBackups,
+        boolean dedup,
+        ClusterGroup prj,
+        IgniteBiPredicate<Object, Object> filter,
+        @Nullable Integer part,
+        @Nullable String clsName,
+        String clause,
+        IndexQueryDesc idxQryDesc,
+        int limit,
+        boolean incMeta,
+        boolean keepBinary,
+        int taskHash,
+        Boolean dataPageScanEnabled
+    ) {
+        this.cctx = cctx;
+        this.type = type;
+        this.log = log;
+        this.pageSize = pageSize;
+        this.timeout = timeout;
+        this.incBackups = incBackups;
+        this.dedup = dedup;
+        this.prj = prj;
+        this.filter = filter;
+        this.part = part;
+        this.clsName = clsName;
+        this.clause = clause;
+        this.idxQryDesc = idxQryDesc;
+        this.limit = limit;
+        this.incMeta = incMeta;
+        this.keepBinary = keepBinary;
+        this.taskHash = taskHash;
+        this.dataPageScanEnabled = dataPageScanEnabled;
+    }
+
+    /**
+     * Cache query adapter for INDEX query.
+     *
+     * @param cctx Context.
+     * @param type Query type.
+     * @param idxQryDesc Index query descriptor.
+     * @param part Partition number to iterate over.
+     * @param clsName Class name.
+     * @param filter Index query remote filter.
+     */
+    public CacheQuery(
+        GridCacheContext<?, ?> cctx,
+        GridCacheQueryType type,
+        IndexQueryDesc idxQryDesc,
+        @Nullable Integer part,
+        @Nullable String clsName,
+        @Nullable IgniteBiPredicate<Object, Object> filter
+    ) {
+        this(cctx, type, clsName, null, filter, part, false, false, null, 
idxQryDesc);
+    }
+
+    /** @return Flag to enable data page scan. */
+    public Boolean isDataPageScanEnabled() {
+        return dataPageScanEnabled;
+    }
+
+    /** @return Type. */
+    public GridCacheQueryType type() {
+        return type;
+    }
+
+    /** @return Class name. */
+    @Nullable public String queryClassName() {
+        return clsName;
+    }
+
+    /** @return Clause. */
+    @Nullable public String clause() {
+        return clause;
+    }
+
+    /** @return Include metadata flag. */
+    public boolean includeMetadata() {
+        return incMeta;
+    }
+
+    /** @return {@code True} if binary should not be deserialized. */
+    public boolean keepBinary() {
+        return keepBinary;
+    }
+
+    /**
+     * Forces query to keep binary object representation even if query was 
created on plain projection.
+     *
+     * @param keepBinary Keep binary flag.
+     */
+    public void keepBinary(boolean keepBinary) {
+        this.keepBinary = keepBinary;
+    }
+
+    /** @return {@code True} if the query is forced local. */
+    public boolean forceLocal() {
+        return forceLocal;
+    }
+
+    /** @return Task hash. */
+    public int taskHash() {
+        return taskHash;
+    }
+
     /**
      * Sets result page size. If not provided, {@link Query#DFLT_PAGE_SIZE} 
will be used.
      * Results are returned from queried nodes one page at a tme.
@@ -193,7 +501,18 @@ public interface CacheQuery<T> {
      * @param pageSize Page size.
      * @return {@code this} query instance for chaining.
      */
-    public CacheQuery<T> pageSize(int pageSize);
+    public CacheQuery<T> pageSize(int pageSize) {
+        A.ensure(pageSize > 0, "pageSize > 0");
+
+        this.pageSize = pageSize;
+
+        return this;
+    }
+
+    /** @return Page size. */
+    public int pageSize() {
+        return pageSize;
+    }
 
     /**
      * Sets query timeout. {@code 0} means there is no timeout (this
@@ -202,7 +521,18 @@ public interface CacheQuery<T> {
      * @param timeout Query timeout.
      * @return {@code this} query instance for chaining.
      */
-    public CacheQuery<T> timeout(long timeout);
+    public CacheQuery<T> timeout(long timeout) {
+        A.ensure(timeout >= 0, "timeout >= 0");
+
+        this.timeout = timeout;
+
+        return this;
+    }
+
+    /** @return Response limit. Returns 0 for no limits. */
+    public int limit() {
+        return limit;
+    }
 
     /**
      * Sets limit of returned records. {@code 0} means there is no limit
@@ -210,7 +540,16 @@ public interface CacheQuery<T> {
      * @param limit Records limit.
      * @return {@code this} query instance for chaining.
      */
-    public CacheQuery<T> limit(int limit);
+    public CacheQuery<T> limit(int limit) {
+        this.limit = limit;
+
+        return this;
+    }
+
+    /** @return Timeout. */
+    public long timeout() {
+        return timeout;
+    }
 
     /**
      * Sets whether or not to include backup entries into query result. This 
flag
@@ -219,7 +558,16 @@ public interface CacheQuery<T> {
      * @param incBackups Query {@code includeBackups} flag.
      * @return {@code this} query instance for chaining.
      */
-    public CacheQuery<T> includeBackups(boolean incBackups);
+    public CacheQuery<T> includeBackups(boolean incBackups) {
+        this.incBackups = incBackups;
+
+        return this;
+    }
+
+    /** @return Include backups. */
+    public boolean includeBackups() {
+        return incBackups;
+    }
 
     /**
      * Sets whether or not to deduplicate query result set. If this flag is 
{@code true}
@@ -229,7 +577,16 @@ public interface CacheQuery<T> {
      * @param dedup Query {@code enableDedup} flag.
      * @return {@code this} query instance for chaining.
      */
-    public CacheQuery<T> enableDedup(boolean dedup);
+    public CacheQuery<T> enableDedup(boolean dedup) {
+        this.dedup = dedup;
+
+        return this;
+    }
+
+    /** @return Enable dedup flag. */
+    public boolean enableDedup() {
+        return dedup;
+    }
 
     /**
      * Sets optional grid projection to execute this query on.
@@ -237,7 +594,43 @@ public interface CacheQuery<T> {
      * @param prj Projection.
      * @return {@code this} query instance for chaining.
      */
-    public CacheQuery<T> projection(ClusterGroup prj);
+    public CacheQuery<T> projection(ClusterGroup prj) {
+        this.prj = prj;
+
+        return this;
+    }
+
+    /** @return Grid projection. */
+    public ClusterGroup projection() {
+        return prj;
+    }
+
+    /** @return Key-value filter. */
+    @Nullable public <K, V> IgniteBiPredicate<K, V> scanFilter() {
+        return (IgniteBiPredicate<K, V>)filter;
+    }
+
+    /** @return Transformer. */
+    @Nullable public <K, V> IgniteClosure<Map.Entry<K, V>, Object> transform() 
{
+        return (IgniteClosure<Map.Entry<K, V>, Object>)transform;
+    }
+
+    /** @return Partition. */
+    @Nullable public Integer partition() {
+        return part;
+    }
+
+    /** @return Index query description. */
+    @Nullable public IndexQueryDesc idxQryDesc() {
+        return idxQryDesc;
+    }
+
+    /** @throws IgniteCheckedException If query is invalid. */
+    public void validate() throws IgniteCheckedException {
+        if ((type != SCAN && type != SET && type != SPI && type != INDEX)
+            && !QueryUtils.isEnabled(cctx.config()))
+            throw new IgniteCheckedException("Indexing is disabled for cache: 
" + cctx.cache().name());
+    }
 
     /**
      * Executes the query and returns the query future. Caller may decide to 
iterate
@@ -254,7 +647,9 @@ public interface CacheQuery<T> {
      * @param args Optional arguments.
      * @return Future for the query result.
      */
-    public CacheQueryFuture<T> execute(@Nullable Object... args);
+    public CacheQueryFuture<T> execute(@Nullable Object... args) {
+        return execute0(null, args);
+    }
 
     /**
      * Executes the query the same way as {@link #execute(Object...)} method 
but reduces result remotely.
@@ -263,10 +658,438 @@ public interface CacheQuery<T> {
      * @param args Optional arguments.
      * @return Future for the query result.
      */
-    public <R> CacheQueryFuture<R> execute(IgniteReducer<T, R> rmtReducer, 
@Nullable Object... args);
+    public <R> CacheQueryFuture<R> execute(IgniteReducer<T, R> rmtReducer, 
@Nullable Object... args) {
+        return execute0(rmtReducer, args);
+    }
+
+    /**
+     * @param rmtReducer Optional reducer.
+     * @param args Arguments.
+     * @return Future.
+     */
+    @SuppressWarnings({"IfMayBeConditional"})
+    private <R> CacheQueryFuture<R> execute0(@Nullable IgniteReducer<T, R> 
rmtReducer, @Nullable Object... args) {
+        assert type != SCAN : this;
+
+        Collection<ClusterNode> nodes;
+
+        try {
+            nodes = nodes();
+        }
+        catch (IgniteCheckedException e) {
+            return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), e);
+        }
+
+        cctx.checkSecurity(SecurityPermission.CACHE_READ);
+
+        if (nodes.isEmpty())
+            return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), new 
ClusterGroupEmptyCheckedException());
+
+        if (log.isDebugEnabled())
+            log.debug("Executing query [query=" + this + ", nodes=" + nodes + 
']');
+
+        if (cctx.deploymentEnabled()) {
+            try {
+                cctx.deploy().registerClasses(filter, rmtReducer);
+                cctx.deploy().registerClasses(args);
+            }
+            catch (IgniteCheckedException e) {
+                return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), 
e);
+            }
+        }
+
+        taskHash = cctx.kernalContext().job().currentTaskNameHash();
+
+        final GridCacheQueryBean bean = new GridCacheQueryBean(this, 
(IgniteReducer<Object, Object>)rmtReducer,
+            null, args);
+
+        final GridCacheQueryManager qryMgr = cctx.queries();
+
+        boolean loc = nodes.size() == 1 && 
F.first(nodes).id().equals(cctx.localNodeId());
+
+        if (type == SQL_FIELDS || type == SPI)
+            return (CacheQueryFuture<R>)(loc ? qryMgr.queryFieldsLocal(bean) :
+                qryMgr.queryFieldsDistributed(bean, nodes));
+        else
+            return (CacheQueryFuture<R>)(loc ? qryMgr.queryLocal(bean) : 
qryMgr.queryDistributed(bean, nodes));
+    }
+
+    /** @return Scan query iterator. */
+    public GridCloseableIterator executeScanQuery() throws 
IgniteCheckedException {
+        assert type == SCAN : "Wrong processing of query: " + type;
+
+        GridDhtCacheAdapter<?, ?> cacheAdapter = cctx.isNear() ? 
cctx.near().dht() : cctx.dht();
+
+        Set<Integer> lostParts = cacheAdapter.topology().lostPartitions();
+
+        if (!lostParts.isEmpty()) {
+            if (part == null || lostParts.contains(part)) {
+                throw new CacheException(new 
CacheInvalidStateException("Failed to execute query because cache partition " +
+                    "has been lostParts [cacheName=" + cctx.name() +
+                    ", part=" + (part == null ? lostParts.iterator().next() : 
part) + ']'));
+            }
+        }
+
+        // Affinity nodes snapshot.
+        Collection<ClusterNode> nodes = new ArrayList<>(nodes());
+
+        cctx.checkSecurity(SecurityPermission.CACHE_READ);
+
+        if (nodes.isEmpty()) {
+            if (part != null) {
+                if (forceLocal) {
+                    throw new IgniteCheckedException("No queryable nodes for 
partition " + part
+                            + " [forced local query=" + this + "]");
+                }
+            }
+
+            return new GridEmptyCloseableIterator();
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Executing query [query=" + this + ", nodes=" + nodes + 
']');
+
+        if (cctx.deploymentEnabled())
+            cctx.deploy().registerClasses(filter);
+
+        taskHash = cctx.kernalContext().job().currentTaskNameHash();
+
+        final GridCacheQueryManager qryMgr = cctx.queries();
+
+        boolean loc = nodes.size() == 1 && 
F.first(nodes).id().equals(cctx.localNodeId());
+
+        if (loc)
+            return qryMgr.scanQueryLocal(this, true);
+        else if (part != null)
+            return new ScanQueryFallbackClosableIterator(part, this, qryMgr, 
cctx);
+        else
+            return qryMgr.scanQueryDistributed(this, nodes);
+    }
+
+    /** @return Nodes to execute on. */
+    private Collection<ClusterNode> nodes() throws IgniteCheckedException {
+        CacheMode cacheMode = cctx.config().getCacheMode();
+
+        Integer part = partition();
+
+        switch (cacheMode) {
+            case REPLICATED:
+                if (prj != null || part != null)
+                    return nodes(cctx, prj, part);
+
+                GridDhtPartitionTopology top = cctx.topology();
+
+                if (cctx.affinityNode() && 
!top.localPartitionMap().hasMovingPartitions())
+                    return Collections.singletonList(cctx.localNode());
+
+                top.readLock();
+
+                try {
+
+                    Collection<ClusterNode> affNodes = nodes(cctx, null, null);
+
+                    List<ClusterNode> nodes = new ArrayList<>(affNodes);
+
+                    Collections.shuffle(nodes);
+
+                    for (ClusterNode node : nodes) {
+                        if (!top.partitions(node.id()).hasMovingPartitions())
+                            return Collections.singletonList(node);
+                    }
+
+                    return affNodes;
+                }
+                finally {
+                    top.readUnlock();
+                }
+
+            case PARTITIONED:
+                return nodes(cctx, prj, part);
+
+            default:
+                throw new IllegalStateException("Unknown cache distribution 
mode: " + cacheMode);
+        }
+    }
 
     /**
-     * @return Scan query iterator.
+     * @param cctx Cache context.
+     * @param prj Projection (optional).
+     * @return Collection of data nodes in provided projection (if any).
+     * @throws IgniteCheckedException If partition number is invalid.
      */
-    public GridCloseableIterator executeScanQuery() throws 
IgniteCheckedException;
+    private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> 
cctx,
+        @Nullable final ClusterGroup prj, @Nullable final Integer part) throws 
IgniteCheckedException {
+        assert cctx != null;
+
+        final AffinityTopologyVersion topVer = 
cctx.affinity().affinityTopologyVersion();
+
+        Collection<ClusterNode> affNodes = CU.affinityNodes(cctx, topVer);
+
+        if (prj == null && part == null)
+            return affNodes;
+
+        if (part != null && part >= cctx.affinity().partitions())
+            throw new IgniteCheckedException("Invalid partition number: " + 
part);
+
+        final Set<ClusterNode> owners =
+            part == null ? Collections.<ClusterNode>emptySet() : new 
HashSet<>(cctx.topology().owners(part, topVer));
+
+        return F.view(affNodes, new P1<ClusterNode>() {
+            @Override public boolean apply(ClusterNode n) {
+                return cctx.discovery().cacheAffinityNode(n, cctx.name()) &&
+                    (prj == null || prj.node(n.id()) != null) &&
+                    (part == null || owners.contains(n));
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CacheQuery.class, this);
+    }
+
+    /** Wrapper for queries with fallback. */
+    private static class ScanQueryFallbackClosableIterator extends 
GridCloseableIteratorAdapter {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Query future. */
+        private volatile T2<GridCloseableIterator<Object>, 
GridCacheQueryFutureAdapter> tuple;
+
+        /** Backups. */
+        private volatile Queue<ClusterNode> nodes;
+
+        /** Topology version of the last detected {@link 
GridDhtUnreservedPartitionException}. */
+        private volatile AffinityTopologyVersion unreservedTopVer;
+
+        /** Number of times to retry the query on the nodes failed with {@link 
GridDhtUnreservedPartitionException}. */
+        private volatile int unreservedNodesRetryCnt = 5;
+
+        /** Bean. */
+        private final CacheQuery qry;
+
+        /** Query manager. */
+        private final GridCacheQueryManager qryMgr;
+
+        /** Cache context. */
+        private final GridCacheContext cctx;
+
+        /** Partition. */
+        private final int part;
+
+        /** Flag indicating that a first item has been returned to a user. */
+        private boolean firstItemReturned;
+
+        /** */
+        private Object cur;
+
+        /**
+         * @param part Partition.
+         * @param qry Query.
+         * @param qryMgr Query manager.
+         * @param cctx Cache context.
+         */
+        private ScanQueryFallbackClosableIterator(int part, CacheQuery qry,
+            GridCacheQueryManager qryMgr, GridCacheContext cctx) {
+            this.qry = qry;
+            this.qryMgr = qryMgr;
+            this.cctx = cctx;
+            this.part = part;
+
+            nodes = fallbacks(cctx.shared().exchange().readyAffinityVersion());
+
+            if (F.isEmpty(nodes))
+                throw new ClusterTopologyException("Failed to execute the 
query " +
+                    "(all affinity nodes left the grid) [cache=" + cctx.name() 
+
+                    ", qry=" + qry +
+                    ", curTopVer=" + 
qryMgr.queryTopologyVersion().topologyVersion() + ']');
+
+            init();
+        }
+
+        /**
+         * @param topVer Topology version.
+         * @return Nodes for query execution.
+         */
+        private Queue<ClusterNode> fallbacks(AffinityTopologyVersion topVer) {
+            Deque<ClusterNode> fallbacks = new LinkedList<>();
+            Collection<ClusterNode> owners = new HashSet<>();
+
+            for (ClusterNode node : cctx.topology().owners(part, topVer)) {
+                if (node.isLocal())
+                    fallbacks.addFirst(node);
+                else
+                    fallbacks.add(node);
+
+                owners.add(node);
+            }
+
+            for (ClusterNode node : cctx.topology().moving(part)) {
+                if (!owners.contains(node))
+                    fallbacks.add(node);
+            }
+
+            return fallbacks;
+        }
+
+        /** */
+        @SuppressWarnings("unchecked")
+        private void init() {
+            final ClusterNode node = nodes.poll();
+
+            if (node.isLocal()) {
+                try {
+                    GridCloseableIterator it = qryMgr.scanQueryLocal(qry, 
true);
+
+                    tuple = new T2(it, null);
+                }
+                catch (IgniteClientDisconnectedCheckedException e) {
+                    throw CU.convertToCacheException(e);
+                }
+                catch (IgniteCheckedException e) {
+                    retryIfPossible(e);
+                }
+            }
+            else {
+                final GridCacheQueryBean bean = new GridCacheQueryBean(qry, 
null, qry.transform, null);
+
+                GridCacheQueryFutureAdapter fut =
+                    (GridCacheQueryFutureAdapter)qryMgr.queryDistributed(bean, 
Collections.singleton(node));
+
+                tuple = new T2(null, fut);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override protected Object onNext() throws IgniteCheckedException {
+            if (!onHasNext())
+                throw new NoSuchElementException();
+
+            assert cur != null;
+
+            Object e = cur;
+
+            cur = null;
+
+            return e;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected boolean onHasNext() throws IgniteCheckedException {
+            while (true) {
+                if (cur != null)
+                    return true;
+
+                T2<GridCloseableIterator<Object>, GridCacheQueryFutureAdapter> 
t = tuple;
+
+                GridCloseableIterator<Object> iter = t.get1();
+
+                if (iter != null) {
+                    boolean hasNext = iter.hasNext();
+
+                    if (hasNext)
+                        cur = iter.next();
+
+                    return hasNext;
+                }
+                else {
+                    GridCacheQueryFutureAdapter fut = t.get2();
+
+                    assert fut != null;
+
+                    if (firstItemReturned)
+                        return (cur = convert(fut.next())) != null;
+
+                    try {
+                        fut.awaitFirstItemAvailable();
+
+                        firstItemReturned = true;
+
+                        return (cur = convert(fut.next())) != null;
+                    }
+                    catch (IgniteClientDisconnectedCheckedException e) {
+                        throw CU.convertToCacheException(e);
+                    }
+                    catch (IgniteCheckedException e) {
+                        retryIfPossible(e);
+                    }
+                }
+            }
+        }
+
+        /**
+         * @param obj Entry to convert.
+         * @return Cache entry
+         */
+        private Object convert(Object obj) {
+            if (qry.transform() != null)
+                return obj;
+
+            Map.Entry e = (Map.Entry)obj;
+
+            return e == null ? null : new CacheQueryEntry(e.getKey(), 
e.getValue());
+        }
+
+        /** @param e Exception for query run. */
+        private void retryIfPossible(IgniteCheckedException e) {
+            try {
+                IgniteInternalFuture<?> retryFut;
+
+                GridDhtUnreservedPartitionException partErr = X.cause(e, 
GridDhtUnreservedPartitionException.class);
+
+                if (partErr != null) {
+                    AffinityTopologyVersion waitVer = 
partErr.topologyVersion();
+
+                    assert waitVer != null;
+
+                    retryFut = 
cctx.shared().exchange().affinityReadyFuture(waitVer);
+                }
+                else if (e.hasCause(ClusterTopologyCheckedException.class)) {
+                    ClusterTopologyCheckedException topEx = X.cause(e, 
ClusterTopologyCheckedException.class);
+
+                    retryFut = topEx.retryReadyFuture();
+                }
+                else if (e.hasCause(ClusterGroupEmptyCheckedException.class)) {
+                    ClusterGroupEmptyCheckedException ex = X.cause(e, 
ClusterGroupEmptyCheckedException.class);
+
+                    retryFut = ex.retryReadyFuture();
+                }
+                else
+                    throw CU.convertToCacheException(e);
+
+                if (F.isEmpty(nodes)) {
+                    if (--unreservedNodesRetryCnt > 0) {
+                        if (retryFut != null)
+                            retryFut.get();
+
+                        nodes = fallbacks(unreservedTopVer == null ? 
cctx.shared().exchange().readyAffinityVersion() : unreservedTopVer);
+
+                        unreservedTopVer = null;
+
+                        init();
+                    }
+                    else
+                        throw CU.convertToCacheException(e);
+                }
+                else
+                    init();
+            }
+            catch (IgniteCheckedException ex) {
+                throw CU.convertToCacheException(ex);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void onClose() throws IgniteCheckedException {
+            super.onClose();
+
+            T2<GridCloseableIterator<Object>, GridCacheQueryFutureAdapter> t = 
tuple;
+
+            if (t != null && t.get1() != null)
+                t.get1().close();
+
+            if (t != null && t.get2() != null)
+                t.get2().cancel();
+        }
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index 0bbb9ca219c..5e959eac1b4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -244,8 +244,8 @@ public class GridCacheDistributedQueryManager<K, V> extends 
GridCacheQueryManage
         if (sndNode == null)
             return null;
 
-        GridCacheQueryAdapter<?> qry =
-            new GridCacheQueryAdapter<>(
+        CacheQuery<?> qry =
+            new CacheQuery<>(
                 cctx,
                 req.type(),
                 log,
@@ -541,7 +541,7 @@ public class GridCacheDistributedQueryManager<K, V> extends 
GridCacheQueryManage
 
     /** {@inheritDoc} */
     @SuppressWarnings({"unchecked"})
-    @Override public GridCloseableIterator scanQueryDistributed(final 
GridCacheQueryAdapter qry,
+    @Override public GridCloseableIterator scanQueryDistributed(final 
CacheQuery qry,
         Collection<ClusterNode> nodes) throws IgniteCheckedException {
         assert qry.type() == GridCacheQueryType.SCAN : qry;
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
deleted file mode 100644
index 3ffb78b2c01..00000000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ /dev/null
@@ -1,928 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.query;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Queue;
-import java.util.Set;
-import javax.cache.CacheException;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.query.Query;
-import org.apache.ignite.cluster.ClusterGroup;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.cluster.ClusterTopologyException;
-import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnreservedPartitionException;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
-import org.apache.ignite.internal.processors.query.QueryUtils;
-import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
-import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
-import org.apache.ignite.internal.util.lang.GridCloseableIterator;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.P1;
-import org.apache.ignite.internal.util.typedef.T2;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.lang.IgniteBiPredicate;
-import org.apache.ignite.lang.IgniteClosure;
-import org.apache.ignite.lang.IgniteReducer;
-import org.apache.ignite.plugin.security.SecurityPermission;
-import org.jetbrains.annotations.Nullable;
-
-import static 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.INDEX;
-import static 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SCAN;
-import static 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SET;
-import static 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SPI;
-import static 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
-
-/**
- * Query adapter.
- */
-public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
-    /** */
-    private final GridCacheContext<?, ?> cctx;
-
-    /** */
-    private final GridCacheQueryType type;
-
-    /** */
-    private final IgniteLogger log;
-
-    /** Class name in case of binary query. */
-    private final String clsName;
-
-    /** */
-    @GridToStringInclude(sensitive = true)
-    private final String clause;
-
-    /** Description of IndexQuery. */
-    private final IndexQueryDesc idxQryDesc;
-
-    /** */
-    private final IgniteBiPredicate<Object, Object> filter;
-
-    /** Limits returned records quantity. */
-    private int limit;
-
-    /** Transformer. */
-    private IgniteClosure<?, ?> transform;
-
-    /** Partition. */
-    private Integer part;
-
-    /** */
-    private final boolean incMeta;
-
-    /** */
-    private volatile int pageSize = Query.DFLT_PAGE_SIZE;
-
-    /** */
-    private volatile long timeout;
-
-    /** */
-    private volatile boolean incBackups;
-
-    /** Local query. */
-    private boolean forceLocal;
-
-    /** */
-    private volatile boolean dedup;
-
-    /** */
-    private volatile ClusterGroup prj;
-
-    /** */
-    private boolean keepBinary;
-
-    /** */
-    private int taskHash;
-
-    /** */
-    private Boolean dataPageScanEnabled;
-
-    /**
-     * Cache query adapter for SCAN query.
-     *
-     * @param cctx Context.
-     * @param type Query type.
-     * @param filter Scan filter.
-     * @param part Partition.
-     * @param keepBinary Keep binary flag.
-     * @param forceLocal Flag to force local query.
-     * @param dataPageScanEnabled Flag to enable data page scan.
-     */
-    public GridCacheQueryAdapter(
-        GridCacheContext<?, ?> cctx,
-        GridCacheQueryType type,
-        @Nullable IgniteBiPredicate<Object, Object> filter,
-        @Nullable IgniteClosure<Map.Entry, Object> transform,
-        @Nullable Integer part,
-        boolean keepBinary,
-        boolean forceLocal,
-        Boolean dataPageScanEnabled
-    ) {
-        this(cctx, type, null, null, filter, part, false, keepBinary, 
dataPageScanEnabled, null);
-
-        this.transform = transform;
-        this.forceLocal = forceLocal;
-    }
-
-    /**
-     * Cache query adapter for SET, SPI, TEXT queries.
-     *
-     * @param cctx Context.
-     * @param type Query type.
-     * @param clsName Class name.
-     * @param clause Clause.
-     * @param filter Scan filter.
-     * @param part Partition.
-     * @param incMeta Include metadata flag.
-     * @param keepBinary Keep binary flag.
-     * @param dataPageScanEnabled Flag to enable data page scan.
-     */
-    public GridCacheQueryAdapter(
-        GridCacheContext<?, ?> cctx,
-        GridCacheQueryType type,
-        @Nullable String clsName,
-        @Nullable String clause,
-        @Nullable IgniteBiPredicate<Object, Object> filter,
-        @Nullable Integer part,
-        boolean incMeta,
-        boolean keepBinary,
-        Boolean dataPageScanEnabled,
-        IndexQueryDesc idxQryDesc
-    ) {
-        assert cctx != null;
-        assert type != null;
-        assert part == null || part >= 0;
-
-        this.cctx = cctx;
-        this.type = type;
-        this.clsName = clsName;
-        this.clause = clause;
-        this.filter = filter;
-        this.part = part;
-        this.incMeta = incMeta;
-        this.keepBinary = keepBinary;
-        this.dataPageScanEnabled = dataPageScanEnabled;
-        this.idxQryDesc = idxQryDesc;
-
-        log = cctx.logger(getClass());
-    }
-
-    /**
-     * Cache query adapter for local query processing.
-     *
-     * @param cctx Context.
-     * @param type Query type.
-     * @param log Logger.
-     * @param pageSize Page size.
-     * @param timeout Timeout.
-     * @param incBackups Include backups flag.
-     * @param dedup Enable dedup flag.
-     * @param prj Grid projection.
-     * @param filter Key-value filter.
-     * @param part Partition.
-     * @param clsName Class name.
-     * @param clause Clause.
-     * @param limit Response limit. Set to 0 for no limits.
-     * @param incMeta Include metadata flag.
-     * @param keepBinary Keep binary flag.
-     * @param taskHash Task hash.
-     * @param dataPageScanEnabled Flag to enable data page scan.
-     */
-    public GridCacheQueryAdapter(
-        GridCacheContext<?, ?> cctx,
-        GridCacheQueryType type,
-        IgniteLogger log,
-        int pageSize,
-        long timeout,
-        boolean incBackups,
-        boolean dedup,
-        ClusterGroup prj,
-        IgniteBiPredicate<Object, Object> filter,
-        @Nullable Integer part,
-        @Nullable String clsName,
-        String clause,
-        IndexQueryDesc idxQryDesc,
-        int limit,
-        boolean incMeta,
-        boolean keepBinary,
-        int taskHash,
-        Boolean dataPageScanEnabled
-    ) {
-        this.cctx = cctx;
-        this.type = type;
-        this.log = log;
-        this.pageSize = pageSize;
-        this.timeout = timeout;
-        this.incBackups = incBackups;
-        this.dedup = dedup;
-        this.prj = prj;
-        this.filter = filter;
-        this.part = part;
-        this.clsName = clsName;
-        this.clause = clause;
-        this.idxQryDesc = idxQryDesc;
-        this.limit = limit;
-        this.incMeta = incMeta;
-        this.keepBinary = keepBinary;
-        this.taskHash = taskHash;
-        this.dataPageScanEnabled = dataPageScanEnabled;
-    }
-
-    /**
-     * Cache query adapter for INDEX query.
-     *
-     * @param cctx Context.
-     * @param type Query type.
-     * @param idxQryDesc Index query descriptor.
-     * @param part Partition number to iterate over.
-     * @param clsName Class name.
-     * @param filter Index query remote filter.
-     */
-    public GridCacheQueryAdapter(
-        GridCacheContext<?, ?> cctx,
-        GridCacheQueryType type,
-        IndexQueryDesc idxQryDesc,
-        @Nullable Integer part,
-        @Nullable String clsName,
-        @Nullable IgniteBiPredicate<Object, Object> filter
-    ) {
-        this(cctx, type, clsName, null, filter, part, false, false, null, 
idxQryDesc);
-    }
-
-    /**
-     * @return Flag to enable data page scan.
-     */
-    public Boolean isDataPageScanEnabled() {
-        return dataPageScanEnabled;
-    }
-
-    /**
-     * @return Type.
-     */
-    public GridCacheQueryType type() {
-        return type;
-    }
-
-    /**
-     * @return Class name.
-     */
-    @Nullable public String queryClassName() {
-        return clsName;
-    }
-
-    /**
-     * @return Clause.
-     */
-    @Nullable public String clause() {
-        return clause;
-    }
-
-    /**
-     * @return Include metadata flag.
-     */
-    public boolean includeMetadata() {
-        return incMeta;
-    }
-
-    /**
-     * @return {@code True} if binary should not be deserialized.
-     */
-    public boolean keepBinary() {
-        return keepBinary;
-    }
-
-    /**
-     * Forces query to keep binary object representation even if query was 
created on plain projection.
-     *
-     * @param keepBinary Keep binary flag.
-     */
-    public void keepBinary(boolean keepBinary) {
-        this.keepBinary = keepBinary;
-    }
-
-    /**
-     * @return {@code True} if the query is forced local.
-     */
-    public boolean forceLocal() {
-        return forceLocal;
-    }
-
-    /**
-     * @return Task hash.
-     */
-    public int taskHash() {
-        return taskHash;
-    }
-
-    /** {@inheritDoc} */
-    @Override public CacheQuery<T> pageSize(int pageSize) {
-        A.ensure(pageSize > 0, "pageSize > 0");
-
-        this.pageSize = pageSize;
-
-        return this;
-    }
-
-    /**
-     * @return Page size.
-     */
-    public int pageSize() {
-        return pageSize;
-    }
-
-    /** {@inheritDoc} */
-    @Override public CacheQuery<T> timeout(long timeout) {
-        A.ensure(timeout >= 0, "timeout >= 0");
-
-        this.timeout = timeout;
-
-        return this;
-    }
-
-    /**
-     * @return Response limit. Returns 0 for no limits.
-     **/
-    public int limit() {
-        return limit;
-    }
-
-    /** {@inheritDoc} */
-    @Override public CacheQuery<T> limit(int limit) {
-        this.limit = limit;
-
-        return this;
-    }
-
-    /**
-     * @return Timeout.
-     */
-    public long timeout() {
-        return timeout;
-    }
-
-    /** {@inheritDoc} */
-    @Override public CacheQuery<T> includeBackups(boolean incBackups) {
-        this.incBackups = incBackups;
-
-        return this;
-    }
-
-    /**
-     * @return Include backups.
-     */
-    public boolean includeBackups() {
-        return incBackups;
-    }
-
-    /** {@inheritDoc} */
-    @Override public CacheQuery<T> enableDedup(boolean dedup) {
-        this.dedup = dedup;
-
-        return this;
-    }
-
-    /**
-     * @return Enable dedup flag.
-     */
-    public boolean enableDedup() {
-        return dedup;
-    }
-
-    /** {@inheritDoc} */
-    @Override public CacheQuery<T> projection(ClusterGroup prj) {
-        this.prj = prj;
-
-        return this;
-    }
-
-    /**
-     * @return Grid projection.
-     */
-    public ClusterGroup projection() {
-        return prj;
-    }
-
-    /**
-     * @return Key-value filter.
-     */
-    @Nullable public <K, V> IgniteBiPredicate<K, V> scanFilter() {
-        return (IgniteBiPredicate<K, V>)filter;
-    }
-
-    /**
-     * @return Transformer.
-     */
-    @Nullable public <K, V> IgniteClosure<Map.Entry<K, V>, Object> transform() 
{
-        return (IgniteClosure<Map.Entry<K, V>, Object>)transform;
-    }
-
-    /**
-     * @return Partition.
-     */
-    @Nullable public Integer partition() {
-        return part;
-    }
-
-    /**
-     * @return Index query description.
-     */
-    @Nullable public IndexQueryDesc idxQryDesc() {
-        return idxQryDesc;
-    }
-
-    /**
-     * @throws IgniteCheckedException If query is invalid.
-     */
-    public void validate() throws IgniteCheckedException {
-        if ((type != SCAN && type != SET && type != SPI && type != INDEX)
-            && !QueryUtils.isEnabled(cctx.config()))
-            throw new IgniteCheckedException("Indexing is disabled for cache: 
" + cctx.cache().name());
-    }
-
-    /** {@inheritDoc} */
-    @Override public CacheQueryFuture<T> execute(@Nullable Object... args) {
-        return execute0(null, args);
-    }
-
-    /** {@inheritDoc} */
-    @Override public <R> CacheQueryFuture<R> execute(IgniteReducer<T, R> 
rmtReducer, @Nullable Object... args) {
-        return execute0(rmtReducer, args);
-    }
-
-    /**
-     * @param rmtReducer Optional reducer.
-     * @param args Arguments.
-     * @return Future.
-     */
-    @SuppressWarnings({"IfMayBeConditional"})
-    private <R> CacheQueryFuture<R> execute0(@Nullable IgniteReducer<T, R> 
rmtReducer, @Nullable Object... args) {
-        assert type != SCAN : this;
-
-        Collection<ClusterNode> nodes;
-
-        try {
-            nodes = nodes();
-        }
-        catch (IgniteCheckedException e) {
-            return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), e);
-        }
-
-        cctx.checkSecurity(SecurityPermission.CACHE_READ);
-
-        if (nodes.isEmpty())
-            return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), new 
ClusterGroupEmptyCheckedException());
-
-        if (log.isDebugEnabled())
-            log.debug("Executing query [query=" + this + ", nodes=" + nodes + 
']');
-
-        if (cctx.deploymentEnabled()) {
-            try {
-                cctx.deploy().registerClasses(filter, rmtReducer);
-                cctx.deploy().registerClasses(args);
-            }
-            catch (IgniteCheckedException e) {
-                return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), 
e);
-            }
-        }
-
-        taskHash = cctx.kernalContext().job().currentTaskNameHash();
-
-        final GridCacheQueryBean bean = new GridCacheQueryBean(this, 
(IgniteReducer<Object, Object>)rmtReducer,
-            null, args);
-
-        final GridCacheQueryManager qryMgr = cctx.queries();
-
-        boolean loc = nodes.size() == 1 && 
F.first(nodes).id().equals(cctx.localNodeId());
-
-        if (type == SQL_FIELDS || type == SPI)
-            return (CacheQueryFuture<R>)(loc ? qryMgr.queryFieldsLocal(bean) :
-                qryMgr.queryFieldsDistributed(bean, nodes));
-        else
-            return (CacheQueryFuture<R>)(loc ? qryMgr.queryLocal(bean) : 
qryMgr.queryDistributed(bean, nodes));
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridCloseableIterator executeScanQuery() throws 
IgniteCheckedException {
-        assert type == SCAN : "Wrong processing of query: " + type;
-
-        GridDhtCacheAdapter<?, ?> cacheAdapter = cctx.isNear() ? 
cctx.near().dht() : cctx.dht();
-
-        Set<Integer> lostParts = cacheAdapter.topology().lostPartitions();
-
-        if (!lostParts.isEmpty()) {
-            if (part == null || lostParts.contains(part)) {
-                throw new CacheException(new 
CacheInvalidStateException("Failed to execute query because cache partition " +
-                    "has been lostParts [cacheName=" + cctx.name() +
-                    ", part=" + (part == null ? lostParts.iterator().next() : 
part) + ']'));
-            }
-        }
-
-        // Affinity nodes snapshot.
-        Collection<ClusterNode> nodes = new ArrayList<>(nodes());
-
-        cctx.checkSecurity(SecurityPermission.CACHE_READ);
-
-        if (nodes.isEmpty()) {
-            if (part != null) {
-                if (forceLocal) {
-                    throw new IgniteCheckedException("No queryable nodes for 
partition " + part
-                            + " [forced local query=" + this + "]");
-                }
-            }
-
-            return new GridEmptyCloseableIterator();
-        }
-
-        if (log.isDebugEnabled())
-            log.debug("Executing query [query=" + this + ", nodes=" + nodes + 
']');
-
-        if (cctx.deploymentEnabled())
-            cctx.deploy().registerClasses(filter);
-
-        taskHash = cctx.kernalContext().job().currentTaskNameHash();
-
-        final GridCacheQueryManager qryMgr = cctx.queries();
-
-        boolean loc = nodes.size() == 1 && 
F.first(nodes).id().equals(cctx.localNodeId());
-
-        if (loc)
-            return qryMgr.scanQueryLocal(this, true);
-        else if (part != null)
-            return new ScanQueryFallbackClosableIterator(part, this, qryMgr, 
cctx);
-        else
-            return qryMgr.scanQueryDistributed(this, nodes);
-    }
-
-    /**
-     * @return Nodes to execute on.
-     */
-    private Collection<ClusterNode> nodes() throws IgniteCheckedException {
-        CacheMode cacheMode = cctx.config().getCacheMode();
-
-        Integer part = partition();
-
-        switch (cacheMode) {
-            case REPLICATED:
-                if (prj != null || part != null)
-                    return nodes(cctx, prj, part);
-
-                GridDhtPartitionTopology top = cctx.topology();
-
-                if (cctx.affinityNode() && 
!top.localPartitionMap().hasMovingPartitions())
-                    return Collections.singletonList(cctx.localNode());
-
-                top.readLock();
-
-                try {
-
-                    Collection<ClusterNode> affNodes = nodes(cctx, null, null);
-
-                    List<ClusterNode> nodes = new ArrayList<>(affNodes);
-
-                    Collections.shuffle(nodes);
-
-                    for (ClusterNode node : nodes) {
-                        if (!top.partitions(node.id()).hasMovingPartitions())
-                            return Collections.singletonList(node);
-                    }
-
-                    return affNodes;
-                }
-                finally {
-                    top.readUnlock();
-                }
-
-            case PARTITIONED:
-                return nodes(cctx, prj, part);
-
-            default:
-                throw new IllegalStateException("Unknown cache distribution 
mode: " + cacheMode);
-        }
-    }
-
-    /**
-     * @param cctx Cache context.
-     * @param prj Projection (optional).
-     * @return Collection of data nodes in provided projection (if any).
-     * @throws IgniteCheckedException If partition number is invalid.
-     */
-    private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> 
cctx,
-        @Nullable final ClusterGroup prj, @Nullable final Integer part) throws 
IgniteCheckedException {
-        assert cctx != null;
-
-        final AffinityTopologyVersion topVer = 
cctx.affinity().affinityTopologyVersion();
-
-        Collection<ClusterNode> affNodes = CU.affinityNodes(cctx, topVer);
-
-        if (prj == null && part == null)
-            return affNodes;
-
-        if (part != null && part >= cctx.affinity().partitions())
-            throw new IgniteCheckedException("Invalid partition number: " + 
part);
-
-        final Set<ClusterNode> owners =
-            part == null ? Collections.<ClusterNode>emptySet() : new 
HashSet<>(cctx.topology().owners(part, topVer));
-
-        return F.view(affNodes, new P1<ClusterNode>() {
-            @Override public boolean apply(ClusterNode n) {
-                return cctx.discovery().cacheAffinityNode(n, cctx.name()) &&
-                    (prj == null || prj.node(n.id()) != null) &&
-                    (part == null || owners.contains(n));
-            }
-        });
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridCacheQueryAdapter.class, this);
-    }
-
-    /**
-     * Wrapper for queries with fallback.
-     */
-    private static class ScanQueryFallbackClosableIterator extends 
GridCloseableIteratorAdapter {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Query future. */
-        private volatile T2<GridCloseableIterator<Object>, 
GridCacheQueryFutureAdapter> tuple;
-
-        /** Backups. */
-        private volatile Queue<ClusterNode> nodes;
-
-        /** Topology version of the last detected {@link 
GridDhtUnreservedPartitionException}. */
-        private volatile AffinityTopologyVersion unreservedTopVer;
-
-        /** Number of times to retry the query on the nodes failed with {@link 
GridDhtUnreservedPartitionException}. */
-        private volatile int unreservedNodesRetryCnt = 5;
-
-        /** Bean. */
-        private final GridCacheQueryAdapter qry;
-
-        /** Query manager. */
-        private final GridCacheQueryManager qryMgr;
-
-        /** Cache context. */
-        private final GridCacheContext cctx;
-
-        /** Partition. */
-        private final int part;
-
-        /** Flag indicating that a first item has been returned to a user. */
-        private boolean firstItemReturned;
-
-        /** */
-        private Object cur;
-
-        /**
-         * @param part Partition.
-         * @param qry Query.
-         * @param qryMgr Query manager.
-         * @param cctx Cache context.
-         */
-        private ScanQueryFallbackClosableIterator(int part, 
GridCacheQueryAdapter qry,
-            GridCacheQueryManager qryMgr, GridCacheContext cctx) {
-            this.qry = qry;
-            this.qryMgr = qryMgr;
-            this.cctx = cctx;
-            this.part = part;
-
-            nodes = fallbacks(cctx.shared().exchange().readyAffinityVersion());
-
-            if (F.isEmpty(nodes))
-                throw new ClusterTopologyException("Failed to execute the 
query " +
-                    "(all affinity nodes left the grid) [cache=" + cctx.name() 
+
-                    ", qry=" + qry +
-                    ", curTopVer=" + 
qryMgr.queryTopologyVersion().topologyVersion() + ']');
-
-            init();
-        }
-
-        /**
-         * @param topVer Topology version.
-         * @return Nodes for query execution.
-         */
-        private Queue<ClusterNode> fallbacks(AffinityTopologyVersion topVer) {
-            Deque<ClusterNode> fallbacks = new LinkedList<>();
-            Collection<ClusterNode> owners = new HashSet<>();
-
-            for (ClusterNode node : cctx.topology().owners(part, topVer)) {
-                if (node.isLocal())
-                    fallbacks.addFirst(node);
-                else
-                    fallbacks.add(node);
-
-                owners.add(node);
-            }
-
-            for (ClusterNode node : cctx.topology().moving(part)) {
-                if (!owners.contains(node))
-                    fallbacks.add(node);
-            }
-
-            return fallbacks;
-        }
-
-        /**
-         *
-         */
-        @SuppressWarnings("unchecked")
-        private void init() {
-            final ClusterNode node = nodes.poll();
-
-            if (node.isLocal()) {
-                try {
-                    GridCloseableIterator it = qryMgr.scanQueryLocal(qry, 
true);
-
-                    tuple = new T2(it, null);
-                }
-                catch (IgniteClientDisconnectedCheckedException e) {
-                    throw CU.convertToCacheException(e);
-                }
-                catch (IgniteCheckedException e) {
-                    retryIfPossible(e);
-                }
-            }
-            else {
-                final GridCacheQueryBean bean = new GridCacheQueryBean(qry, 
null, qry.transform, null);
-
-                GridCacheQueryFutureAdapter fut =
-                    (GridCacheQueryFutureAdapter)qryMgr.queryDistributed(bean, 
Collections.singleton(node));
-
-                tuple = new T2(null, fut);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override protected Object onNext() throws IgniteCheckedException {
-            if (!onHasNext())
-                throw new NoSuchElementException();
-
-            assert cur != null;
-
-            Object e = cur;
-
-            cur = null;
-
-            return e;
-        }
-
-        /** {@inheritDoc} */
-        @Override protected boolean onHasNext() throws IgniteCheckedException {
-            while (true) {
-                if (cur != null)
-                    return true;
-
-                T2<GridCloseableIterator<Object>, GridCacheQueryFutureAdapter> 
t = tuple;
-
-                GridCloseableIterator<Object> iter = t.get1();
-
-                if (iter != null) {
-                    boolean hasNext = iter.hasNext();
-
-                    if (hasNext)
-                        cur = iter.next();
-
-                    return hasNext;
-                }
-                else {
-                    GridCacheQueryFutureAdapter fut = t.get2();
-
-                    assert fut != null;
-
-                    if (firstItemReturned)
-                        return (cur = convert(fut.next())) != null;
-
-                    try {
-                        fut.awaitFirstItemAvailable();
-
-                        firstItemReturned = true;
-
-                        return (cur = convert(fut.next())) != null;
-                    }
-                    catch (IgniteClientDisconnectedCheckedException e) {
-                        throw CU.convertToCacheException(e);
-                    }
-                    catch (IgniteCheckedException e) {
-                        retryIfPossible(e);
-                    }
-                }
-            }
-        }
-
-        /**
-         * @param obj Entry to convert.
-         * @return Cache entry
-         */
-        private Object convert(Object obj) {
-            if (qry.transform() != null)
-                return obj;
-
-            Map.Entry e = (Map.Entry)obj;
-
-            return e == null ? null : new CacheQueryEntry(e.getKey(), 
e.getValue());
-        }
-
-        /**
-         * @param e Exception for query run.
-         */
-        private void retryIfPossible(IgniteCheckedException e) {
-            try {
-                IgniteInternalFuture<?> retryFut;
-
-                GridDhtUnreservedPartitionException partErr = X.cause(e, 
GridDhtUnreservedPartitionException.class);
-
-                if (partErr != null) {
-                    AffinityTopologyVersion waitVer = 
partErr.topologyVersion();
-
-                    assert waitVer != null;
-
-                    retryFut = 
cctx.shared().exchange().affinityReadyFuture(waitVer);
-                }
-                else if (e.hasCause(ClusterTopologyCheckedException.class)) {
-                    ClusterTopologyCheckedException topEx = X.cause(e, 
ClusterTopologyCheckedException.class);
-
-                    retryFut = topEx.retryReadyFuture();
-                }
-                else if (e.hasCause(ClusterGroupEmptyCheckedException.class)) {
-                    ClusterGroupEmptyCheckedException ex = X.cause(e, 
ClusterGroupEmptyCheckedException.class);
-
-                    retryFut = ex.retryReadyFuture();
-                }
-                else
-                    throw CU.convertToCacheException(e);
-
-                if (F.isEmpty(nodes)) {
-                    if (--unreservedNodesRetryCnt > 0) {
-                        if (retryFut != null)
-                            retryFut.get();
-
-                        nodes = fallbacks(unreservedTopVer == null ? 
cctx.shared().exchange().readyAffinityVersion() : unreservedTopVer);
-
-                        unreservedTopVer = null;
-
-                        init();
-                    }
-                    else
-                        throw CU.convertToCacheException(e);
-                }
-                else
-                    init();
-            }
-            catch (IgniteCheckedException ex) {
-                throw CU.convertToCacheException(ex);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void onClose() throws IgniteCheckedException {
-            super.onClose();
-
-            T2<GridCloseableIterator<Object>, GridCacheQueryFutureAdapter> t = 
tuple;
-
-            if (t != null && t.get1() != null)
-                t.get1().close();
-
-            if (t != null && t.get2() != null)
-                t.get2().cancel();
-        }
-    }
-}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryBean.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryBean.java
index 4dcb8821f35..c32428ed026 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryBean.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryBean.java
@@ -27,7 +27,7 @@ import org.jetbrains.annotations.Nullable;
  */
 public class GridCacheQueryBean {
     /** */
-    private final GridCacheQueryAdapter<?> qry;
+    private final CacheQuery<?> qry;
 
     /** */
     private final IgniteReducer<Object, Object> rdc;
@@ -44,8 +44,8 @@ public class GridCacheQueryBean {
      * @param trans Optional transformer.
      * @param args Optional arguments.
      */
-    public GridCacheQueryBean(GridCacheQueryAdapter<?> qry, @Nullable 
IgniteReducer<Object, Object> rdc,
-        @Nullable IgniteClosure<?, ?> trans, @Nullable Object[] args) {
+    public GridCacheQueryBean(CacheQuery<?> qry, @Nullable 
IgniteReducer<Object, Object> rdc,
+                              @Nullable IgniteClosure<?, ?> trans, @Nullable 
Object[] args) {
         assert qry != null;
 
         this.qry = qry;
@@ -57,7 +57,7 @@ public class GridCacheQueryBean {
     /**
      * @return Query.
      */
-    public GridCacheQueryAdapter<?> query() {
+    public CacheQuery<?> query() {
         return qry;
     }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryInfo.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryInfo.java
index 0a108d580ae..ffac23c8a2e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryInfo.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryInfo.java
@@ -37,7 +37,7 @@ class GridCacheQueryInfo {
     private IgniteReducer<Object, Object> rdc;
 
     /** */
-    private GridCacheQueryAdapter<?> qry;
+    private CacheQuery<?> qry;
 
     /** */
     private GridCacheLocalQueryFuture<?, ?, ?> locFut;
@@ -73,7 +73,7 @@ class GridCacheQueryInfo {
         boolean loc,
         IgniteClosure<?, ?> trans,
         IgniteReducer<Object, Object> rdc,
-        GridCacheQueryAdapter<?> qry,
+        CacheQuery<?> qry,
         GridCacheLocalQueryFuture<?, ?, ?> locFut,
         UUID sndId,
         long reqId,
@@ -110,7 +110,7 @@ class GridCacheQueryInfo {
     /**
      * @return Query.
      */
-    GridCacheQueryAdapter<?> query() {
+    CacheQuery<?> query() {
         return qry;
     }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 7dc28548194..9d2e65ac5b6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -503,7 +503,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
      * @return Iterator.
      * @throws IgniteCheckedException If failed.
      */
-    public abstract GridCloseableIterator 
scanQueryDistributed(GridCacheQueryAdapter qry,
+    public abstract GridCloseableIterator scanQueryDistributed(CacheQuery qry,
         Collection<ClusterNode> nodes) throws IgniteCheckedException;
 
     /**
@@ -541,7 +541,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
      * @throws IgniteCheckedException In case of error.
      */
     @SuppressWarnings("unchecked")
-    private QueryResult<K, V> executeQuery(GridCacheQueryAdapter<?> qry,
+    private QueryResult<K, V> executeQuery(CacheQuery<?> qry,
         IgniteClosure transformer, boolean loc, @Nullable String taskName, 
Object rcpt)
         throws IgniteCheckedException {
         if (qry.type() == null) {
@@ -662,7 +662,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
      * @return Collection of found keys.
      * @throws IgniteCheckedException In case of error.
      */
-    private FieldsResult executeFieldsQuery(GridCacheQueryAdapter<?> qry, 
@Nullable Object[] args,
+    private FieldsResult executeFieldsQuery(CacheQuery<?> qry, @Nullable 
Object[] args,
         boolean loc, @Nullable String taskName, Object rcpt) throws 
IgniteCheckedException {
         assert qry != null;
 
@@ -758,13 +758,13 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
      * @param qry Query.
      * @return Cache set items iterator.
      */
-    private GridCloseableIterator<IgniteBiTuple<K, V>> 
sharedCacheSetIterator(GridCacheQueryAdapter<?> qry)
+    private GridCloseableIterator<IgniteBiTuple<K, V>> 
sharedCacheSetIterator(CacheQuery<?> qry)
         throws IgniteCheckedException {
         final GridSetQueryPredicate filter = 
(GridSetQueryPredicate)qry.scanFilter();
 
         IgniteUuid id = filter.setId();
 
-        GridCacheQueryAdapter<CacheEntry<K, ?>> qry0 = new 
GridCacheQueryAdapter<>(cctx,
+        CacheQuery<CacheEntry<K, ?>> qry0 = new CacheQuery<>(cctx,
             SCAN,
             new IgniteBiPredicate<Object, Object>() {
                 @Override public boolean apply(Object k, Object v) {
@@ -792,7 +792,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
      * @throws IgniteCheckedException If failed to get iterator.
      */
     @SuppressWarnings({"unchecked"})
-    private GridCloseableIterator scanIterator(final GridCacheQueryAdapter<?> 
qry, IgniteClosure transformer,
+    private GridCloseableIterator scanIterator(final CacheQuery<?> qry, 
IgniteClosure transformer,
         boolean locNode)
         throws IgniteCheckedException {
         final InternalScanFilter<K, V> intFilter = 
internalFilter(qry.scanFilter());
@@ -954,7 +954,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
                 // Preparing query closures.
                 final IgniteReducer<Object, Object> rdc = 
injectResources((IgniteReducer<Object, Object>)qryInfo.reducer());
 
-                GridCacheQueryAdapter<?> qry = qryInfo.query();
+                CacheQuery<?> qry = qryInfo.query();
 
                 int pageSize = qry.pageSize();
 
@@ -1146,7 +1146,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
 
             boolean rmvIter = true;
 
-            GridCacheQueryAdapter<?> qry = qryInfo.query();
+            CacheQuery<?> qry = qryInfo.query();
 
             try {
                 IgniteReducer<Cache.Entry<K, V>, Object> rdc = 
injectResources((IgniteReducer<Cache.Entry<K, V>, Object>)qryInfo.reducer());
@@ -1385,7 +1385,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
      * @param updateStatistics Update statistics flag.
      */
     @SuppressWarnings({"unchecked"})
-    protected GridCloseableIterator scanQueryLocal(final GridCacheQueryAdapter 
qry,
+    protected GridCloseableIterator scanQueryLocal(final CacheQuery qry,
         boolean updateStatistics) throws IgniteCheckedException {
         if (!enterBusy())
             throw new IllegalStateException("Failed to process query request 
(grid is stopping).");
@@ -1454,7 +1454,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
      * @return GridCloseableIterator.
      */
     @SuppressWarnings({"unchecked"})
-    public GridCloseableIterator indexQueryLocal(final GridCacheQueryAdapter 
qry) throws IgniteCheckedException {
+    public GridCloseableIterator indexQueryLocal(final CacheQuery qry) throws 
IgniteCheckedException {
         if (!enterBusy())
             throw new IllegalStateException("Failed to process query request 
(grid is stopping).");
 
@@ -2071,7 +2071,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
      * @param qry Query.
      * @return Filter.
      */
-    private IndexingQueryFilter filter(GridCacheQueryAdapter<?> qry) {
+    private IndexingQueryFilter filter(CacheQuery<?> qry) {
         return filter(qry, null, false);
     }
 
@@ -2081,7 +2081,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
      * @param treatReplicatedAsPartitioned If true, only primary partitions of 
replicated caches will be used.
      * @return Filter.
      */
-    private IndexingQueryFilter filter(GridCacheQueryAdapter<?> qry, @Nullable 
int[] partsArr, boolean treatReplicatedAsPartitioned) {
+    private IndexingQueryFilter filter(CacheQuery<?> qry, @Nullable int[] 
partsArr, boolean treatReplicatedAsPartitioned) {
         if (qry.includeBackups())
             return null;
 
@@ -2879,7 +2879,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
      * @return Query.
      */
     public <R> CacheQuery<R> createSpiQuery(boolean keepBinary) {
-        return new GridCacheQueryAdapter<>(cctx,
+        return new CacheQuery<>(cctx,
             SPI,
             null,
             null,
@@ -2923,7 +2923,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
         boolean forceLocal,
         Boolean dataPageScanEnabled
     ) {
-        return new GridCacheQueryAdapter(cctx,
+        return new CacheQuery(cctx,
             SCAN,
             filter,
             trans,
@@ -2949,7 +2949,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
         A.notNull("clsName", clsName);
         A.notNull("search", search);
 
-        return new GridCacheQueryAdapter<Map.Entry<K, V>>(cctx,
+        return new CacheQuery<Map.Entry<K, V>>(cctx,
             TEXT,
             clsName,
             search,
@@ -2980,7 +2980,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
 
         IndexQueryDesc desc = new IndexQueryDesc(qry.getCriteria(), 
qry.getIndexName(), qry.getValueType());
 
-        GridCacheQueryAdapter q = new GridCacheQueryAdapter<>(
+        CacheQuery q = new CacheQuery<>(
             cctx, INDEX, desc, qry.getPartition(), qry.getValueType(), 
qry.getFilter());
 
         q.keepBinary(keepBinary);
@@ -3133,7 +3133,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
          */
         ScanQueryIterator(
             GridIterator<CacheDataRow> it,
-            GridCacheQueryAdapter qry,
+            CacheQuery qry,
             AffinityTopologyVersion topVer,
             GridDhtLocalPartition locPart,
             InternalScanFilter<K, V> intScanFilter,
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index 13934c13aa1..3015d335a06 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@ -166,7 +166,7 @@ public class GridCacheQueryRequest extends 
GridCacheIdMessage implements GridCac
     public static GridCacheQueryRequest startQueryRequest(GridCacheContext<?, 
?> cctx, long reqId,
         GridCacheDistributedQueryFuture<?, ?, ?> fut) {
         GridCacheQueryBean bean = fut.query();
-        GridCacheQueryAdapter<?> qry = bean.query();
+        CacheQuery<?> qry = bean.query();
 
         boolean deployFilterOrTransformer = (qry.scanFilter() != null || 
qry.transform() != null)
             && cctx.gridDeploy().enabled();
@@ -203,7 +203,7 @@ public class GridCacheQueryRequest extends 
GridCacheIdMessage implements GridCac
      * @param reqId Request (cache query) ID.
      */
     public static GridCacheQueryRequest pageRequest(GridCacheContext<?, ?> 
cctx, long reqId,
-        GridCacheQueryAdapter<?> qry, boolean fields) {
+        CacheQuery<?> qry, boolean fields) {
 
         return new GridCacheQueryRequest(
             cctx.cacheId(),
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
index 05ac4b38826..23a9bdea145 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
@@ -44,7 +44,6 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.query.CacheQuery;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
-import org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -165,7 +164,7 @@ public class GridCacheSetImpl<T> extends 
AbstractCollection<T> implements Ignite
                 return cache.sizeAsync(new CachePeekMode[] {}).get() - 1;
             }
 
-            CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET,
+            CacheQuery qry = new CacheQuery<>(ctx, SET,
                 new GridSetQueryPredicate<>(id, collocated), null, collocated 
? hdrPart : null,
                 false, false, null);
 
@@ -440,7 +439,7 @@ public class GridCacheSetImpl<T> extends 
AbstractCollection<T> implements Ignite
      */
     @SuppressWarnings("unchecked")
     private WeakReferenceCloseableIterator<T> sharedCacheIterator(boolean 
keepBinary) throws IgniteCheckedException {
-        CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET,
+        CacheQuery qry = new CacheQuery<>(ctx, SET,
             new GridSetQueryPredicate<>(id, collocated), null, collocated ? 
hdrPart : null,
             keepBinary, false, null);
 
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties 
b/modules/core/src/main/resources/META-INF/classnames.properties
index 63d8fd49495..24dba9ea13e 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1208,8 +1208,8 @@ 
org.apache.ignite.internal.processors.cache.query.CacheQueryType
 
org.apache.ignite.internal.processors.cache.query.GridCacheDistributedQueryManager$1
 
org.apache.ignite.internal.processors.cache.query.GridCacheDistributedQueryManager$3
 
org.apache.ignite.internal.processors.cache.query.GridCacheDistributedQueryManager$4
-org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter$1
-org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter$ScanQueryFallbackClosableIterator
+org.apache.ignite.internal.processors.cache.query.CacheQuery$1
+org.apache.ignite.internal.processors.cache.query.CacheQuery$ScanQueryFallbackClosableIterator
 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryDetailMetricsAdapter
 org.apache.ignite.internal.processors.cache.query.GridCacheQueryFutureAdapter$1
 org.apache.ignite.internal.processors.cache.query.GridCacheQueryFutureAdapter$2

Reply via email to