This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new de08e0c40a3 IGNITE-20006 SQL Calcite: Move filtering/transforming from
scan iterators to ScanNode - Fixes #10852.
de08e0c40a3 is described below
commit de08e0c40a32f28d15ead81c33e3e41d1443b5a0
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Wed Aug 2 09:29:23 2023 +0300
IGNITE-20006 SQL Calcite: Move filtering/transforming from scan iterators
to ScanNode - Fixes #10852.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../query/calcite/exec/AbstractIndexScan.java | 25 +------
.../query/calcite/exec/IndexFirstLastScan.java | 3 +-
.../processors/query/calcite/exec/IndexScan.java | 15 +---
.../query/calcite/exec/LogicalRelImplementor.java | 33 ++++-----
.../query/calcite/exec/RuntimeHashIndex.java | 24 ++-----
.../query/calcite/exec/RuntimeSortedIndex.java | 8 +--
.../query/calcite/exec/SystemViewScan.java | 28 +-------
.../processors/query/calcite/exec/TableScan.java | 21 +-----
.../query/calcite/exec/rel/IndexSpoolNode.java | 13 ++--
.../query/calcite/exec/rel/ScanNode.java | 82 ++++++++++++++++++----
.../query/calcite/exec/rel/ScanStorageNode.java | 67 ++++++++++++++++++
.../query/calcite/schema/CacheIndexImpl.java | 12 ++--
.../query/calcite/schema/CacheTableImpl.java | 17 ++---
.../query/calcite/schema/IgniteIndex.java | 4 --
.../query/calcite/schema/IgniteTable.java | 10 +--
.../query/calcite/schema/SystemViewIndexImpl.java | 6 --
.../query/calcite/schema/SystemViewTableImpl.java | 6 +-
.../calcite/exec/LogicalRelImplementorTest.java | 35 +++++----
.../integration/AbstractBasicIntegrationTest.java | 6 +-
.../integration/IndexScanlIntegrationTest.java | 27 ++++---
.../KillQueryCommandDdlIntegrationTest.java | 4 --
.../integration/RunningQueriesIntegrationTest.java | 6 --
.../integration/SqlDiagnosticIntegrationTest.java | 5 ++
.../integration/TimeoutIntegrationTest.java | 11 ++-
.../query/calcite/planner/PlannerTest.java | 25 ++-----
.../query/calcite/planner/TestTable.java | 5 +-
.../query/running/RunningQueryManager.java | 58 +++++++--------
27 files changed, 257 insertions(+), 299 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractIndexScan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractIndexScan.java
index a180fe7c81d..6e87a7b3e6f 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractIndexScan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractIndexScan.java
@@ -19,8 +19,6 @@ package
org.apache.ignite.internal.processors.query.calcite.exec;
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
-import java.util.function.Function;
-import java.util.function.Predicate;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.IgniteCheckedException;
import
org.apache.ignite.internal.cache.query.index.sorted.inline.IndexQueryContext;
@@ -39,15 +37,9 @@ public abstract class AbstractIndexScan<Row, IdxRow>
implements Iterable<Row>, A
/** */
private final TreeIndex<IdxRow> idx;
- /** Additional filters. */
- private final Predicate<Row> filters;
-
/** Index scan bounds. */
private final RangeIterable<Row> ranges;
- /** */
- private final Function<Row, Row> rowTransformer;
-
/** */
protected final ExecutionContext<Row> ectx;
@@ -57,23 +49,18 @@ public abstract class AbstractIndexScan<Row, IdxRow>
implements Iterable<Row>, A
/**
* @param ectx Execution context.
* @param idx Physical index.
- * @param filters Additional filters.
* @param ranges Index scan bounds.
*/
protected AbstractIndexScan(
ExecutionContext<Row> ectx,
RelDataType rowType,
TreeIndex<IdxRow> idx,
- Predicate<Row> filters,
- RangeIterable<Row> ranges,
- Function<Row, Row> rowTransformer
+ RangeIterable<Row> ranges
) {
this.ectx = ectx;
this.rowType = rowType;
this.idx = idx;
- this.filters = filters;
this.ranges = ranges;
- this.rowTransformer = rowTransformer;
}
/** {@inheritDoc} */
@@ -164,15 +151,7 @@ public abstract class AbstractIndexScan<Row, IdxRow>
implements Iterable<Row>, A
while (next == null && cursor.next()) {
IdxRow idxRow = cursor.get();
- Row r = indexRow2Row(idxRow);
-
- if (filters != null && !filters.test(r))
- continue;
-
- if (rowTransformer != null)
- r = rowTransformer.apply(r);
-
- next = r;
+ next = indexRow2Row(idxRow);
}
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexFirstLastScan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexFirstLastScan.java
index dd316442cf6..cc1b8d70a23 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexFirstLastScan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexFirstLastScan.java
@@ -49,8 +49,7 @@ public class IndexFirstLastScan<Row> extends IndexScan<Row> {
int[] parts,
@Nullable ImmutableBitSet requiredColumns
) {
- super(ectx, desc, new FirstLastIndexWrapper(idx, first),
idxFieldMapping, parts, null, null, null,
- requiredColumns);
+ super(ectx, desc, new FirstLastIndexWrapper(idx, first),
idxFieldMapping, parts, null, requiredColumns);
}
/** {@inheritDoc} */
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
index a8fd6c4ab89..af8aaf431e7 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
@@ -21,8 +21,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import java.util.function.Function;
-import java.util.function.Predicate;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.ImmutableIntList;
@@ -109,7 +107,6 @@ public class IndexScan<Row> extends AbstractIndexScan<Row,
IndexRow> {
* @param desc Table descriptor.
* @param idxFieldMapping Mapping from index keys to row fields.
* @param idx Physical index.
- * @param filters Additional filters.
* @param ranges Index scan bounds.
*/
public IndexScan(
@@ -118,13 +115,10 @@ public class IndexScan<Row> extends
AbstractIndexScan<Row, IndexRow> {
InlineIndex idx,
ImmutableIntList idxFieldMapping,
int[] parts,
- Predicate<Row> filters,
RangeIterable<Row> ranges,
- Function<Row, Row> rowTransformer,
@Nullable ImmutableBitSet requiredColumns
) {
- this(ectx, desc, new TreeIndexWrapper(idx), idxFieldMapping, parts,
filters, ranges, rowTransformer,
- requiredColumns);
+ this(ectx, desc, new TreeIndexWrapper(idx), idxFieldMapping, parts,
ranges, requiredColumns);
}
/**
@@ -132,7 +126,6 @@ public class IndexScan<Row> extends AbstractIndexScan<Row,
IndexRow> {
* @param desc Table descriptor.
* @param idxFieldMapping Mapping from index keys to row fields.
* @param treeIdx Physical index wrapper.
- * @param filters Additional filters.
* @param ranges Index scan bounds.
*/
protected IndexScan(
@@ -141,18 +134,14 @@ public class IndexScan<Row> extends
AbstractIndexScan<Row, IndexRow> {
TreeIndexWrapper treeIdx,
ImmutableIntList idxFieldMapping,
int[] parts,
- Predicate<Row> filters,
RangeIterable<Row> ranges,
- Function<Row, Row> rowTransformer,
@Nullable ImmutableBitSet requiredColumns
) {
super(
ectx,
desc.rowType(ectx.getTypeFactory(), requiredColumns),
treeIdx,
- filters,
- ranges,
- rowTransformer
+ ranges
);
this.desc = desc;
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index 07dc8c645f1..3300f01b58e 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -60,6 +60,7 @@ import
org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
import
org.apache.ignite.internal.processors.query.calcite.exec.rel.ProjectNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanNode;
+import
org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanStorageNode;
import
org.apache.ignite.internal.processors.query.calcite.exec.rel.SortAggregateNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.SortNode;
import
org.apache.ignite.internal.processors.query.calcite.exec.rel.TableSpoolNode;
@@ -312,9 +313,9 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
IgniteIndex idx = tbl.getIndex(rel.indexName());
if (idx != null && !tbl.isIndexRebuildInProgress()) {
- Iterable<Row> rowsIter = idx.scan(ctx, grp, filters, ranges, prj,
requiredColumns);
+ Iterable<Row> rowsIter = idx.scan(ctx, grp, ranges,
requiredColumns);
- return new ScanNode<>(ctx, rowType, rowsIter);
+ return new ScanStorageNode<>(ctx, rowType, rowsIter, filters, prj);
}
else {
// Index was invalidated after planning, workaround through
table-scan -> sort -> index spool.
@@ -331,8 +332,6 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
Iterable<Row> rowsIter = tbl.scan(
ctx,
grp,
- filterHasCorrelation ? null : filters,
- projNodeRequired ? null : prj,
requiredColumns
);
@@ -340,7 +339,8 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
if (!spoolNodeRequired && projects != null)
rowType = rel.getRowType();
- Node<Row> node = new ScanNode<>(ctx, rowType, rowsIter);
+ Node<Row> node = new ScanStorageNode<>(ctx, rowType, rowsIter,
filterHasCorrelation ? null : filters,
+ projNodeRequired ? null : prj);
RelCollation collation = rel.collation();
@@ -406,15 +406,15 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
IgniteIndex idx = tbl.getIndex(rel.indexName());
if (idx != null && !tbl.isIndexRebuildInProgress()) {
- return new ScanNode<>(ctx, rel.getRowType(), () ->
Collections.singletonList(ctx.rowHandler()
+ return new ScanStorageNode<>(ctx, rel.getRowType(), () ->
Collections.singletonList(ctx.rowHandler()
.factory(ctx.getTypeFactory(), rel.getRowType())
.create(idx.count(ctx, ctx.group(rel.sourceId()),
rel.notNull()))).iterator());
}
else {
CollectNode<Row> replacement =
CollectNode.createCountCollector(ctx);
- replacement.register(new ScanNode<>(ctx,
rel.getTable().getRowType(), tbl.scan(ctx,
- ctx.group(rel.sourceId()), null, null,
ImmutableBitSet.of(0))));
+ replacement.register(new ScanStorageNode<>(ctx,
rel.getTable().getRowType(), tbl.scan(ctx,
+ ctx.group(rel.sourceId()), ImmutableBitSet.of(0))));
return replacement;
}
@@ -430,19 +430,14 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
RelDataType rowType = tbl.getRowType(typeFactory, requiredColumns);
if (idx != null && !tbl.isIndexRebuildInProgress())
- return new ScanNode<>(ctx, rowType,
idx.firstOrLast(idxBndRel.first(), ctx, grp, requiredColumns));
+ return new ScanStorageNode<>(ctx, rowType,
idx.firstOrLast(idxBndRel.first(), ctx, grp, requiredColumns));
else {
assert requiredColumns.cardinality() == 1;
- Iterable<Row> rowsIter = tbl.scan(
- ctx,
- grp,
- r -> ctx.rowHandler().get(0, r) != null,
- null,
- idxBndRel.requiredColumns()
- );
+ Iterable<Row> rowsIter = tbl.scan(ctx, grp,
idxBndRel.requiredColumns());
- Node<Row> scanNode = new ScanNode<>(ctx, rowType, rowsIter);
+ Node<Row> scanNode = new ScanStorageNode<>(ctx, rowType, rowsIter,
+ r -> ctx.rowHandler().get(0, r) != null, null);
RelCollation collation =
idx.collation().apply(LogicalScanConverterRule.createMapping(
null,
@@ -484,9 +479,9 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
ColocationGroup group = ctx.group(rel.sourceId());
- Iterable<Row> rowsIter = tbl.scan(ctx, group, filters, prj,
requiredColunms);
+ Iterable<Row> rowsIter = tbl.scan(ctx, group, requiredColunms);
- return new ScanNode<>(ctx, rowType, rowsIter);
+ return new ScanStorageNode<>(ctx, rowType, rowsIter, filters, prj);
}
/** {@inheritDoc} */
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeHashIndex.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeHashIndex.java
index fddf43b7126..f0163e6d91e 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeHashIndex.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeHashIndex.java
@@ -21,16 +21,12 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
-import java.util.function.Predicate;
import java.util.function.Supplier;
-
import org.apache.calcite.util.ImmutableBitSet;
import
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
-import org.apache.ignite.internal.util.lang.GridFilteredIterator;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
/**
* Runtime hash index based on on-heap hash map.
@@ -89,8 +85,8 @@ public class RuntimeHashIndex<Row> implements
RuntimeIndex<Row> {
}
/** */
- public Iterable<Row> scan(Supplier<Row> searchRow, @Nullable
Predicate<Row> filter) {
- return new IndexScan(searchRow, filter);
+ public Iterable<Row> scan(Supplier<Row> searchRow) {
+ return new IndexScan(searchRow);
}
/** */
@@ -116,16 +112,11 @@ public class RuntimeHashIndex<Row> implements
RuntimeIndex<Row> {
/** Search row. */
private final Supplier<Row> searchRow;
- /** Row filter. */
- private final Predicate<Row> filter;
-
/**
* @param searchRow Search row.
- * @param filter Scan condition.
*/
- IndexScan(Supplier<Row> searchRow, @Nullable Predicate<Row> filter) {
+ IndexScan(Supplier<Row> searchRow) {
this.searchRow = searchRow;
- this.filter = filter;
}
/** {@inheritDoc} */
@@ -142,14 +133,7 @@ public class RuntimeHashIndex<Row> implements
RuntimeIndex<Row> {
List<Row> eqRows = rows.get(key);
- if (eqRows == null)
- return Collections.emptyIterator();
-
- return filter == null ? eqRows.iterator() : new
GridFilteredIterator<Row>(eqRows.iterator()) {
- @Override protected boolean accept(Row row) {
- return filter.test(row);
- }
- };
+ return eqRows == null ? Collections.emptyIterator() :
eqRows.iterator();
}
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeSortedIndex.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeSortedIndex.java
index e58d17f2759..ecb92c451c9 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeSortedIndex.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeSortedIndex.java
@@ -20,7 +20,6 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
-import java.util.function.Predicate;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.type.RelDataType;
import
org.apache.ignite.internal.cache.query.index.sorted.inline.IndexQueryContext;
@@ -100,10 +99,9 @@ public class RuntimeSortedIndex<Row> implements
RuntimeIndex<Row>, TreeIndex<Row
public Iterable<Row> scan(
ExecutionContext<Row> ectx,
RelDataType rowType,
- Predicate<Row> filter,
RangeIterable<Row> ranges
) {
- return new IndexScan(rowType, this, filter, ranges);
+ return new IndexScan(rowType, this, ranges);
}
/**
@@ -190,16 +188,14 @@ public class RuntimeSortedIndex<Row> implements
RuntimeIndex<Row>, TreeIndex<Row
/**
* @param rowType Row type.
* @param idx Physical index.
- * @param filter Additional filters.
* @param ranges Index scan bounds.
*/
IndexScan(
RelDataType rowType,
TreeIndex<Row> idx,
- Predicate<Row> filter,
RangeIterable<Row> ranges
) {
- super(RuntimeSortedIndex.this.ectx, rowType, idx, filter, ranges,
null);
+ super(RuntimeSortedIndex.this.ectx, rowType, idx, ranges);
}
/** {@inheritDoc} */
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/SystemViewScan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/SystemViewScan.java
index 2fa944ae678..76f71f50837 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/SystemViewScan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/SystemViewScan.java
@@ -20,8 +20,6 @@ package
org.apache.ignite.internal.processors.query.calcite.exec;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-import java.util.function.Function;
-import java.util.function.Predicate;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.ImmutableBitSet;
import
org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
@@ -31,8 +29,6 @@ import
org.apache.ignite.internal.processors.query.calcite.schema.SystemViewColu
import
org.apache.ignite.internal.processors.query.calcite.schema.SystemViewTableDescriptorImpl;
import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.lang.IgniteClosure;
-import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.systemview.view.FiltrableSystemView;
import org.apache.ignite.spi.systemview.view.SystemView;
import org.jetbrains.annotations.Nullable;
@@ -51,12 +47,6 @@ public class SystemViewScan<Row, ViewRow> implements
Iterable<Row> {
/** */
private final RangeIterable<Row> ranges;
- /** */
- private final Predicate<Row> filters;
-
- /** */
- private final Function<Row, Row> rowTransformer;
-
/** Participating colunms. */
private final ImmutableBitSet requiredColumns;
@@ -71,15 +61,11 @@ public class SystemViewScan<Row, ViewRow> implements
Iterable<Row> {
ExecutionContext<Row> ectx,
SystemViewTableDescriptorImpl<ViewRow> desc,
@Nullable RangeIterable<Row> ranges,
- Predicate<Row> filters,
- Function<Row, Row> rowTransformer,
@Nullable ImmutableBitSet requiredColumns
) {
this.ectx = ectx;
this.desc = desc;
this.ranges = ranges;
- this.filters = filters;
- this.rowTransformer = rowTransformer;
this.requiredColumns = requiredColumns;
RelDataType rowType = desc.rowType(ectx.getTypeFactory(),
requiredColumns);
@@ -137,18 +123,6 @@ public class SystemViewScan<Row, ViewRow> implements
Iterable<Row> {
else
viewIter = view.iterator();
- Iterator<Row> iter = F.iterator(
- viewIter,
- row -> desc.toRow(ectx, row, factory, requiredColumns),
- true);
-
- if (rowTransformer != null || filters != null) {
- IgniteClosure<Row, Row> trans = rowTransformer == null ?
F.identity() : rowTransformer::apply;
- IgnitePredicate<Row> filter = filters == null ? F.alwaysTrue() :
filters::test;
-
- iter = F.iterator(iter, trans, true, filter);
- }
-
- return iter;
+ return F.iterator(viewIter, row -> desc.toRow(ectx, row, factory,
requiredColumns), true);
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
index b4637e6cb44..4e1d15fea35 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
@@ -24,8 +24,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
-import java.util.function.Function;
-import java.util.function.Predicate;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.IgniteCheckedException;
@@ -51,9 +49,6 @@ public class TableScan<Row> implements Iterable<Row>,
AutoCloseable {
/** */
private final GridCacheContext<?, ?> cctx;
- /** */
- private final Predicate<Row> filters;
-
/** */
private final ExecutionContext<Row> ectx;
@@ -75,9 +70,6 @@ public class TableScan<Row> implements Iterable<Row>,
AutoCloseable {
/** */
private volatile List<GridDhtLocalPartition> reserved;
- /** */
- private final Function<Row, Row> rowTransformer;
-
/** Participating colunms. */
private final ImmutableBitSet requiredColunms;
@@ -86,16 +78,12 @@ public class TableScan<Row> implements Iterable<Row>,
AutoCloseable {
ExecutionContext<Row> ectx,
CacheTableDescriptor desc,
int[] parts,
- Predicate<Row> filters,
- Function<Row, Row> rowTransformer,
@Nullable ImmutableBitSet requiredColunms
) {
this.ectx = ectx;
cctx = desc.cacheContext();
this.desc = desc;
this.parts = parts;
- this.filters = filters;
- this.rowTransformer = rowTransformer;
this.requiredColunms = requiredColunms;
RelDataType rowType = desc.rowType(this.ectx.getTypeFactory(),
requiredColunms);
@@ -265,15 +253,8 @@ public class TableScan<Row> implements Iterable<Row>,
AutoCloseable {
if (!desc.match(row))
continue;
- Row r = desc.toRow(ectx, row, factory, requiredColunms);
-
- if (filters != null && !filters.test(r))
- continue;
-
- if (rowTransformer != null)
- r = rowTransformer.apply(r);
+ next = desc.toRow(ectx, row, factory, requiredColunms);
- next = r;
break;
}
else
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IndexSpoolNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IndexSpoolNode.java
index d159441d8fe..998792f0f36 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IndexSpoolNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IndexSpoolNode.java
@@ -174,12 +174,9 @@ public class IndexSpoolNode<Row> extends
MemoryTrackingNode<Row> implements Sing
ScanNode<Row> scan = new ScanNode<>(
ctx,
rowType,
- idx.scan(
- ctx,
- rowType,
- filter,
- ranges
- )
+ idx.scan(ctx, rowType, ranges),
+ filter,
+ null
);
return new IndexSpoolNode<>(ctx, rowType, idx, scan);
@@ -199,7 +196,9 @@ public class IndexSpoolNode<Row> extends
MemoryTrackingNode<Row> implements Sing
ScanNode<Row> scan = new ScanNode<>(
ctx,
rowType,
- idx.scan(searchRow, filter)
+ idx.scan(searchRow),
+ filter,
+ null
);
return new IndexSpoolNode<>(ctx, rowType, idx, scan);
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
index 62838096c8f..a59badc7160 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
@@ -19,9 +19,12 @@ package
org.apache.ignite.internal.processors.query.calcite.exec.rel;
import java.util.Iterator;
import java.util.List;
+import java.util.function.Function;
+import java.util.function.Predicate;
import org.apache.calcite.rel.type.RelDataType;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.jetbrains.annotations.Nullable;
/**
* Scan node.
@@ -30,6 +33,12 @@ public class ScanNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>
/** */
private final Iterable<Row> src;
+ /** */
+ @Nullable private final Predicate<Row> filter;
+
+ /** */
+ @Nullable private final Function<Row, Row> rowTransformer;
+
/** */
private Iterator<Row> it;
@@ -44,12 +53,32 @@ public class ScanNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>
/**
* @param ctx Execution context.
+ * @param rowType Row type.
* @param src Source.
*/
public ScanNode(ExecutionContext<Row> ctx, RelDataType rowType,
Iterable<Row> src) {
+ this(ctx, rowType, src, null, null);
+ }
+
+ /**
+ * @param ctx Execution context.
+ * @param rowType Row type.
+ * @param src Source.
+ * @param filter Row filter.
+ * @param rowTransformer Row transformer (projection).
+ */
+ public ScanNode(
+ ExecutionContext<Row> ctx,
+ RelDataType rowType,
+ Iterable<Row> src,
+ @Nullable Predicate<Row> filter,
+ @Nullable Function<Row, Row> rowTransformer
+ ) {
super(ctx, rowType);
this.src = src;
+ this.filter = filter;
+ this.rowTransformer = rowTransformer;
}
/** {@inheritDoc} */
@@ -110,30 +139,41 @@ public class ScanNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>
inLoop = true;
try {
- context().ioTracker().startTracking();
-
if (it == null)
it = src.iterator();
- int processed = 0;
- while (requested > 0 && it.hasNext()) {
- checkState();
+ processNextBatch();
+ }
+ finally {
+ inLoop = false;
+ }
+ }
+
+ /**
+ * @return Count of processed rows.
+ */
+ protected int processNextBatch() throws Exception {
+ int processed = 0;
+ while (requested > 0 && it.hasNext()) {
+ checkState();
+
+ Row r = it.next();
+ if (filter == null || filter.test(r)) {
requested--;
- downstream().push(it.next());
- if (++processed == IN_BUFFER_SIZE && requested > 0) {
- // allow others to do their job
- context().execute(this::push, this::onError);
+ if (rowTransformer != null)
+ r = rowTransformer.apply(r);
- return;
- }
+ downstream().push(r);
}
- }
- finally {
- context().ioTracker().stopTracking();
- inLoop = false;
+ if (++processed == IN_BUFFER_SIZE && requested > 0) {
+ // Allow others to do their job.
+ context().execute(this::push, this::onError);
+
+ return processed;
+ }
}
if (requested > 0 && !it.hasNext()) {
@@ -144,5 +184,17 @@ public class ScanNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>
downstream().end();
}
+
+ return processed;
+ }
+
+ /** */
+ @Nullable public Predicate<Row> filter() {
+ return filter;
+ }
+
+ /** */
+ @Nullable public Function<Row, Row> rowTransformer() {
+ return rowTransformer;
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanStorageNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanStorageNode.java
new file mode 100644
index 00000000000..4d126208431
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanStorageNode.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.calcite.rel.type.RelDataType;
+import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Scan storage node.
+ */
+public class ScanStorageNode<Row> extends ScanNode<Row> {
+ /**
+ * @param ctx Execution context.
+ * @param rowType Row type.
+ * @param src Source.
+ * @param filter Row filter.
+ * @param rowTransformer Row transformer (projection).
+ */
+ public ScanStorageNode(
+ ExecutionContext<Row> ctx,
+ RelDataType rowType,
+ Iterable<Row> src,
+ @Nullable Predicate<Row> filter,
+ @Nullable Function<Row, Row> rowTransformer
+ ) {
+ super(ctx, rowType, src, filter, rowTransformer);
+ }
+
+ /**
+ * @param ctx Execution context.
+ * @param rowType Row type.
+ * @param src Source.
+ */
+ public ScanStorageNode(ExecutionContext<Row> ctx, RelDataType rowType,
Iterable<Row> src) {
+ super(ctx, rowType, src);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int processNextBatch() throws Exception {
+ try {
+ context().ioTracker().startTracking();
+
+ return super.processNextBatch();
+ }
+ finally {
+ context().ioTracker().stopTracking();
+ }
+ }
+}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheIndexImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheIndexImpl.java
index 9fd8a5553b0..a6162af69dd 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheIndexImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheIndexImpl.java
@@ -21,8 +21,6 @@ import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Function;
-import java.util.function.Predicate;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelCollation;
@@ -116,16 +114,14 @@ public class CacheIndexImpl implements IgniteIndex {
/** */
@Override public <Row> Iterable<Row> scan(
ExecutionContext<Row> execCtx,
- ColocationGroup group,
- Predicate<Row> filters,
+ ColocationGroup grp,
RangeIterable<Row> ranges,
- Function<Row, Row> rowTransformer,
@Nullable ImmutableBitSet requiredColumns
) {
- UUID localNodeId = execCtx.localNodeId();
- if (group.nodeIds().contains(localNodeId) && idx != null) {
+ UUID locNodeId = execCtx.localNodeId();
+ if (grp.nodeIds().contains(locNodeId) && idx != null) {
return new IndexScan<>(execCtx, tbl.descriptor(),
idx.unwrap(InlineIndex.class), collation.getKeys(),
- group.partitions(localNodeId), filters, ranges,
rowTransformer, requiredColumns);
+ grp.partitions(locNodeId), ranges, requiredColumns);
}
return Collections.emptyList();
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableImpl.java
index 0487725cfa1..8e2b80390bb 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableImpl.java
@@ -22,8 +22,6 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Function;
-import java.util.function.Predicate;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.type.RelDataType;
@@ -109,14 +107,13 @@ public class CacheTableImpl extends AbstractTable
implements IgniteCacheTable {
/** {@inheritDoc} */
@Override public <Row> Iterable<Row> scan(
ExecutionContext<Row> execCtx,
- ColocationGroup group,
- Predicate<Row> filter,
- Function<Row, Row> rowTransformer,
- @Nullable ImmutableBitSet usedColumns) {
- UUID localNodeId = execCtx.localNodeId();
-
- if (group.nodeIds().contains(localNodeId))
- return new TableScan<>(execCtx, desc,
group.partitions(localNodeId), filter, rowTransformer, usedColumns);
+ ColocationGroup grp,
+ @Nullable ImmutableBitSet usedColumns
+ ) {
+ UUID locNodeId = execCtx.localNodeId();
+
+ if (grp.nodeIds().contains(locNodeId))
+ return new TableScan<>(execCtx, desc, grp.partitions(locNodeId),
usedColumns);
return Collections.emptyList();
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteIndex.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteIndex.java
index 81ded7289bc..618a78920a2 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteIndex.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteIndex.java
@@ -17,8 +17,6 @@
package org.apache.ignite.internal.processors.query.calcite.schema;
import java.util.List;
-import java.util.function.Function;
-import java.util.function.Predicate;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelCollation;
@@ -80,9 +78,7 @@ public interface IgniteIndex {
public <Row> Iterable<Row> scan(
ExecutionContext<Row> execCtx,
ColocationGroup grp,
- Predicate<Row> filters,
RangeIterable<Row> ranges,
- Function<Row, Row> rowTransformer,
@Nullable ImmutableBitSet requiredColumns
);
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
index 1d12c8c687f..1c69a60ed39 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
@@ -18,8 +18,6 @@ package
org.apache.ignite.internal.processors.query.calcite.schema;
import java.util.List;
import java.util.Map;
-import java.util.function.Function;
-import java.util.function.Predicate;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.core.TableScan;
@@ -84,17 +82,13 @@ public interface IgniteTable extends TranslatableTable {
* Creates rows iterator over the table.
*
* @param execCtx Execution context.
- * @param group Colocation group.
- * @param filter Row filter.
- * @param rowTransformer Row transformer.
+ * @param grp Colocation group.
* @param usedColumns Used columns enumeration.
* @return Rows iterator.
*/
public <Row> Iterable<Row> scan(
ExecutionContext<Row> execCtx,
- ColocationGroup group,
- Predicate<Row> filter,
- Function<Row, Row> rowTransformer,
+ ColocationGroup grp,
@Nullable ImmutableBitSet usedColumns);
/**
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewIndexImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewIndexImpl.java
index 85717a5e9ad..d5eca929902 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewIndexImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewIndexImpl.java
@@ -17,8 +17,6 @@
package org.apache.ignite.internal.processors.query.calcite.schema;
import java.util.List;
-import java.util.function.Function;
-import java.util.function.Predicate;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelCollation;
@@ -82,17 +80,13 @@ public class SystemViewIndexImpl implements IgniteIndex {
@Override public <Row> Iterable<Row> scan(
ExecutionContext<Row> execCtx,
ColocationGroup grp,
- Predicate<Row> filters,
RangeIterable<Row> ranges,
- Function<Row, Row> rowTransformer,
@Nullable ImmutableBitSet requiredColumns
) {
return new SystemViewScan<>(
execCtx,
tbl.descriptor(),
ranges,
- filters,
- rowTransformer,
requiredColumns
);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableImpl.java
index fee4ca5f2c2..44394991a7f 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableImpl.java
@@ -20,8 +20,6 @@ package
org.apache.ignite.internal.processors.query.calcite.schema;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.function.Function;
-import java.util.function.Predicate;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
@@ -103,11 +101,9 @@ public class SystemViewTableImpl extends AbstractTable
implements IgniteTable {
@Override public <Row> Iterable<Row> scan(
ExecutionContext<Row> execCtx,
ColocationGroup grp,
- Predicate<Row> filter,
- Function<Row, Row> rowTransformer,
@Nullable ImmutableBitSet usedColumns
) {
- return new SystemViewScan<>(execCtx, desc, null, filter,
rowTransformer, usedColumns);
+ return new SystemViewScan<>(execCtx, desc, null, usedColumns);
}
/** {@inheritDoc} */
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementorTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementorTest.java
index e5651d92e0b..8105a9da3fd 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementorTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementorTest.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
-import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.rel.RelCollation;
@@ -77,7 +76,7 @@ public class LogicalRelImplementorTest extends
GridCommonAbstractTest {
private RelOptCluster cluster;
/** */
- private ScanAwareTable tbl;
+ private TestTable tbl;
/** */
private BaseQueryContext qctx;
@@ -105,7 +104,7 @@ public class LogicalRelImplementorTest extends
GridCommonAbstractTest {
RelDataType rowType = b.build();
- tbl = new ScanAwareTable(rowType);
+ tbl = new ScannableTestTable(rowType);
IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
publicSchema.addTable("TBL", tbl);
@@ -284,13 +283,13 @@ public class LogicalRelImplementorTest extends
GridCommonAbstractTest {
tbl.markIndexRebuildInProgress(true);
Predicate<Node<Object[]>> isScanNoFilterNoProject =
- node -> node instanceof ScanNode && !tbl.lastScanHasFilter &&
!tbl.lastScanHasProject;
+ node -> node instanceof ScanNode && !hasFilter(node) &&
!hasProject(node);
Predicate<Node<Object[]>> isScanWithFilterNoProject =
- node -> node instanceof ScanNode && tbl.lastScanHasFilter &&
!tbl.lastScanHasProject;
+ node -> node instanceof ScanNode && hasFilter(node) &&
!hasProject(node);
Predicate<Node<Object[]>> isScanWithProjectNoFilter =
- node -> node instanceof ScanNode && !tbl.lastScanHasFilter &&
tbl.lastScanHasProject;
+ node -> node instanceof ScanNode && !hasFilter(node) &&
hasProject(node);
Predicate<Node<Object[]>> isScanWithFilterWithProject =
- node -> node instanceof ScanNode && tbl.lastScanHasFilter &&
tbl.lastScanHasProject;
+ node -> node instanceof ScanNode && hasFilter(node) &&
hasProject(node);
Predicate<Node<Object[]>> isSort = node -> node instanceof SortNode;
Predicate<Node<Object[]>> isSpool = node -> node instanceof
IndexSpoolNode;
@@ -418,28 +417,28 @@ public class LogicalRelImplementorTest extends
GridCommonAbstractTest {
}
/** */
- private static class ScanAwareTable extends TestTable {
- /** */
- private volatile boolean lastScanHasFilter;
+ private boolean hasFilter(Node<?> node) {
+ return ((ScanNode<?>)node).filter() != null;
+ }
- /** */
- private volatile boolean lastScanHasProject;
+ /** */
+ private boolean hasProject(Node<?> node) {
+ return ((ScanNode<?>)node).rowTransformer() != null;
+ }
+ /** */
+ private static class ScannableTestTable extends TestTable {
/** */
- public ScanAwareTable(RelDataType rowType) {
- super(rowType);
+ public ScannableTestTable(RelDataType type) {
+ super(type);
}
/** {@inheritDoc} */
@Override public <Row> Iterable<Row> scan(
ExecutionContext<Row> execCtx,
ColocationGroup grp,
- Predicate<Row> filter,
- Function<Row, Row> transformer,
ImmutableBitSet bitSet
) {
- lastScanHasFilter = filter != null;
- lastScanHasProject = transformer != null;
return Collections.emptyList();
}
}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
index 6348af14038..111416f0da8 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
@@ -18,8 +18,6 @@
package org.apache.ignite.internal.processors.query.calcite.integration;
import java.util.List;
-import java.util.function.Function;
-import java.util.function.Predicate;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelCollation;
@@ -258,12 +256,10 @@ public class AbstractBasicIntegrationTest extends
GridCommonAbstractTest {
@Override public <Row> Iterable<Row> scan(
ExecutionContext<Row> execCtx,
ColocationGroup grp,
- Predicate<Row> filters,
RangeIterable<Row> ranges,
- Function<Row, Row> rowTransformer,
@Nullable ImmutableBitSet requiredColumns
) {
- return delegate.scan(execCtx, grp, filters, ranges,
rowTransformer, requiredColumns);
+ return delegate.scan(execCtx, grp, ranges, requiredColumns);
}
/** {@inheritDoc} */
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexScanlIntegrationTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexScanlIntegrationTest.java
index ddd2e320b47..875e5e760a3 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexScanlIntegrationTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexScanlIntegrationTest.java
@@ -21,13 +21,12 @@ import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
+import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Function;
import java.util.function.IntFunction;
-import java.util.function.Predicate;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
@@ -44,6 +43,7 @@ import
org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
import
org.apache.ignite.internal.processors.query.schema.management.SchemaManager;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
@@ -372,25 +372,22 @@ public class IndexScanlIntegrationTest extends
AbstractBasicIntegrationTest {
@Override public <Row> Iterable<Row> scan(
ExecutionContext<Row> execCtx,
ColocationGroup grp,
- Predicate<Row> filters,
RangeIterable<Row> ranges,
- Function<Row, Row> rowTransformer,
@Nullable ImmutableBitSet requiredColumns
) {
- Predicate<Row> filter = row -> {
- filteredRows.incrementAndGet();
-
- return true;
- };
-
- filters = filters == null ? filter : filter.and(filters);
-
- IndexScan<Row> scan = (IndexScan<Row>)delegate.scan(execCtx, grp,
filters, ranges, rowTransformer,
- requiredColumns);
+ IndexScan<Row> scan = (IndexScan<Row>)delegate.scan(execCtx, grp,
ranges, requiredColumns);
isInlineScan.set(scan.isInlineScan());
- return scan;
+ return new Iterable<Row>() {
+ @NotNull @Override public Iterator<Row> iterator() {
+ return F.iterator(scan.iterator(), r -> {
+ filteredRows.incrementAndGet();
+
+ return r;
+ }, true);
+ }
+ };
}
/** */
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/KillQueryCommandDdlIntegrationTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/KillQueryCommandDdlIntegrationTest.java
index 454e93ee7ca..efdf2a78ed3 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/KillQueryCommandDdlIntegrationTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/KillQueryCommandDdlIntegrationTest.java
@@ -25,8 +25,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Function;
-import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.calcite.util.ImmutableBitSet;
@@ -108,8 +106,6 @@ public class KillQueryCommandDdlIntegrationTest extends
AbstractDdlIntegrationTe
@Override public <Row> Iterable<Row> scan(
ExecutionContext<Row> execCtx,
ColocationGroup grp,
- Predicate<Row> filter,
- Function<Row, Row> rowTransformer,
@Nullable ImmutableBitSet usedColumns
) {
return new Iterable<Row>() {
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
index 4aa7a43a1de..08ac3d3f527 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
@@ -26,8 +26,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
-import java.util.function.Predicate;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.calcite.rel.RelCollation;
@@ -167,8 +165,6 @@ public class RunningQueriesIntegrationTest extends
AbstractBasicIntegrationTest
@Override public <Row> Iterable<Row> scan(
ExecutionContext<Row> execCtx,
ColocationGroup grp,
- Predicate<Row> filter,
- Function<Row, Row> rowTransformer,
@Nullable ImmutableBitSet usedColumns
) {
return new Iterable<Row>() {
@@ -398,8 +394,6 @@ public class RunningQueriesIntegrationTest extends
AbstractBasicIntegrationTest
@Override public <Row> Iterable<Row> scan(
ExecutionContext<Row> execCtx,
ColocationGroup grp,
- Predicate<Row> filter,
- Function<Row, Row> rowTransformer,
@Nullable ImmutableBitSet usedColumns
) {
initLatch.countDown();
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
index 2bb661e01d1..a58e9ae9a6e 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
@@ -285,11 +285,16 @@ public class SqlDiagnosticIntegrationTest extends
AbstractBasicIntegrationTest {
long startTime = U.currentTimeMillis();
+ AtomicInteger finishQryCnt = new AtomicInteger();
+
grid(0).context().query().runningQueryManager().registerQueryFinishedListener(q
-> finishQryCnt.incrementAndGet());
+
sql(grid(0), "SELECT * FROM table(system_range(1, 1000))");
sql(grid(0), "CREATE TABLE test_perf_stat (a INT)");
sql(grid(0), "INSERT INTO test_perf_stat VALUES (0), (1), (2), (3),
(4)");
sql(grid(0), "SELECT * FROM test_perf_stat");
+ assertTrue(GridTestUtils.waitForCondition(() -> finishQryCnt.get() ==
4, 1_000L));
+
// Only the last query should trigger queryReads event.
// The first query uses generated data and doesn't require any page
reads.
// The second query is DDL and doesn't perform any page reads as well.
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TimeoutIntegrationTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TimeoutIntegrationTest.java
index 47c84b1b7ff..a6f769a7c97 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TimeoutIntegrationTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TimeoutIntegrationTest.java
@@ -20,8 +20,6 @@ package
org.apache.ignite.internal.processors.query.calcite.integration;
import java.util.List;
import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.function.Predicate;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
@@ -39,6 +37,7 @@ import
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGr
import
org.apache.ignite.internal.processors.query.calcite.schema.CacheTableImpl;
import
org.apache.ignite.internal.processors.query.calcite.schema.IgniteCacheTable;
import org.apache.ignite.internal.util.lang.IgniteClosureX;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.spi.metric.LongMetric;
import org.apache.ignite.testframework.GridTestUtils;
import org.jetbrains.annotations.Nullable;
@@ -144,14 +143,12 @@ public class TimeoutIntegrationTest extends
AbstractBasicIntegrationTest {
@Override public <Row> Iterable<Row> scan(
ExecutionContext<Row> execCtx,
ColocationGroup grp,
- Predicate<Row> filter,
- Function<Row, Row> rowTransformer,
@Nullable ImmutableBitSet usedColumns
) {
- return super.scan(execCtx, grp, r -> {
+ return F.iterator(super.scan(execCtx, grp, usedColumns), r
-> {
doSleep(SLEEP_PER_ROW);
- return true;
- }, rowTransformer, usedColumns);
+ return r;
+ }, true);
}
};
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
index a4a50f4dd7b..e17b1a2067d 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
@@ -23,8 +23,6 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Function;
-import java.util.function.Predicate;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
@@ -171,9 +169,7 @@ public class PlannerTest extends AbstractPlannerTest {
.build()) {
@Override public <Row> Iterable<Row> scan(
ExecutionContext<Row> execCtx,
- ColocationGroup group,
- Predicate<Row> filter,
- Function<Row, Row> transformer,
+ ColocationGroup grp,
ImmutableBitSet requiredColumns
) {
return Arrays.asList(
@@ -199,9 +195,7 @@ public class PlannerTest extends AbstractPlannerTest {
.build()) {
@Override public <Row> Iterable<Row> scan(
ExecutionContext<Row> execCtx,
- ColocationGroup group,
- Predicate<Row> filter,
- Function<Row, Row> transformer,
+ ColocationGroup grp,
ImmutableBitSet requiredColumns
) {
return Arrays.asList(
@@ -257,27 +251,22 @@ public class PlannerTest extends AbstractPlannerTest {
.build()) {
@Override public <Row> Iterable<Row> scan(
ExecutionContext<Row> execCtx,
- ColocationGroup group,
- Predicate<Row> filter,
- Function<Row, Row> rowTransformer,
+ ColocationGroup grp,
ImmutableBitSet requiredColumns
) {
+ List<Row> res = new ArrayList<>();
List<Row> checkRes0 = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
int col = ThreadLocalRandom.current().nextInt(1_000);
- Row r = row(execCtx, requiredColumns, col, col);
-
- if (rowTransformer != null)
- r = rowTransformer.apply(r);
-
- checkRes0.add(r);
+ res.add(row(execCtx, requiredColumns, col, col));
+ checkRes0.add(row(execCtx, null, col + col));
}
checkRes.set(checkRes0);
- return checkRes0;
+ return res;
}
@Override public ColocationGroup
colocationGroup(MappingQueryContext ctx) {
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java
index b07711417bf..8cf051f358c 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java
@@ -25,8 +25,6 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.function.Function;
-import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.plan.RelOptCluster;
@@ -169,8 +167,7 @@ public class TestTable implements IgniteCacheTable {
/** {@inheritDoc} */
@Override public <Row> Iterable<Row> scan(
ExecutionContext<Row> execCtx,
- ColocationGroup grp, Predicate<Row> filter,
- Function<Row, Row> transformer,
+ ColocationGroup grp,
ImmutableBitSet bitSet
) {
throw new AssertionError();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/RunningQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/RunningQueryManager.java
index f9632fbfa2f..3bad5315ae3 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/RunningQueryManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/RunningQueryManager.java
@@ -349,6 +349,35 @@ public class RunningQueryManager {
if (failed)
qrySpan.addTag(ERROR, failReason::getMessage);
+ //We need to collect query history and metrics only for SQL
queries.
+ if (isSqlQuery(qry)) {
+ qry.runningFuture().onDone();
+
+ qryHistTracker.collectHistory(qry, failed);
+
+ if (!failed)
+ successQrsCnt.increment();
+ else {
+ failedQrsCnt.increment();
+
+ // We measure cancel metric as "number of times user's
queries ended up with query cancelled exception",
+ // not "how many user's KILL QUERY command succeeded".
These may be not the same if cancel was issued
+ // right when query failed due to some other reason.
+ if (QueryUtils.wasCancelled(failReason))
+ canceledQrsCnt.increment();
+ }
+ }
+
+ if (ctx.performanceStatistics().enabled() && qry.startTimeNanos()
> 0) {
+ ctx.performanceStatistics().query(
+ qry.queryType(),
+ qry.query(),
+ qry.id(),
+ qry.startTime(),
+ System.nanoTime() - qry.startTimeNanos(),
+ !failed);
+ }
+
if (!qryFinishedListeners.isEmpty()) {
GridQueryFinishedInfo info = new GridQueryFinishedInfo(
qry.id(),
@@ -385,35 +414,6 @@ public class RunningQueryManager {
throw new IgniteException(ex.getMessage(), ex);
}
}
-
- //We need to collect query history and metrics only for SQL
queries.
- if (isSqlQuery(qry)) {
- qry.runningFuture().onDone();
-
- qryHistTracker.collectHistory(qry, failed);
-
- if (!failed)
- successQrsCnt.increment();
- else {
- failedQrsCnt.increment();
-
- // We measure cancel metric as "number of times user's
queries ended up with query cancelled exception",
- // not "how many user's KILL QUERY command succeeded".
These may be not the same if cancel was issued
- // right when query failed due to some other reason.
- if (QueryUtils.wasCancelled(failReason))
- canceledQrsCnt.increment();
- }
- }
-
- if (ctx.performanceStatistics().enabled() && qry.startTimeNanos()
> 0) {
- ctx.performanceStatistics().query(
- qry.queryType(),
- qry.query(),
- qry.id(),
- qry.startTime(),
- System.nanoTime() - qry.startTimeNanos(),
- !failed);
- }
}
finally {
qrySpan.end();