This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new de08e0c40a3 IGNITE-20006 SQL Calcite: Move filtering/transforming from 
scan iterators to ScanNode - Fixes #10852.
de08e0c40a3 is described below

commit de08e0c40a32f28d15ead81c33e3e41d1443b5a0
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Wed Aug 2 09:29:23 2023 +0300

    IGNITE-20006 SQL Calcite: Move filtering/transforming from scan iterators 
to ScanNode - Fixes #10852.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../query/calcite/exec/AbstractIndexScan.java      | 25 +------
 .../query/calcite/exec/IndexFirstLastScan.java     |  3 +-
 .../processors/query/calcite/exec/IndexScan.java   | 15 +---
 .../query/calcite/exec/LogicalRelImplementor.java  | 33 ++++-----
 .../query/calcite/exec/RuntimeHashIndex.java       | 24 ++-----
 .../query/calcite/exec/RuntimeSortedIndex.java     |  8 +--
 .../query/calcite/exec/SystemViewScan.java         | 28 +-------
 .../processors/query/calcite/exec/TableScan.java   | 21 +-----
 .../query/calcite/exec/rel/IndexSpoolNode.java     | 13 ++--
 .../query/calcite/exec/rel/ScanNode.java           | 82 ++++++++++++++++++----
 .../query/calcite/exec/rel/ScanStorageNode.java    | 67 ++++++++++++++++++
 .../query/calcite/schema/CacheIndexImpl.java       | 12 ++--
 .../query/calcite/schema/CacheTableImpl.java       | 17 ++---
 .../query/calcite/schema/IgniteIndex.java          |  4 --
 .../query/calcite/schema/IgniteTable.java          | 10 +--
 .../query/calcite/schema/SystemViewIndexImpl.java  |  6 --
 .../query/calcite/schema/SystemViewTableImpl.java  |  6 +-
 .../calcite/exec/LogicalRelImplementorTest.java    | 35 +++++----
 .../integration/AbstractBasicIntegrationTest.java  |  6 +-
 .../integration/IndexScanlIntegrationTest.java     | 27 ++++---
 .../KillQueryCommandDdlIntegrationTest.java        |  4 --
 .../integration/RunningQueriesIntegrationTest.java |  6 --
 .../integration/SqlDiagnosticIntegrationTest.java  |  5 ++
 .../integration/TimeoutIntegrationTest.java        | 11 ++-
 .../query/calcite/planner/PlannerTest.java         | 25 ++-----
 .../query/calcite/planner/TestTable.java           |  5 +-
 .../query/running/RunningQueryManager.java         | 58 +++++++--------
 27 files changed, 257 insertions(+), 299 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractIndexScan.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractIndexScan.java
index a180fe7c81d..6e87a7b3e6f 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractIndexScan.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractIndexScan.java
@@ -19,8 +19,6 @@ package 
org.apache.ignite.internal.processors.query.calcite.exec;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.function.Function;
-import java.util.function.Predicate;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.ignite.IgniteCheckedException;
 import 
org.apache.ignite.internal.cache.query.index.sorted.inline.IndexQueryContext;
@@ -39,15 +37,9 @@ public abstract class AbstractIndexScan<Row, IdxRow> 
implements Iterable<Row>, A
     /** */
     private final TreeIndex<IdxRow> idx;
 
-    /** Additional filters. */
-    private final Predicate<Row> filters;
-
     /** Index scan bounds. */
     private final RangeIterable<Row> ranges;
 
-    /** */
-    private final Function<Row, Row> rowTransformer;
-
     /** */
     protected final ExecutionContext<Row> ectx;
 
@@ -57,23 +49,18 @@ public abstract class AbstractIndexScan<Row, IdxRow> 
implements Iterable<Row>, A
     /**
      * @param ectx Execution context.
      * @param idx Physical index.
-     * @param filters Additional filters.
      * @param ranges Index scan bounds.
      */
     protected AbstractIndexScan(
         ExecutionContext<Row> ectx,
         RelDataType rowType,
         TreeIndex<IdxRow> idx,
-        Predicate<Row> filters,
-        RangeIterable<Row> ranges,
-        Function<Row, Row> rowTransformer
+        RangeIterable<Row> ranges
     ) {
         this.ectx = ectx;
         this.rowType = rowType;
         this.idx = idx;
-        this.filters = filters;
         this.ranges = ranges;
-        this.rowTransformer = rowTransformer;
     }
 
     /** {@inheritDoc} */
@@ -164,15 +151,7 @@ public abstract class AbstractIndexScan<Row, IdxRow> 
implements Iterable<Row>, A
             while (next == null && cursor.next()) {
                 IdxRow idxRow = cursor.get();
 
-                Row r = indexRow2Row(idxRow);
-
-                if (filters != null && !filters.test(r))
-                    continue;
-
-                if (rowTransformer != null)
-                    r = rowTransformer.apply(r);
-
-                next = r;
+                next = indexRow2Row(idxRow);
             }
         }
     }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexFirstLastScan.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexFirstLastScan.java
index dd316442cf6..cc1b8d70a23 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexFirstLastScan.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexFirstLastScan.java
@@ -49,8 +49,7 @@ public class IndexFirstLastScan<Row> extends IndexScan<Row> {
         int[] parts,
         @Nullable ImmutableBitSet requiredColumns
     ) {
-        super(ectx, desc, new FirstLastIndexWrapper(idx, first), 
idxFieldMapping, parts, null, null, null,
-            requiredColumns);
+        super(ectx, desc, new FirstLastIndexWrapper(idx, first), 
idxFieldMapping, parts, null, requiredColumns);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
index a8fd6c4ab89..af8aaf431e7 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
@@ -21,8 +21,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.function.Function;
-import java.util.function.Predicate;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.ImmutableIntList;
@@ -109,7 +107,6 @@ public class IndexScan<Row> extends AbstractIndexScan<Row, 
IndexRow> {
      * @param desc Table descriptor.
      * @param idxFieldMapping Mapping from index keys to row fields.
      * @param idx Physical index.
-     * @param filters Additional filters.
      * @param ranges Index scan bounds.
      */
     public IndexScan(
@@ -118,13 +115,10 @@ public class IndexScan<Row> extends 
AbstractIndexScan<Row, IndexRow> {
         InlineIndex idx,
         ImmutableIntList idxFieldMapping,
         int[] parts,
-        Predicate<Row> filters,
         RangeIterable<Row> ranges,
-        Function<Row, Row> rowTransformer,
         @Nullable ImmutableBitSet requiredColumns
     ) {
-        this(ectx, desc, new TreeIndexWrapper(idx), idxFieldMapping, parts, 
filters, ranges, rowTransformer,
-            requiredColumns);
+        this(ectx, desc, new TreeIndexWrapper(idx), idxFieldMapping, parts, 
ranges, requiredColumns);
     }
 
     /**
@@ -132,7 +126,6 @@ public class IndexScan<Row> extends AbstractIndexScan<Row, 
IndexRow> {
      * @param desc Table descriptor.
      * @param idxFieldMapping Mapping from index keys to row fields.
      * @param treeIdx Physical index wrapper.
-     * @param filters Additional filters.
      * @param ranges Index scan bounds.
      */
     protected IndexScan(
@@ -141,18 +134,14 @@ public class IndexScan<Row> extends 
AbstractIndexScan<Row, IndexRow> {
         TreeIndexWrapper treeIdx,
         ImmutableIntList idxFieldMapping,
         int[] parts,
-        Predicate<Row> filters,
         RangeIterable<Row> ranges,
-        Function<Row, Row> rowTransformer,
         @Nullable ImmutableBitSet requiredColumns
     ) {
         super(
             ectx,
             desc.rowType(ectx.getTypeFactory(), requiredColumns),
             treeIdx,
-            filters,
-            ranges,
-            rowTransformer
+            ranges
         );
 
         this.desc = desc;
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index 07dc8c645f1..3300f01b58e 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -60,6 +60,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.rel.ProjectNode;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanNode;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanStorageNode;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.rel.SortAggregateNode;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.SortNode;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.rel.TableSpoolNode;
@@ -312,9 +313,9 @@ public class LogicalRelImplementor<Row> implements 
IgniteRelVisitor<Node<Row>> {
         IgniteIndex idx = tbl.getIndex(rel.indexName());
 
         if (idx != null && !tbl.isIndexRebuildInProgress()) {
-            Iterable<Row> rowsIter = idx.scan(ctx, grp, filters, ranges, prj, 
requiredColumns);
+            Iterable<Row> rowsIter = idx.scan(ctx, grp, ranges, 
requiredColumns);
 
-            return new ScanNode<>(ctx, rowType, rowsIter);
+            return new ScanStorageNode<>(ctx, rowType, rowsIter, filters, prj);
         }
         else {
             // Index was invalidated after planning, workaround through 
table-scan -> sort -> index spool.
@@ -331,8 +332,6 @@ public class LogicalRelImplementor<Row> implements 
IgniteRelVisitor<Node<Row>> {
             Iterable<Row> rowsIter = tbl.scan(
                 ctx,
                 grp,
-                filterHasCorrelation ? null : filters,
-                projNodeRequired ? null : prj,
                 requiredColumns
             );
 
@@ -340,7 +339,8 @@ public class LogicalRelImplementor<Row> implements 
IgniteRelVisitor<Node<Row>> {
             if (!spoolNodeRequired && projects != null)
                 rowType = rel.getRowType();
 
-            Node<Row> node = new ScanNode<>(ctx, rowType, rowsIter);
+            Node<Row> node = new ScanStorageNode<>(ctx, rowType, rowsIter, 
filterHasCorrelation ? null : filters,
+                projNodeRequired ? null : prj);
 
             RelCollation collation = rel.collation();
 
@@ -406,15 +406,15 @@ public class LogicalRelImplementor<Row> implements 
IgniteRelVisitor<Node<Row>> {
         IgniteIndex idx = tbl.getIndex(rel.indexName());
 
         if (idx != null && !tbl.isIndexRebuildInProgress()) {
-            return new ScanNode<>(ctx, rel.getRowType(), () -> 
Collections.singletonList(ctx.rowHandler()
+            return new ScanStorageNode<>(ctx, rel.getRowType(), () -> 
Collections.singletonList(ctx.rowHandler()
                 .factory(ctx.getTypeFactory(), rel.getRowType())
                 .create(idx.count(ctx, ctx.group(rel.sourceId()), 
rel.notNull()))).iterator());
         }
         else {
             CollectNode<Row> replacement = 
CollectNode.createCountCollector(ctx);
 
-            replacement.register(new ScanNode<>(ctx, 
rel.getTable().getRowType(), tbl.scan(ctx,
-                ctx.group(rel.sourceId()), null, null, 
ImmutableBitSet.of(0))));
+            replacement.register(new ScanStorageNode<>(ctx, 
rel.getTable().getRowType(), tbl.scan(ctx,
+                ctx.group(rel.sourceId()), ImmutableBitSet.of(0))));
 
             return replacement;
         }
@@ -430,19 +430,14 @@ public class LogicalRelImplementor<Row> implements 
IgniteRelVisitor<Node<Row>> {
         RelDataType rowType = tbl.getRowType(typeFactory, requiredColumns);
 
         if (idx != null && !tbl.isIndexRebuildInProgress())
-            return new ScanNode<>(ctx, rowType, 
idx.firstOrLast(idxBndRel.first(), ctx, grp, requiredColumns));
+            return new ScanStorageNode<>(ctx, rowType, 
idx.firstOrLast(idxBndRel.first(), ctx, grp, requiredColumns));
         else {
             assert requiredColumns.cardinality() == 1;
 
-            Iterable<Row> rowsIter = tbl.scan(
-                ctx,
-                grp,
-                r -> ctx.rowHandler().get(0, r) != null,
-                null,
-                idxBndRel.requiredColumns()
-            );
+            Iterable<Row> rowsIter = tbl.scan(ctx, grp, 
idxBndRel.requiredColumns());
 
-            Node<Row> scanNode = new ScanNode<>(ctx, rowType, rowsIter);
+            Node<Row> scanNode = new ScanStorageNode<>(ctx, rowType, rowsIter,
+                r -> ctx.rowHandler().get(0, r) != null, null);
 
             RelCollation collation = 
idx.collation().apply(LogicalScanConverterRule.createMapping(
                 null,
@@ -484,9 +479,9 @@ public class LogicalRelImplementor<Row> implements 
IgniteRelVisitor<Node<Row>> {
 
         ColocationGroup group = ctx.group(rel.sourceId());
 
-        Iterable<Row> rowsIter = tbl.scan(ctx, group, filters, prj, 
requiredColunms);
+        Iterable<Row> rowsIter = tbl.scan(ctx, group, requiredColunms);
 
-        return new ScanNode<>(ctx, rowType, rowsIter);
+        return new ScanStorageNode<>(ctx, rowType, rowsIter, filters, prj);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeHashIndex.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeHashIndex.java
index fddf43b7126..f0163e6d91e 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeHashIndex.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeHashIndex.java
@@ -21,16 +21,12 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.function.Predicate;
 import java.util.function.Supplier;
-
 import org.apache.calcite.util.ImmutableBitSet;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
-import org.apache.ignite.internal.util.lang.GridFilteredIterator;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * Runtime hash index based on on-heap hash map.
@@ -89,8 +85,8 @@ public class RuntimeHashIndex<Row> implements 
RuntimeIndex<Row> {
     }
 
     /** */
-    public Iterable<Row> scan(Supplier<Row> searchRow, @Nullable 
Predicate<Row> filter) {
-        return new IndexScan(searchRow, filter);
+    public Iterable<Row> scan(Supplier<Row> searchRow) {
+        return new IndexScan(searchRow);
     }
 
     /** */
@@ -116,16 +112,11 @@ public class RuntimeHashIndex<Row> implements 
RuntimeIndex<Row> {
         /** Search row. */
         private final Supplier<Row> searchRow;
 
-        /** Row filter. */
-        private final Predicate<Row> filter;
-
         /**
          * @param searchRow Search row.
-         * @param filter Scan condition.
          */
-        IndexScan(Supplier<Row> searchRow, @Nullable Predicate<Row> filter) {
+        IndexScan(Supplier<Row> searchRow) {
             this.searchRow = searchRow;
-            this.filter = filter;
         }
 
         /** {@inheritDoc} */
@@ -142,14 +133,7 @@ public class RuntimeHashIndex<Row> implements 
RuntimeIndex<Row> {
 
             List<Row> eqRows = rows.get(key);
 
-            if (eqRows == null)
-                return Collections.emptyIterator();
-
-            return filter == null ? eqRows.iterator() : new 
GridFilteredIterator<Row>(eqRows.iterator()) {
-                @Override protected boolean accept(Row row) {
-                    return filter.test(row);
-                }
-            };
+            return eqRows == null ? Collections.emptyIterator() : 
eqRows.iterator();
         }
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeSortedIndex.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeSortedIndex.java
index e58d17f2759..ecb92c451c9 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeSortedIndex.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeSortedIndex.java
@@ -20,7 +20,6 @@ import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
-import java.util.function.Predicate;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.type.RelDataType;
 import 
org.apache.ignite.internal.cache.query.index.sorted.inline.IndexQueryContext;
@@ -100,10 +99,9 @@ public class RuntimeSortedIndex<Row> implements 
RuntimeIndex<Row>, TreeIndex<Row
     public Iterable<Row> scan(
         ExecutionContext<Row> ectx,
         RelDataType rowType,
-        Predicate<Row> filter,
         RangeIterable<Row> ranges
     ) {
-        return new IndexScan(rowType, this, filter, ranges);
+        return new IndexScan(rowType, this, ranges);
     }
 
     /**
@@ -190,16 +188,14 @@ public class RuntimeSortedIndex<Row> implements 
RuntimeIndex<Row>, TreeIndex<Row
         /**
          * @param rowType Row type.
          * @param idx Physical index.
-         * @param filter Additional filters.
          * @param ranges Index scan bounds.
          */
         IndexScan(
             RelDataType rowType,
             TreeIndex<Row> idx,
-            Predicate<Row> filter,
             RangeIterable<Row> ranges
         ) {
-            super(RuntimeSortedIndex.this.ectx, rowType, idx, filter, ranges, 
null);
+            super(RuntimeSortedIndex.this.ectx, rowType, idx, ranges);
         }
 
         /** {@inheritDoc} */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/SystemViewScan.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/SystemViewScan.java
index 2fa944ae678..76f71f50837 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/SystemViewScan.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/SystemViewScan.java
@@ -20,8 +20,6 @@ package 
org.apache.ignite.internal.processors.query.calcite.exec;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.function.Function;
-import java.util.function.Predicate;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.util.ImmutableBitSet;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
@@ -31,8 +29,6 @@ import 
org.apache.ignite.internal.processors.query.calcite.schema.SystemViewColu
 import 
org.apache.ignite.internal.processors.query.calcite.schema.SystemViewTableDescriptorImpl;
 import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.lang.IgniteClosure;
-import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.spi.systemview.view.FiltrableSystemView;
 import org.apache.ignite.spi.systemview.view.SystemView;
 import org.jetbrains.annotations.Nullable;
@@ -51,12 +47,6 @@ public class SystemViewScan<Row, ViewRow> implements 
Iterable<Row> {
     /** */
     private final RangeIterable<Row> ranges;
 
-    /** */
-    private final Predicate<Row> filters;
-
-    /** */
-    private final Function<Row, Row> rowTransformer;
-
     /** Participating colunms. */
     private final ImmutableBitSet requiredColumns;
 
@@ -71,15 +61,11 @@ public class SystemViewScan<Row, ViewRow> implements 
Iterable<Row> {
         ExecutionContext<Row> ectx,
         SystemViewTableDescriptorImpl<ViewRow> desc,
         @Nullable RangeIterable<Row> ranges,
-        Predicate<Row> filters,
-        Function<Row, Row> rowTransformer,
         @Nullable ImmutableBitSet requiredColumns
     ) {
         this.ectx = ectx;
         this.desc = desc;
         this.ranges = ranges;
-        this.filters = filters;
-        this.rowTransformer = rowTransformer;
         this.requiredColumns = requiredColumns;
 
         RelDataType rowType = desc.rowType(ectx.getTypeFactory(), 
requiredColumns);
@@ -137,18 +123,6 @@ public class SystemViewScan<Row, ViewRow> implements 
Iterable<Row> {
         else
             viewIter = view.iterator();
 
-        Iterator<Row> iter = F.iterator(
-            viewIter,
-            row -> desc.toRow(ectx, row, factory, requiredColumns),
-            true);
-
-        if (rowTransformer != null || filters != null) {
-            IgniteClosure<Row, Row> trans = rowTransformer == null ? 
F.identity() : rowTransformer::apply;
-            IgnitePredicate<Row> filter = filters == null ? F.alwaysTrue() : 
filters::test;
-
-            iter = F.iterator(iter, trans, true, filter);
-        }
-
-        return iter;
+        return F.iterator(viewIter, row -> desc.toRow(ectx, row, factory, 
requiredColumns), true);
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
index b4637e6cb44..4e1d15fea35 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
@@ -24,8 +24,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Queue;
-import java.util.function.Function;
-import java.util.function.Predicate;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.IgniteCheckedException;
@@ -51,9 +49,6 @@ public class TableScan<Row> implements Iterable<Row>, 
AutoCloseable {
     /** */
     private final GridCacheContext<?, ?> cctx;
 
-    /** */
-    private final Predicate<Row> filters;
-
     /** */
     private final ExecutionContext<Row> ectx;
 
@@ -75,9 +70,6 @@ public class TableScan<Row> implements Iterable<Row>, 
AutoCloseable {
     /** */
     private volatile List<GridDhtLocalPartition> reserved;
 
-    /** */
-    private final Function<Row, Row> rowTransformer;
-
     /** Participating colunms. */
     private final ImmutableBitSet requiredColunms;
 
@@ -86,16 +78,12 @@ public class TableScan<Row> implements Iterable<Row>, 
AutoCloseable {
         ExecutionContext<Row> ectx,
         CacheTableDescriptor desc,
         int[] parts,
-        Predicate<Row> filters,
-        Function<Row, Row> rowTransformer,
         @Nullable ImmutableBitSet requiredColunms
     ) {
         this.ectx = ectx;
         cctx = desc.cacheContext();
         this.desc = desc;
         this.parts = parts;
-        this.filters = filters;
-        this.rowTransformer = rowTransformer;
         this.requiredColunms = requiredColunms;
 
         RelDataType rowType = desc.rowType(this.ectx.getTypeFactory(), 
requiredColunms);
@@ -265,15 +253,8 @@ public class TableScan<Row> implements Iterable<Row>, 
AutoCloseable {
                     if (!desc.match(row))
                         continue;
 
-                    Row r = desc.toRow(ectx, row, factory, requiredColunms);
-
-                    if (filters != null && !filters.test(r))
-                        continue;
-
-                    if (rowTransformer != null)
-                        r = rowTransformer.apply(r);
+                    next = desc.toRow(ectx, row, factory, requiredColunms);
 
-                    next = r;
                     break;
                 }
                 else
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IndexSpoolNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IndexSpoolNode.java
index d159441d8fe..998792f0f36 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IndexSpoolNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IndexSpoolNode.java
@@ -174,12 +174,9 @@ public class IndexSpoolNode<Row> extends 
MemoryTrackingNode<Row> implements Sing
         ScanNode<Row> scan = new ScanNode<>(
             ctx,
             rowType,
-            idx.scan(
-                ctx,
-                rowType,
-                filter,
-                ranges
-            )
+            idx.scan(ctx, rowType, ranges),
+            filter,
+            null
         );
 
         return new IndexSpoolNode<>(ctx, rowType, idx, scan);
@@ -199,7 +196,9 @@ public class IndexSpoolNode<Row> extends 
MemoryTrackingNode<Row> implements Sing
         ScanNode<Row> scan = new ScanNode<>(
             ctx,
             rowType,
-            idx.scan(searchRow, filter)
+            idx.scan(searchRow),
+            filter,
+            null
         );
 
         return new IndexSpoolNode<>(ctx, rowType, idx, scan);
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
index 62838096c8f..a59badc7160 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
@@ -19,9 +19,12 @@ package 
org.apache.ignite.internal.processors.query.calcite.exec.rel;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.function.Function;
+import java.util.function.Predicate;
 import org.apache.calcite.rel.type.RelDataType;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Scan node.
@@ -30,6 +33,12 @@ public class ScanNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row>
     /** */
     private final Iterable<Row> src;
 
+    /** */
+    @Nullable private final Predicate<Row> filter;
+
+    /** */
+    @Nullable private final Function<Row, Row> rowTransformer;
+
     /** */
     private Iterator<Row> it;
 
@@ -44,12 +53,32 @@ public class ScanNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row>
 
     /**
      * @param ctx Execution context.
+     * @param rowType Row type.
      * @param src Source.
      */
     public ScanNode(ExecutionContext<Row> ctx, RelDataType rowType, 
Iterable<Row> src) {
+        this(ctx, rowType, src, null, null);
+    }
+
+    /**
+     * @param ctx Execution context.
+     * @param rowType Row type.
+     * @param src Source.
+     * @param filter Row filter.
+     * @param rowTransformer Row transformer (projection).
+     */
+    public ScanNode(
+        ExecutionContext<Row> ctx,
+        RelDataType rowType,
+        Iterable<Row> src,
+        @Nullable Predicate<Row> filter,
+        @Nullable Function<Row, Row> rowTransformer
+    ) {
         super(ctx, rowType);
 
         this.src = src;
+        this.filter = filter;
+        this.rowTransformer = rowTransformer;
     }
 
     /** {@inheritDoc} */
@@ -110,30 +139,41 @@ public class ScanNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row>
 
         inLoop = true;
         try {
-            context().ioTracker().startTracking();
-
             if (it == null)
                 it = src.iterator();
 
-            int processed = 0;
-            while (requested > 0 && it.hasNext()) {
-                checkState();
+            processNextBatch();
+        }
+        finally {
+            inLoop = false;
+        }
+    }
+
+    /**
+     * @return Count of processed rows.
+     */
+    protected int processNextBatch() throws Exception {
+        int processed = 0;
+        while (requested > 0 && it.hasNext()) {
+            checkState();
+
+            Row r = it.next();
 
+            if (filter == null || filter.test(r)) {
                 requested--;
-                downstream().push(it.next());
 
-                if (++processed == IN_BUFFER_SIZE && requested > 0) {
-                    // allow others to do their job
-                    context().execute(this::push, this::onError);
+                if (rowTransformer != null)
+                    r = rowTransformer.apply(r);
 
-                    return;
-                }
+                downstream().push(r);
             }
-        }
-        finally {
-            context().ioTracker().stopTracking();
 
-            inLoop = false;
+            if (++processed == IN_BUFFER_SIZE && requested > 0) {
+                // Allow others to do their job.
+                context().execute(this::push, this::onError);
+
+                return processed;
+            }
         }
 
         if (requested > 0 && !it.hasNext()) {
@@ -144,5 +184,17 @@ public class ScanNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row>
 
             downstream().end();
         }
+
+        return processed;
+    }
+
+    /** */
+    @Nullable public Predicate<Row> filter() {
+        return filter;
+    }
+
+    /** */
+    @Nullable public Function<Row, Row> rowTransformer() {
+        return rowTransformer;
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanStorageNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanStorageNode.java
new file mode 100644
index 00000000000..4d126208431
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanStorageNode.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.calcite.rel.type.RelDataType;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Scan storage node.
+ */
+public class ScanStorageNode<Row> extends ScanNode<Row> {
+    /**
+     * @param ctx Execution context.
+     * @param rowType Row type.
+     * @param src Source.
+     * @param filter Row filter.
+     * @param rowTransformer Row transformer (projection).
+     */
+    public ScanStorageNode(
+        ExecutionContext<Row> ctx,
+        RelDataType rowType,
+        Iterable<Row> src,
+        @Nullable Predicate<Row> filter,
+        @Nullable Function<Row, Row> rowTransformer
+    ) {
+        super(ctx, rowType, src, filter, rowTransformer);
+    }
+
+    /**
+     * @param ctx Execution context.
+     * @param rowType Row type.
+     * @param src Source.
+     */
+    public ScanStorageNode(ExecutionContext<Row> ctx, RelDataType rowType, 
Iterable<Row> src) {
+        super(ctx, rowType, src);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int processNextBatch() throws Exception {
+        try {
+            context().ioTracker().startTracking();
+
+            return super.processNextBatch();
+        }
+        finally {
+            context().ioTracker().stopTracking();
+        }
+    }
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheIndexImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheIndexImpl.java
index 9fd8a5553b0..a6162af69dd 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheIndexImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheIndexImpl.java
@@ -21,8 +21,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Function;
-import java.util.function.Predicate;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.RelCollation;
@@ -116,16 +114,14 @@ public class CacheIndexImpl implements IgniteIndex {
     /** */
     @Override public <Row> Iterable<Row> scan(
         ExecutionContext<Row> execCtx,
-        ColocationGroup group,
-        Predicate<Row> filters,
+        ColocationGroup grp,
         RangeIterable<Row> ranges,
-        Function<Row, Row> rowTransformer,
         @Nullable ImmutableBitSet requiredColumns
     ) {
-        UUID localNodeId = execCtx.localNodeId();
-        if (group.nodeIds().contains(localNodeId) && idx != null) {
+        UUID locNodeId = execCtx.localNodeId();
+        if (grp.nodeIds().contains(locNodeId) && idx != null) {
             return new IndexScan<>(execCtx, tbl.descriptor(), 
idx.unwrap(InlineIndex.class), collation.getKeys(),
-                group.partitions(localNodeId), filters, ranges, 
rowTransformer, requiredColumns);
+                grp.partitions(locNodeId), ranges, requiredColumns);
         }
 
         return Collections.emptyList();
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableImpl.java
index 0487725cfa1..8e2b80390bb 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableImpl.java
@@ -22,8 +22,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Function;
-import java.util.function.Predicate;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.type.RelDataType;
@@ -109,14 +107,13 @@ public class CacheTableImpl extends AbstractTable 
implements IgniteCacheTable {
     /** {@inheritDoc} */
     @Override public <Row> Iterable<Row> scan(
         ExecutionContext<Row> execCtx,
-        ColocationGroup group,
-        Predicate<Row> filter,
-        Function<Row, Row> rowTransformer,
-        @Nullable ImmutableBitSet usedColumns) {
-        UUID localNodeId = execCtx.localNodeId();
-
-        if (group.nodeIds().contains(localNodeId))
-            return new TableScan<>(execCtx, desc, 
group.partitions(localNodeId), filter, rowTransformer, usedColumns);
+        ColocationGroup grp,
+        @Nullable ImmutableBitSet usedColumns
+    ) {
+        UUID locNodeId = execCtx.localNodeId();
+
+        if (grp.nodeIds().contains(locNodeId))
+            return new TableScan<>(execCtx, desc, grp.partitions(locNodeId), 
usedColumns);
 
         return Collections.emptyList();
     }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteIndex.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteIndex.java
index 81ded7289bc..618a78920a2 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteIndex.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteIndex.java
@@ -17,8 +17,6 @@
 package org.apache.ignite.internal.processors.query.calcite.schema;
 
 import java.util.List;
-import java.util.function.Function;
-import java.util.function.Predicate;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.RelCollation;
@@ -80,9 +78,7 @@ public interface IgniteIndex {
     public <Row> Iterable<Row> scan(
         ExecutionContext<Row> execCtx,
         ColocationGroup grp,
-        Predicate<Row> filters,
         RangeIterable<Row> ranges,
-        Function<Row, Row> rowTransformer,
         @Nullable ImmutableBitSet requiredColumns
     );
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
index 1d12c8c687f..1c69a60ed39 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
@@ -18,8 +18,6 @@ package 
org.apache.ignite.internal.processors.query.calcite.schema;
 
 import java.util.List;
 import java.util.Map;
-import java.util.function.Function;
-import java.util.function.Predicate;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.core.TableScan;
@@ -84,17 +82,13 @@ public interface IgniteTable extends TranslatableTable {
      * Creates rows iterator over the table.
      *
      * @param execCtx Execution context.
-     * @param group Colocation group.
-     * @param filter Row filter.
-     * @param rowTransformer Row transformer.
+     * @param grp Colocation group.
      * @param usedColumns Used columns enumeration.
      * @return Rows iterator.
      */
     public <Row> Iterable<Row> scan(
         ExecutionContext<Row> execCtx,
-        ColocationGroup group,
-        Predicate<Row> filter,
-        Function<Row, Row> rowTransformer,
+        ColocationGroup grp,
         @Nullable ImmutableBitSet usedColumns);
 
     /**
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewIndexImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewIndexImpl.java
index 85717a5e9ad..d5eca929902 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewIndexImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewIndexImpl.java
@@ -17,8 +17,6 @@
 package org.apache.ignite.internal.processors.query.calcite.schema;
 
 import java.util.List;
-import java.util.function.Function;
-import java.util.function.Predicate;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.RelCollation;
@@ -82,17 +80,13 @@ public class SystemViewIndexImpl implements IgniteIndex {
     @Override public <Row> Iterable<Row> scan(
         ExecutionContext<Row> execCtx,
         ColocationGroup grp,
-        Predicate<Row> filters,
         RangeIterable<Row> ranges,
-        Function<Row, Row> rowTransformer,
         @Nullable ImmutableBitSet requiredColumns
     ) {
         return new SystemViewScan<>(
             execCtx,
             tbl.descriptor(),
             ranges,
-            filters,
-            rowTransformer,
             requiredColumns
         );
     }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableImpl.java
index fee4ca5f2c2..44394991a7f 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableImpl.java
@@ -20,8 +20,6 @@ package 
org.apache.ignite.internal.processors.query.calcite.schema;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.function.Function;
-import java.util.function.Predicate;
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptTable;
@@ -103,11 +101,9 @@ public class SystemViewTableImpl extends AbstractTable 
implements IgniteTable {
     @Override public <Row> Iterable<Row> scan(
         ExecutionContext<Row> execCtx,
         ColocationGroup grp,
-        Predicate<Row> filter,
-        Function<Row, Row> rowTransformer,
         @Nullable ImmutableBitSet usedColumns
     ) {
-        return new SystemViewScan<>(execCtx, desc, null, filter, 
rowTransformer, usedColumns);
+        return new SystemViewScan<>(execCtx, desc, null, usedColumns);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementorTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementorTest.java
index e5651d92e0b..8105a9da3fd 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementorTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementorTest.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
-import java.util.function.Function;
 import java.util.function.Predicate;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.rel.RelCollation;
@@ -77,7 +76,7 @@ public class LogicalRelImplementorTest extends 
GridCommonAbstractTest {
     private RelOptCluster cluster;
 
     /** */
-    private ScanAwareTable tbl;
+    private TestTable tbl;
 
     /** */
     private BaseQueryContext qctx;
@@ -105,7 +104,7 @@ public class LogicalRelImplementorTest extends 
GridCommonAbstractTest {
 
         RelDataType rowType = b.build();
 
-        tbl = new ScanAwareTable(rowType);
+        tbl = new ScannableTestTable(rowType);
 
         IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
         publicSchema.addTable("TBL", tbl);
@@ -284,13 +283,13 @@ public class LogicalRelImplementorTest extends 
GridCommonAbstractTest {
         tbl.markIndexRebuildInProgress(true);
 
         Predicate<Node<Object[]>> isScanNoFilterNoProject =
-            node -> node instanceof ScanNode && !tbl.lastScanHasFilter && 
!tbl.lastScanHasProject;
+            node -> node instanceof ScanNode && !hasFilter(node) && 
!hasProject(node);
         Predicate<Node<Object[]>> isScanWithFilterNoProject =
-            node -> node instanceof ScanNode && tbl.lastScanHasFilter && 
!tbl.lastScanHasProject;
+            node -> node instanceof ScanNode && hasFilter(node) && 
!hasProject(node);
         Predicate<Node<Object[]>> isScanWithProjectNoFilter =
-            node -> node instanceof ScanNode && !tbl.lastScanHasFilter && 
tbl.lastScanHasProject;
+            node -> node instanceof ScanNode && !hasFilter(node) && 
hasProject(node);
         Predicate<Node<Object[]>> isScanWithFilterWithProject =
-            node -> node instanceof ScanNode && tbl.lastScanHasFilter && 
tbl.lastScanHasProject;
+            node -> node instanceof ScanNode && hasFilter(node) && 
hasProject(node);
 
         Predicate<Node<Object[]>> isSort = node -> node instanceof SortNode;
         Predicate<Node<Object[]>> isSpool = node -> node instanceof 
IndexSpoolNode;
@@ -418,28 +417,28 @@ public class LogicalRelImplementorTest extends 
GridCommonAbstractTest {
     }
 
     /** */
-    private static class ScanAwareTable extends TestTable {
-        /** */
-        private volatile boolean lastScanHasFilter;
+    private boolean hasFilter(Node<?> node) {
+        return ((ScanNode<?>)node).filter() != null;
+    }
 
-        /** */
-        private volatile boolean lastScanHasProject;
+    /** */
+    private boolean hasProject(Node<?> node) {
+        return ((ScanNode<?>)node).rowTransformer() != null;
+    }
 
+    /** */
+    private static class ScannableTestTable extends TestTable {
         /** */
-        public ScanAwareTable(RelDataType rowType) {
-            super(rowType);
+        public ScannableTestTable(RelDataType type) {
+            super(type);
         }
 
         /** {@inheritDoc} */
         @Override public <Row> Iterable<Row> scan(
             ExecutionContext<Row> execCtx,
             ColocationGroup grp,
-            Predicate<Row> filter,
-            Function<Row, Row> transformer,
             ImmutableBitSet bitSet
         ) {
-            lastScanHasFilter = filter != null;
-            lastScanHasProject = transformer != null;
             return Collections.emptyList();
         }
     }
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
index 6348af14038..111416f0da8 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
@@ -18,8 +18,6 @@
 package org.apache.ignite.internal.processors.query.calcite.integration;
 
 import java.util.List;
-import java.util.function.Function;
-import java.util.function.Predicate;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.RelCollation;
@@ -258,12 +256,10 @@ public class AbstractBasicIntegrationTest extends 
GridCommonAbstractTest {
         @Override public <Row> Iterable<Row> scan(
             ExecutionContext<Row> execCtx,
             ColocationGroup grp,
-            Predicate<Row> filters,
             RangeIterable<Row> ranges,
-            Function<Row, Row> rowTransformer,
             @Nullable ImmutableBitSet requiredColumns
         ) {
-            return delegate.scan(execCtx, grp, filters, ranges, 
rowTransformer, requiredColumns);
+            return delegate.scan(execCtx, grp, ranges, requiredColumns);
         }
 
         /** {@inheritDoc} */
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexScanlIntegrationTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexScanlIntegrationTest.java
index ddd2e320b47..875e5e760a3 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexScanlIntegrationTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexScanlIntegrationTest.java
@@ -21,13 +21,12 @@ import java.math.BigDecimal;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.Iterator;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Function;
 import java.util.function.IntFunction;
-import java.util.function.Predicate;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
@@ -44,6 +43,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
 import 
org.apache.ignite.internal.processors.query.schema.management.SchemaManager;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
 
@@ -372,25 +372,22 @@ public class IndexScanlIntegrationTest extends 
AbstractBasicIntegrationTest {
         @Override public <Row> Iterable<Row> scan(
             ExecutionContext<Row> execCtx,
             ColocationGroup grp,
-            Predicate<Row> filters,
             RangeIterable<Row> ranges,
-            Function<Row, Row> rowTransformer,
             @Nullable ImmutableBitSet requiredColumns
         ) {
-            Predicate<Row> filter = row -> {
-                filteredRows.incrementAndGet();
-
-                return true;
-            };
-
-            filters = filters == null ? filter : filter.and(filters);
-
-            IndexScan<Row> scan = (IndexScan<Row>)delegate.scan(execCtx, grp, 
filters, ranges, rowTransformer,
-                requiredColumns);
+            IndexScan<Row> scan = (IndexScan<Row>)delegate.scan(execCtx, grp, 
ranges, requiredColumns);
 
             isInlineScan.set(scan.isInlineScan());
 
-            return scan;
+            return new Iterable<Row>() {
+                @NotNull @Override public Iterator<Row> iterator() {
+                    return F.iterator(scan.iterator(), r -> {
+                        filteredRows.incrementAndGet();
+
+                        return r;
+                    }, true);
+                }
+            };
         }
 
         /** */
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/KillQueryCommandDdlIntegrationTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/KillQueryCommandDdlIntegrationTest.java
index 454e93ee7ca..efdf2a78ed3 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/KillQueryCommandDdlIntegrationTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/KillQueryCommandDdlIntegrationTest.java
@@ -25,8 +25,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Function;
-import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.calcite.util.ImmutableBitSet;
@@ -108,8 +106,6 @@ public class KillQueryCommandDdlIntegrationTest extends 
AbstractDdlIntegrationTe
             @Override public <Row> Iterable<Row> scan(
                 ExecutionContext<Row> execCtx,
                 ColocationGroup grp,
-                Predicate<Row> filter,
-                Function<Row, Row> rowTransformer,
                 @Nullable ImmutableBitSet usedColumns
             ) {
                 return new Iterable<Row>() {
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
index 4aa7a43a1de..08ac3d3f527 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
@@ -26,8 +26,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
-import java.util.function.Predicate;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import org.apache.calcite.rel.RelCollation;
@@ -167,8 +165,6 @@ public class RunningQueriesIntegrationTest extends 
AbstractBasicIntegrationTest
             @Override public <Row> Iterable<Row> scan(
                 ExecutionContext<Row> execCtx,
                 ColocationGroup grp,
-                Predicate<Row> filter,
-                Function<Row, Row> rowTransformer,
                 @Nullable ImmutableBitSet usedColumns
             ) {
                 return new Iterable<Row>() {
@@ -398,8 +394,6 @@ public class RunningQueriesIntegrationTest extends 
AbstractBasicIntegrationTest
             @Override public <Row> Iterable<Row> scan(
                 ExecutionContext<Row> execCtx,
                 ColocationGroup grp,
-                Predicate<Row> filter,
-                Function<Row, Row> rowTransformer,
                 @Nullable ImmutableBitSet usedColumns
             ) {
                 initLatch.countDown();
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
index 2bb661e01d1..a58e9ae9a6e 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
@@ -285,11 +285,16 @@ public class SqlDiagnosticIntegrationTest extends 
AbstractBasicIntegrationTest {
 
         long startTime = U.currentTimeMillis();
 
+        AtomicInteger finishQryCnt = new AtomicInteger();
+        
grid(0).context().query().runningQueryManager().registerQueryFinishedListener(q 
-> finishQryCnt.incrementAndGet());
+
         sql(grid(0), "SELECT * FROM table(system_range(1, 1000))");
         sql(grid(0), "CREATE TABLE test_perf_stat (a INT)");
         sql(grid(0), "INSERT INTO test_perf_stat VALUES (0), (1), (2), (3), 
(4)");
         sql(grid(0), "SELECT * FROM test_perf_stat");
 
+        assertTrue(GridTestUtils.waitForCondition(() -> finishQryCnt.get() == 
4, 1_000L));
+
         // Only the last query should trigger queryReads event.
         // The first query uses generated data and doesn't require any page 
reads.
         // The second query is DDL and doesn't perform any page reads as well.
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TimeoutIntegrationTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TimeoutIntegrationTest.java
index 47c84b1b7ff..a6f769a7c97 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TimeoutIntegrationTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TimeoutIntegrationTest.java
@@ -20,8 +20,6 @@ package 
org.apache.ignite.internal.processors.query.calcite.integration;
 
 import java.util.List;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.function.Predicate;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
@@ -39,6 +37,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGr
 import 
org.apache.ignite.internal.processors.query.calcite.schema.CacheTableImpl;
 import 
org.apache.ignite.internal.processors.query.calcite.schema.IgniteCacheTable;
 import org.apache.ignite.internal.util.lang.IgniteClosureX;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.spi.metric.LongMetric;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.jetbrains.annotations.Nullable;
@@ -144,14 +143,12 @@ public class TimeoutIntegrationTest extends 
AbstractBasicIntegrationTest {
                 @Override public <Row> Iterable<Row> scan(
                     ExecutionContext<Row> execCtx,
                     ColocationGroup grp,
-                    Predicate<Row> filter,
-                    Function<Row, Row> rowTransformer,
                     @Nullable ImmutableBitSet usedColumns
                 ) {
-                    return super.scan(execCtx, grp, r -> {
+                    return F.iterator(super.scan(execCtx, grp, usedColumns), r 
-> {
                         doSleep(SLEEP_PER_ROW);
-                        return true;
-                    }, rowTransformer, usedColumns);
+                        return r;
+                    }, true);
                 }
             };
 
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
index a4a50f4dd7b..e17b1a2067d 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
@@ -23,8 +23,6 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Function;
-import java.util.function.Predicate;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
@@ -171,9 +169,7 @@ public class PlannerTest extends AbstractPlannerTest {
                 .build()) {
             @Override public <Row> Iterable<Row> scan(
                 ExecutionContext<Row> execCtx,
-                ColocationGroup group,
-                Predicate<Row> filter,
-                Function<Row, Row> transformer,
+                ColocationGroup grp,
                 ImmutableBitSet requiredColumns
             ) {
                 return Arrays.asList(
@@ -199,9 +195,7 @@ public class PlannerTest extends AbstractPlannerTest {
                 .build()) {
             @Override public <Row> Iterable<Row> scan(
                 ExecutionContext<Row> execCtx,
-                ColocationGroup group,
-                Predicate<Row> filter,
-                Function<Row, Row> transformer,
+                ColocationGroup grp,
                 ImmutableBitSet requiredColumns
             ) {
                 return Arrays.asList(
@@ -257,27 +251,22 @@ public class PlannerTest extends AbstractPlannerTest {
                 .build()) {
             @Override public <Row> Iterable<Row> scan(
                 ExecutionContext<Row> execCtx,
-                ColocationGroup group,
-                Predicate<Row> filter,
-                Function<Row, Row> rowTransformer,
+                ColocationGroup grp,
                 ImmutableBitSet requiredColumns
             ) {
+                List<Row> res = new ArrayList<>();
                 List<Row> checkRes0 = new ArrayList<>();
 
                 for (int i = 0; i < 10; ++i) {
                     int col = ThreadLocalRandom.current().nextInt(1_000);
 
-                    Row r = row(execCtx, requiredColumns, col, col);
-
-                    if (rowTransformer != null)
-                        r = rowTransformer.apply(r);
-
-                    checkRes0.add(r);
+                    res.add(row(execCtx, requiredColumns, col, col));
+                    checkRes0.add(row(execCtx, null, col + col));
                 }
 
                 checkRes.set(checkRes0);
 
-                return checkRes0;
+                return res;
             }
 
             @Override public ColocationGroup 
colocationGroup(MappingQueryContext ctx) {
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java
index b07711417bf..8cf051f358c 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java
@@ -25,8 +25,6 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.function.Function;
-import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import org.apache.calcite.config.CalciteConnectionConfig;
 import org.apache.calcite.plan.RelOptCluster;
@@ -169,8 +167,7 @@ public class TestTable implements IgniteCacheTable {
     /** {@inheritDoc} */
     @Override public <Row> Iterable<Row> scan(
         ExecutionContext<Row> execCtx,
-        ColocationGroup grp, Predicate<Row> filter,
-        Function<Row, Row> transformer,
+        ColocationGroup grp,
         ImmutableBitSet bitSet
     ) {
         throw new AssertionError();
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/RunningQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/RunningQueryManager.java
index f9632fbfa2f..3bad5315ae3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/RunningQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/RunningQueryManager.java
@@ -349,6 +349,35 @@ public class RunningQueryManager {
             if (failed)
                 qrySpan.addTag(ERROR, failReason::getMessage);
 
+            //We need to collect query history and metrics only for SQL 
queries.
+            if (isSqlQuery(qry)) {
+                qry.runningFuture().onDone();
+
+                qryHistTracker.collectHistory(qry, failed);
+
+                if (!failed)
+                    successQrsCnt.increment();
+                else {
+                    failedQrsCnt.increment();
+
+                    // We measure cancel metric as "number of times user's 
queries ended up with query cancelled exception",
+                    // not "how many user's KILL QUERY command succeeded". 
These may be not the same if cancel was issued
+                    // right when query failed due to some other reason.
+                    if (QueryUtils.wasCancelled(failReason))
+                        canceledQrsCnt.increment();
+                }
+            }
+
+            if (ctx.performanceStatistics().enabled() && qry.startTimeNanos() 
> 0) {
+                ctx.performanceStatistics().query(
+                    qry.queryType(),
+                    qry.query(),
+                    qry.id(),
+                    qry.startTime(),
+                    System.nanoTime() - qry.startTimeNanos(),
+                    !failed);
+            }
+
             if (!qryFinishedListeners.isEmpty()) {
                 GridQueryFinishedInfo info = new GridQueryFinishedInfo(
                     qry.id(),
@@ -385,35 +414,6 @@ public class RunningQueryManager {
                     throw new IgniteException(ex.getMessage(), ex);
                 }
             }
-
-            //We need to collect query history and metrics only for SQL 
queries.
-            if (isSqlQuery(qry)) {
-                qry.runningFuture().onDone();
-
-                qryHistTracker.collectHistory(qry, failed);
-
-                if (!failed)
-                    successQrsCnt.increment();
-                else {
-                    failedQrsCnt.increment();
-
-                    // We measure cancel metric as "number of times user's 
queries ended up with query cancelled exception",
-                    // not "how many user's KILL QUERY command succeeded". 
These may be not the same if cancel was issued
-                    // right when query failed due to some other reason.
-                    if (QueryUtils.wasCancelled(failReason))
-                        canceledQrsCnt.increment();
-                }
-            }
-
-            if (ctx.performanceStatistics().enabled() && qry.startTimeNanos() 
> 0) {
-                ctx.performanceStatistics().query(
-                    qry.queryType(),
-                    qry.query(),
-                    qry.id(),
-                    qry.startTime(),
-                    System.nanoTime() - qry.startTimeNanos(),
-                    !failed);
-            }
         }
         finally {
             qrySpan.end();

Reply via email to