This is an automated email from the ASF dual-hosted git repository.
tledkov pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-12248 by this push:
new 88807bc IGNITE-13025 Calcite integration. Limit and offset support
(closes #7903, #8493)
88807bc is described below
commit 88807bc4d25c4959233e5504b46182c21cfa5234
Author: Stanilovsky Evgeny <[email protected]>
AuthorDate: Wed Nov 25 12:41:38 2020 +0300
IGNITE-13025 Calcite integration. Limit and offset support (closes #7903,
#8493)
---
.../query/calcite/exec/LogicalRelImplementor.java | 16 +
.../query/calcite/exec/exp/ExpressionFactory.java | 6 +
.../calcite/exec/exp/ExpressionFactoryImpl.java | 28 ++
.../query/calcite/exec/rel/AbstractNode.java | 2 +-
.../query/calcite/exec/rel/LimitNode.java | 145 ++++++++++
.../processors/query/calcite/exec/rel/Node.java | 1 -
.../query/calcite/exec/rel/ScanNode.java | 2 +-
.../query/calcite/exec/rel/SortNode.java | 6 +-
.../processors/query/calcite/prepare/Cloner.java | 6 +
.../query/calcite/prepare/IdGenerator.java | 17 ++
.../query/calcite/prepare/IgnitePlanner.java | 4 +-
.../query/calcite/prepare/IgniteRelShuttle.java | 6 +
.../query/calcite/prepare/IgniteSqlValidator.java | 52 +++-
.../query/calcite/prepare/PlannerPhase.java | 9 +-
.../processors/query/calcite/rel/IgniteLimit.java | 161 +++++++++++
.../query/calcite/rel/IgniteRelVisitor.java | 5 +
.../processors/query/calcite/rel/IgniteSort.java | 18 +-
.../query/calcite/rel/IgniteTrimExchange.java | 2 +-
.../rel/logical/IgniteLogicalIndexScan.java | 1 -
.../query/calcite/rule/SortConverterRule.java | 52 +++-
.../processors/query/calcite/trait/TraitUtils.java | 2 +-
.../query/calcite/util/IgniteResource.java | 5 +
.../processors/query/calcite/LimitOffsetTest.java | 322 +++++++++++++++++++++
.../processors/query/calcite/PlannerTest.java | 117 +++++++-
.../ignite/testsuites/IgniteCalciteTestSuite.java | 2 +
25 files changed, 943 insertions(+), 44 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index 3a94f0b..a4ff6cd 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -38,6 +38,7 @@ import
org.apache.ignite.internal.processors.query.calcite.exec.rel.AggregateNod
import
org.apache.ignite.internal.processors.query.calcite.exec.rel.CorrelatedNestedLoopJoinNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.FilterNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.LimitNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.ModifyNode;
import
org.apache.ignite.internal.processors.query.calcite.exec.rel.NestedLoopJoinNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
@@ -54,6 +55,7 @@ import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedN
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapAggregate;
import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteNestedLoopJoin;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
@@ -299,6 +301,20 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
}
/** {@inheritDoc} */
+ @Override public Node<Row> visit(IgniteLimit rel) {
+ Supplier<Integer> offset = (rel.offset() == null) ? null :
expressionFactory.execute(rel.offset());
+ Supplier<Integer> fetch = (rel.fetch() == null) ? null :
expressionFactory.execute(rel.fetch());
+
+ LimitNode<Row> node = new LimitNode<>(ctx, rel.getRowType(), offset,
fetch);
+
+ Node<Row> input = visit(rel.getInput());
+
+ node.register(input);
+
+ return node;
+ }
+
+ /** {@inheritDoc} */
@Override public Node<Row> visit(IgniteSort rel) {
RelCollation collation = rel.getCollation();
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactory.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactory.java
index e731601..2672e5b 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactory.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactory.java
@@ -81,6 +81,12 @@ public interface ExpressionFactory<Row> {
*/
Supplier<Row> rowSource(List<RexNode> values);
+
+ /**
+ * Executes expression.
+ */
+ <T> Supplier<T> execute(RexNode node);
+
/**
* Creates {@link Scalar}, a code-generated expressions evaluator.
*
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java
index 27670d2..0c79666 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java
@@ -157,6 +157,11 @@ public class ExpressionFactoryImpl<Row> implements
ExpressionFactory<Row> {
}
/** {@inheritDoc} */
+ @Override public <T> Supplier<T> execute(RexNode node) {
+ return new ValueImpl<T>(scalar(node, null),
ctx.rowHandler().factory(typeFactory.getJavaClass(node.getType())));
+ }
+
+ /** {@inheritDoc} */
@Override public Iterable<Row> values(List<RexLiteral> values, RelDataType
rowType) {
RowHandler<Row> handler = ctx.rowHandler();
RowFactory<Row> factory = handler.factory(typeFactory, rowType);
@@ -337,6 +342,29 @@ public class ExpressionFactoryImpl<Row> implements
ExpressionFactory<Row> {
}
/** */
+ private class ValueImpl<T> implements Supplier<T> {
+ /** */
+ private final Scalar scalar;
+
+ /** */
+ private final RowFactory<Row> factory;
+
+ /** */
+ private ValueImpl(Scalar scalar, RowFactory<Row> factory) {
+ this.scalar = scalar;
+ this.factory = factory;
+ }
+
+ /** {@inheritDoc} */
+ @Override public T get() {
+ Row res = factory.create();
+ scalar.execute(ctx, null, res);
+
+ return (T)ctx.rowHandler().get(0, res);
+ }
+ }
+
+ /** */
private class FieldGetter implements InputGetter {
/** */
private final Expression hnd_;
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
index ab410a6..41d2181 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
@@ -51,7 +51,7 @@ public abstract class AbstractNode<Row> implements Node<Row> {
/**
* {@link Inbox} node may not have proper context at creation time in case
it
* creates on first message received from a remote source. This case the
context
- * sets in scope of {@link Inbox#init(ExecutionContext, Collection,
Comparator)} method call.
+ * sets in scope of {@link Inbox#init(ExecutionContext, RelDataType,
Collection, Comparator)} method call.
*/
private ExecutionContext<Row> ctx;
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/LimitNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/LimitNode.java
new file mode 100644
index 0000000..842a44b
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/LimitNode.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.function.Supplier;
+import org.apache.calcite.rel.type.RelDataType;
+import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
+
+/** Offset, fetch|limit support node. */
+public class LimitNode<Row> extends AbstractNode<Row> implements
SingleNode<Row>, Downstream<Row> {
+ /** Offset if its present, otherwise 0. */
+ private final int offset;
+
+ /** Fetch if its present, otherwise 0. */
+ private final int fetch;
+
+ /** Already processed (pushed to upstream) rows count. */
+ private int rowsProcessed;
+
+ /** Fetch can be unset, in this case we need all rows. */
+ private @Nullable Supplier<Integer> fetchNode;
+
+ /** Waiting results counter. */
+ private int waiting;
+
+ /**
+ * Constructor.
+ *
+ * @param ctx Execution context.
+ * @param rowType Row type.
+ */
+ public LimitNode(
+ ExecutionContext<Row> ctx,
+ RelDataType rowType,
+ Supplier<Integer> offsetNode,
+ Supplier<Integer> fetchNode
+ ) {
+ super(ctx, rowType);
+
+ offset = offsetNode == null ? 0 : offsetNode.get();
+ fetch = fetchNode == null ? 0 : fetchNode.get();
+ this.fetchNode = fetchNode;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void request(int rowsCnt) {
+ assert !F.isEmpty(sources()) && sources().size() == 1;
+ assert rowsCnt > 0;
+
+ if (fetchNone()) {
+ end();
+
+ return;
+ }
+
+ if (offset > 0 && rowsProcessed == 0)
+ rowsCnt = offset + rowsCnt;
+
+ try {
+ checkState();
+
+ source().request(waiting = rowsCnt);
+ }
+ catch (Exception e) {
+ onError(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void push(Row row) {
+ if (waiting == -1)
+ return;
+
+ ++rowsProcessed;
+
+ --waiting;
+
+ try {
+ checkState();
+ }
+ catch (Throwable e) {
+ onError(e);
+ }
+
+ if (rowsProcessed > offset) {
+ if (fetchNode == null || (fetchNode != null && rowsProcessed <=
fetch + offset))
+ downstream().push(row);
+ }
+
+ if (fetch > 0 && rowsProcessed == fetch + offset && waiting > 0)
+ end();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void end() {
+ try {
+ if (waiting == -1)
+ return;
+
+ assert downstream() != null;
+
+ waiting = -1;
+
+ downstream().end();
+ }
+ catch (Exception e) {
+ onError(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void rewindInternal() {
+ rowsProcessed = 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Downstream<Row> requestDownstream(int idx) {
+ if (idx != 0)
+ throw new IndexOutOfBoundsException();
+
+ return this;
+ }
+
+ /** {@code True} if requested 0 results, or all already processed. */
+ private boolean fetchNone() {
+ return (fetchNode != null && fetch == 0) || (fetch > 0 &&
rowsProcessed == fetch + offset);
+ }
+}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
index 028baa6..b9833af 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
@@ -73,7 +73,6 @@ public interface Node<Row> extends AutoCloseable {
*/
void request(int rowsCnt);
-
/**
* Rewinds upstream.
*/
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
index 2006666..d83267b 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
@@ -52,7 +52,7 @@ public class ScanNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>
/** {@inheritDoc} */
@Override public void request(int rowsCnt) {
- assert rowsCnt > 0 && requested == 0;
+ assert rowsCnt > 0 && requested == 0 : "rowsCnt=" + rowsCnt + ",
requested=" + requested;
try {
checkState();
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
index 782b012..cb30293 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
@@ -155,8 +155,10 @@ public class SortNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>,
}
}
- if (requested >= 0) {
- downstream().end();
+ if (rows.isEmpty()) {
+ if (requested > 0)
+ downstream().end();
+
requested = 0;
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
index 13c2b60..bbda712 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
@@ -24,6 +24,7 @@ import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedN
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapAggregate;
import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteNestedLoopJoin;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
@@ -150,6 +151,11 @@ class Cloner implements IgniteRelVisitor<IgniteRel> {
}
/** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteLimit rel) {
+ return rel.clone(cluster, F.asList(visit((IgniteRel) rel.getInput())));
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteRel visit(IgniteReceiver rel) {
return collect((IgniteReceiver)rel.clone(cluster, F.asList()));
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IdGenerator.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IdGenerator.java
index f9f5318..9808c66 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IdGenerator.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IdGenerator.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.ignite.internal.processors.query.calcite.prepare;
import java.util.concurrent.atomic.AtomicLong;
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
index bf0709d..bd584b9 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
@@ -233,7 +233,7 @@ public class IgnitePlanner implements Planner,
RelOptTable.ViewExpander {
}
CalciteCatalogReader catalogReader =
this.catalogReader.withSchemaPath(schemaPath);
- SqlValidator validator = new IgniteSqlValidator(operatorTbl,
catalogReader, typeFactory, validatorCfg);
+ SqlValidator validator = new IgniteSqlValidator(operatorTbl,
catalogReader, typeFactory, validatorCfg, ctx.parameters());
SqlToRelConverter sqlToRelConverter = sqlToRelConverter(validator,
catalogReader, sqlToRelConverterCfg);
RelRoot root = sqlToRelConverter.convertQuery(sqlNode, true, false);
root = root.withRel(sqlToRelConverter.decorrelate(sqlNode, root.rel));
@@ -289,7 +289,7 @@ public class IgnitePlanner implements Planner,
RelOptTable.ViewExpander {
/** */
private SqlValidator validator() {
if (validator == null)
- validator = new IgniteSqlValidator(operatorTbl, catalogReader,
typeFactory, validatorCfg);
+ validator = new IgniteSqlValidator(operatorTbl, catalogReader,
typeFactory, validatorCfg, ctx.parameters());
return validator;
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
index 3b3e873..2e5dfc7 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
@@ -23,6 +23,7 @@ import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedN
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapAggregate;
import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteNestedLoopJoin;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
@@ -108,6 +109,11 @@ public class IgniteRelShuttle implements
IgniteRelVisitor<IgniteRel> {
}
/** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteLimit rel) {
+ return processNode(rel);
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteRel visit(IgniteIndexScan rel) {
return processNode(rel);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteSqlValidator.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteSqlValidator.java
index d34a74a..b593eb9 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteSqlValidator.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteSqlValidator.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.query.calcite.prepare;
+import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -29,9 +30,11 @@ import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlDelete;
+import org.apache.calcite.sql.SqlDynamicParam;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOperatorTable;
@@ -50,11 +53,18 @@ import
org.apache.ignite.internal.processors.query.QueryUtils;
import
org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor;
import
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.util.IgniteResource;
+import org.apache.ignite.internal.util.typedef.F;
import static org.apache.calcite.util.Static.RESOURCE;
/** Validator. */
public class IgniteSqlValidator extends SqlValidatorImpl {
+ /** Decimal of Integer.MAX_VALUE for fetch/offset bounding. */
+ private static final BigDecimal DEC_INT_MAX =
BigDecimal.valueOf(Integer.MAX_VALUE);
+
+ /** Dynamic parameters. */
+ Object[] parameters;
+
/**
* Creates a validator.
*
@@ -62,10 +72,13 @@ public class IgniteSqlValidator extends SqlValidatorImpl {
* @param catalogReader Catalog reader
* @param typeFactory Type factory
* @param config Config
+ * @param parameters Dynamic parameters
*/
public IgniteSqlValidator(SqlOperatorTable opTab, CalciteCatalogReader
catalogReader,
- IgniteTypeFactory typeFactory, SqlValidator.Config config) {
+ IgniteTypeFactory typeFactory, SqlValidator.Config config, Object[]
parameters) {
super(opTab, catalogReader, typeFactory, config);
+
+ this.parameters = parameters;
}
/** {@inheritDoc} */
@@ -126,6 +139,43 @@ public class IgniteSqlValidator extends SqlValidatorImpl {
}
/** {@inheritDoc} */
+ @Override protected void validateSelect(SqlSelect select, RelDataType
targetRowType) {
+ checkIntegerLimit(select.getFetch(), "fetch / limit");
+ checkIntegerLimit(select.getOffset(), "offset");
+
+ super.validateSelect(select, targetRowType);
+ }
+
+
+ /**
+ * @param n Node to check limit.
+ * @param nodeName Node name.
+ */
+ private void checkIntegerLimit(SqlNode n, String nodeName) {
+ if (n instanceof SqlLiteral) {
+ BigDecimal offFetchLimit = ((SqlLiteral)n).bigDecimalValue();
+
+ if (offFetchLimit.compareTo(DEC_INT_MAX) > 0 ||
offFetchLimit.compareTo(BigDecimal.ZERO) < 0)
+ throw newValidationError(n,
IgniteResource.INSTANCE.correctIntegerLimit(nodeName));
+ }
+ else if (n instanceof SqlDynamicParam) {
+ // will fail in params check.
+ if (F.isEmpty(parameters))
+ return;
+
+ int idx = ((SqlDynamicParam) n).getIndex();
+
+ if (idx < parameters.length) {
+ Object param = parameters[idx];
+ if (parameters[idx] instanceof Integer) {
+ if ((Integer)param < 0)
+ throw newValidationError(n,
IgniteResource.INSTANCE.correctIntegerLimit(nodeName));
+ }
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public void validateCall(SqlCall call, SqlValidatorScope scope) {
if (call.getKind() == SqlKind.AS) {
final String alias = deriveAlias(call, 0);
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
index fde1c7c..6690bbd 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
@@ -34,6 +34,7 @@ import org.apache.calcite.rel.rules.JoinPushThroughJoinRule;
import org.apache.calcite.rel.rules.ProjectFilterTransposeRule;
import org.apache.calcite.rel.rules.ProjectMergeRule;
import org.apache.calcite.rel.rules.ProjectRemoveRule;
+import org.apache.calcite.rel.rules.PruneEmptyRules;
import org.apache.calcite.rel.rules.SortRemoveRule;
import org.apache.calcite.tools.Program;
import org.apache.calcite.tools.RuleSet;
@@ -137,8 +138,14 @@ public enum PlannerPhase {
CoreRules.AGGREGATE_REMOVE,
CoreRules.AGGREGATE_REDUCE_FUNCTIONS,
- ExposeIndexRule.INSTANCE,
+ PruneEmptyRules.SortFetchZeroRuleConfig.EMPTY
+ .withOperandSupplier(b ->
+ b.operand(LogicalSort.class).anyInputs())
+ .withDescription("PruneSortLimit0")
+ .as(PruneEmptyRules.SortFetchZeroRuleConfig.class)
+ .toRule(),
+ ExposeIndexRule.INSTANCE,
ProjectScanMergeRule.TABLE_SCAN,
ProjectScanMergeRule.INDEX_SCAN,
FilterScanMergeRule.TABLE_SCAN,
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteLimit.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteLimit.java
new file mode 100644
index 0000000..2e25e23
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteLimit.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel;
+
+import java.util.List;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.util.Pair;
+
+/** */
+public class IgniteLimit extends SingleRel implements IgniteRel {
+ /** Offset. */
+ private final RexNode offset;
+
+ /** Fetches rows expression (limit) */
+ private final RexNode fetch;
+
+ /**
+ * Constructor.
+ *
+ * @param cluster Cluster.
+ * @param traits Trait set.
+ * @param child Input relational expression.
+ * @param offset Offset.
+ * @param fetch Limit.
+ */
+ public IgniteLimit(
+ RelOptCluster cluster,
+ RelTraitSet traits,
+ RelNode child,
+ RexNode offset,
+ RexNode fetch
+ ) {
+ super(cluster, traits, child);
+ this.offset = offset;
+ this.fetch = fetch;
+ }
+
+ /** {@inheritDoc} */
+ @Override public final IgniteLimit copy(RelTraitSet traitSet,
List<RelNode> inputs) {
+ return new IgniteLimit(getCluster(), traitSet, sole(inputs), offset,
fetch);
+ }
+
+ /** {@inheritDoc} */
+ @Override public RelWriter explainTerms(RelWriter pw) {
+ super.explainTerms(pw);
+ pw.itemIf("offset", offset, offset != null);
+ pw.itemIf("fetch", fetch, fetch != null);
+ return pw;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Pair<RelTraitSet, List<RelTraitSet>>
passThroughTraits(RelTraitSet required) {
+ if (required.getConvention() != IgniteConvention.INSTANCE)
+ return null;
+
+ return Pair.of(required, ImmutableList.of(required));
+ }
+
+ /** {@inheritDoc} */
+ @Override public Pair<RelTraitSet, List<RelTraitSet>>
deriveTraits(RelTraitSet childTraits, int childId) {
+ assert childId == 0;
+
+ if (childTraits.getConvention() != IgniteConvention.INSTANCE)
+ return null;
+
+ return Pair.of(childTraits, ImmutableList.of(childTraits));
+ }
+
+ /** {@inheritDoc} */
+ @Override public RelOptCost computeSelfCost(RelOptPlanner planner,
RelMetadataQuery mq) {
+ double rows = estimateRowCount(mq);
+
+ return planner.getCostFactory().makeCost(rows, 0, 0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public double estimateRowCount(RelMetadataQuery mq) {
+ Integer lim = intFromRex(fetch);
+ Integer off = intFromRex(offset);
+
+ if (lim == null) {
+ // If estimated rowcount is less than offset return 0
+ if (off == null)
+ return getInput().estimateRowCount(mq) * 0.01;
+
+ return Math.max(0, getInput().estimateRowCount(mq) - off);
+ }
+ else {
+ // probably we can process DYNAMIC_PARAM here too.
+ if (off != null)
+ return lim + off;
+ else
+ return lim / 2.;
+ }
+ }
+
+ /**
+ * @return Integer value of the literal expression.
+ */
+ private Integer intFromRex(RexNode n) {
+ try {
+ if (n.isA(SqlKind.LITERAL))
+ return ((RexLiteral)n).getValueAs(Integer.class);
+ else
+ return null;
+ }
+ catch (Exception e) {
+ return null;
+ }
+ }
+
+ /**
+ * @return Offset.
+ */
+ public RexNode offset() {
+ return offset;
+ }
+
+ /**
+ * @return Fetches rows expression (limit)
+ */
+ public RexNode fetch() {
+ return fetch;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel>
inputs) {
+ return new IgniteLimit(cluster, getTraitSet(), sole(inputs), offset,
fetch);
+ }
+}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
index e43b43c..223d3cf 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
@@ -112,6 +112,11 @@ public interface IgniteRelVisitor<T> {
T visit(IgniteTableSpool rel);
/**
+ * See {@link IgniteRelVisitor#visit(IgniteRel)}
+ */
+ T visit(IgniteLimit rel);
+
+ /**
* Visits a relational node and calculates a result on the basis of node
meta information.
* @param rel Relational node.
* @return Visit result.
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSort.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSort.java
index 29d6a2d..838574d 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSort.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSort.java
@@ -42,19 +42,16 @@ public class IgniteSort extends Sort implements IgniteRel {
* @param traits Trait set.
* @param child Input node.
* @param collation Collation.
- * @param offset Offset.
- * @param fetch Limit.
*/
public IgniteSort(
RelOptCluster cluster,
RelTraitSet traits,
RelNode child,
- RelCollation collation,
- RexNode offset,
- RexNode fetch) {
- super(cluster, traits, child, collation, offset, fetch);
+ RelCollation collation) {
+ super(cluster, traits, child, collation);
}
+ /** */
public IgniteSort(RelInput input) {
super(changeTraits(input, IgniteConvention.INSTANCE));
}
@@ -65,8 +62,11 @@ public class IgniteSort extends Sort implements IgniteRel {
RelNode newInput,
RelCollation newCollation,
RexNode offset,
- RexNode fetch) {
- return new IgniteSort(getCluster(), traitSet, newInput,newCollation,
offset, fetch);
+ RexNode fetch
+ ) {
+ assert offset == null && fetch == null;
+
+ return new IgniteSort(getCluster(), traitSet, newInput, newCollation);
}
/** {@inheritDoc} */
@@ -101,6 +101,6 @@ public class IgniteSort extends Sort implements IgniteRel {
/** {@inheritDoc} */
@Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel>
inputs) {
- return new IgniteSort(cluster, getTraitSet(), sole(inputs), collation,
offset, fetch);
+ return new IgniteSort(cluster, getTraitSet(), sole(inputs), collation);
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTrimExchange.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTrimExchange.java
index cc159b6..4bd3a17 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTrimExchange.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTrimExchange.java
@@ -58,7 +58,7 @@ public class IgniteTrimExchange extends Exchange implements
SourceAwareIgniteRel
this.sourceId = sourceId;
}
-
+ /** */
public IgniteTrimExchange(RelInput input) {
super(changeTraits(input, IgniteConvention.INSTANCE));
sourceId = ((Number)input.get("sourceId")).longValue();
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalIndexScan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalIndexScan.java
index d510188..08c1609 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalIndexScan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalIndexScan.java
@@ -39,7 +39,6 @@ import org.jetbrains.annotations.Nullable;
/** */
public class IgniteLogicalIndexScan extends AbstractIndexScan {
-
/** Creates a IgniteLogicalIndexScan. */
public static IgniteLogicalIndexScan create(
RelOptCluster cluster,
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SortConverterRule.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SortConverterRule.java
index 822ea54..242d516 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SortConverterRule.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SortConverterRule.java
@@ -17,35 +17,61 @@
package org.apache.ignite.internal.processors.query.calcite.rule;
import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.PhysicalNode;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.logical.LogicalSort;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
/**
* Converter rule for sort operator.
*/
-public class SortConverterRule extends
AbstractIgniteConverterRule<LogicalSort> {
+public class SortConverterRule extends RelRule<SortConverterRule.Config> {
/** */
- public static final RelOptRule INSTANCE = new SortConverterRule();
+ public static final RelOptRule INSTANCE =
+ SortConverterRule.Config.DEFAULT
+ .as(SortConverterRule.Config.class).toRule();
- /** */
- public SortConverterRule() {
- super(LogicalSort.class, "SortConverterRule");
+ /** Creates a LimitConverterRule. */
+ protected SortConverterRule(SortConverterRule.Config config) {
+ super(config);
+ }
+
+ /** Rule configuration. */
+ public interface Config extends RelRule.Config {
+ SortConverterRule.Config DEFAULT = EMPTY
+ .withOperandSupplier(b ->
+ b.operand(LogicalSort.class).anyInputs())
+ .as(SortConverterRule.Config.class);
+
+ /** {@inheritDoc} */
+ @Override default SortConverterRule toRule() {
+ return new SortConverterRule(this);
+ }
}
/** {@inheritDoc} */
- @Override protected PhysicalNode convert(RelOptPlanner planner,
RelMetadataQuery mq, LogicalSort rel) {
- RelOptCluster cluster = rel.getCluster();
- RelTraitSet outTraits =
cluster.traitSetOf(IgniteConvention.INSTANCE).replace(rel.getCollation());
+ @Override public void onMatch(RelOptRuleCall call) {
+ final Sort sort = call.rel(0);
+ RelOptCluster cluster = sort.getCluster();
+ RelTraitSet outTraits =
cluster.traitSetOf(IgniteConvention.INSTANCE).replace(sort.getCollation());
RelTraitSet inTraits = cluster.traitSetOf(IgniteConvention.INSTANCE);
- RelNode input = convert(rel.getInput(), inTraits);
+ RelNode input = convert(sort.getInput(), inTraits);
+
+ if (sort.fetch != null || sort.offset != null) {
+ RelTraitSet traits =
cluster.traitSetOf(IgniteConvention.INSTANCE).replace(sort.getCollation());
+
+ call.transformTo(new IgniteLimit(cluster, traits,
convert(sort.getInput(), traits), sort.offset,
+ sort.fetch));
+
+ return;
+ }
- return new IgniteSort(cluster, outTraits, input, rel.getCollation(),
rel.offset, rel.fetch);
+ call.transformTo(new IgniteSort(cluster, outTraits, input,
sort.getCollation()));
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
index 0589516..48601fd 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
@@ -123,7 +123,7 @@ public class TraitUtils {
RelTraitSet traits = rel.getTraitSet().replace(toTrait);
- return new IgniteSort(rel.getCluster(), traits, rel, toTrait, null,
null);
+ return new IgniteSort(rel.getCluster(), traits, rel, toTrait);
}
/** */
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteResource.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteResource.java
index 8a5f02b..a0dfd8e 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteResource.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteResource.java
@@ -38,4 +38,9 @@ public interface IgniteResource {
/** */
@Resources.BaseMessage("Illegal aggregate function. {0} is unsupported at
the moment.")
Resources.ExInst<SqlValidatorException>
unsupportedAggregationFunction(String a0);
+
+ /** */
+ @Resources.BaseMessage("Illegal value of {0}. The value must be positive
and less than Integer.MAX_VALUE " +
+ "(" + Integer.MAX_VALUE + ")." )
+ Resources.ExInst<SqlValidatorException> correctIntegerLimit(String a0);
}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/LimitOffsetTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/LimitOffsetTest.java
new file mode 100644
index 0000000..e722f31
--- /dev/null
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/LimitOffsetTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.QueryEngine;
+import
org.apache.ignite.internal.processors.query.calcite.exec.rel.AbstractNode;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * Limit / offset tests.
+ */
+public class LimitOffsetTest extends GridCommonAbstractTest {
+ /** */
+ private static IgniteCache<Integer, String> cacheRepl;
+
+ /** */
+ private static IgniteCache<Integer, String> cachePart;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ startGrids(2);
+
+ cacheRepl = grid(0).cache("TEST_REPL");
+ cachePart = grid(0).cache("TEST_PART");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ QueryEntity eRepl = new QueryEntity()
+ .setTableName("TEST_REPL")
+ .setKeyType(Integer.class.getName())
+ .setValueType(String.class.getName())
+ .setKeyFieldName("id")
+ .setValueFieldName("val")
+ .addQueryField("id", Integer.class.getName(), null)
+ .addQueryField("val", String.class.getName(), null);
+
+ QueryEntity ePart = new QueryEntity()
+ .setTableName("TEST_PART")
+ .setKeyType(Integer.class.getName())
+ .setValueType(String.class.getName())
+ .setKeyFieldName("id")
+ .setValueFieldName("val")
+ .addQueryField("id", Integer.class.getName(), null)
+ .addQueryField("val", String.class.getName(), null);;
+
+ return super.getConfiguration(igniteInstanceName)
+ .setCacheConfiguration(
+ new CacheConfiguration<>(eRepl.getTableName())
+ .setCacheMode(CacheMode.REPLICATED)
+ .setQueryEntities(singletonList(eRepl))
+ .setSqlSchema("PUBLIC"),
+ new CacheConfiguration<>(ePart.getTableName())
+ .setCacheMode(CacheMode.PARTITIONED)
+ .setQueryEntities(singletonList(ePart))
+ .setSqlSchema("PUBLIC"));
+ }
+
+ /** Tests correctness of fetch / offset params. */
+ @Test
+ public void testInvalidLimitOffset() {
+ QueryEngine engine = Commons.lookupComponent(grid(0).context(),
QueryEngine.class);
+
+ String bigInt = BigDecimal.valueOf(10000000000L).toString();
+
+ try {
+ List<FieldsQueryCursor<List<?>>> cursors =
+ engine.query(null, "PUBLIC",
+ "SELECT * FROM TEST_REPL OFFSET " + bigInt + " ROWS");
+ cursors.get(0).getAll();
+
+ fail();
+ }
+ catch (Throwable e) {
+ assertTrue(e.toString(), X.hasCause(e, "Illegal value of offset",
SqlValidatorException.class));
+ }
+
+ try {
+ List<FieldsQueryCursor<List<?>>> cursors =
+ engine.query(null, "PUBLIC",
+ "SELECT * FROM TEST_REPL FETCH FIRST " + bigInt + " ROWS
ONLY");
+ cursors.get(0).getAll();
+
+ fail();
+ }
+ catch (Throwable e) {
+ assertTrue(e.toString(), X.hasCause(e, "Illegal value of fetch /
limit", SqlValidatorException.class));
+ }
+
+ try {
+ List<FieldsQueryCursor<List<?>>> cursors =
+ engine.query(null, "PUBLIC", "SELECT * FROM TEST_REPL LIMIT "
+ bigInt);
+ cursors.get(0).getAll();
+
+ fail();
+ }
+ catch (Throwable e) {
+ assertTrue(e.toString(), X.hasCause(e, "Illegal value of fetch /
limit", SqlValidatorException.class));
+ }
+
+ try {
+ List<FieldsQueryCursor<List<?>>> cursors =
+ engine.query(null, "PUBLIC",
+ "SELECT * FROM TEST_REPL OFFSET -1 ROWS FETCH FIRST -1
ROWS ONLY");
+ cursors.get(0).getAll();
+
+ fail();
+ }
+ catch (Throwable e) {
+ assertTrue(e.toString(), X.hasCause(e, IgniteSQLException.class));
+ }
+
+ try {
+ List<FieldsQueryCursor<List<?>>> cursors =
+ engine.query(null, "PUBLIC",
+ "SELECT * FROM TEST_REPL OFFSET -1 ROWS");
+ cursors.get(0).getAll();
+
+ fail();
+ }
+ catch (Throwable e) {
+ assertTrue(e.toString(), X.hasCause(e, IgniteSQLException.class));
+ }
+
+ try {
+ List<FieldsQueryCursor<List<?>>> cursors =
+ engine.query(null, "PUBLIC",
+ "SELECT * FROM TEST_REPL OFFSET 2+1 ROWS");
+ cursors.get(0).getAll();
+
+ fail();
+ }
+ catch (Throwable e) {
+ assertTrue(e.toString(), X.hasCause(e, IgniteSQLException.class));
+ }
+
+ // Check with parameters
+ try {
+ List<FieldsQueryCursor<List<?>>> cursors =
+ engine.query(null, "PUBLIC",
+ "SELECT * FROM TEST_REPL OFFSET ? ROWS FETCH FIRST ? ROWS
ONLY", -1, -1);
+ cursors.get(0).getAll();
+
+ fail();
+ }
+ catch (Throwable e) {
+ assertTrue(e.toString(), X.hasCause(e, "Illegal value of fetch /
limit", SqlValidatorException.class));
+ }
+
+ try {
+ List<FieldsQueryCursor<List<?>>> cursors =
+ engine.query(null, "PUBLIC",
+ "SELECT * FROM TEST_REPL OFFSET ? ROWS", -1);
+ cursors.get(0).getAll();
+
+ fail();
+ }
+ catch (Throwable e) {
+ assertTrue(e.toString(), X.hasCause(e, "Illegal value of offset",
SqlValidatorException.class));
+ }
+
+ try {
+ List<FieldsQueryCursor<List<?>>> cursors =
+ engine.query(null, "PUBLIC",
+ "SELECT * FROM TEST_REPL FETCH FIRST ? ROWS ONLY", -1);
+ cursors.get(0).getAll();
+
+ fail();
+ }
+ catch (Throwable e) {
+ assertTrue(e.toString(), X.hasCause(e, "Illegal value of fetch /
limit", SqlValidatorException.class));
+ }
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void testLimitOffset() throws Exception {
+ int inBufSize = U.field(AbstractNode.class, "IN_BUFFER_SIZE");
+
+ int[] rowsArr = {10, inBufSize, (2 * inBufSize) - 1};
+
+ for (int rows : rowsArr) {
+ fillCache(cacheRepl, rows);
+
+ int[] limits = {-1, 0, 10, rows / 2 - 1, rows / 2, rows / 2 + 1,
rows - 1, rows};
+ int[] offsets = {-1, 0, 10, rows / 2 - 1, rows / 2, rows / 2 + 1,
rows - 1, rows};
+
+ for (int lim : limits) {
+ for (int off : offsets) {
+ log.info("+++ Check [rows=" + rows + ", limit=" + lim + ",
off=" + off + ']');
+
+ checkQuery(rows, lim, off, false, false);
+ checkQuery(rows, lim, off, true, false);
+ checkQuery(rows, lim, off, false, true);
+ checkQuery(rows, lim, off, true, true);
+ }
+ }
+ }
+ }
+
+ /**
+ * @param c Cache.
+ * @param rows Rows count.
+ */
+ private void fillCache(IgniteCache c, int rows) throws
InterruptedException {
+ c.clear();
+
+ for (int i = 0; i < rows; ++i)
+ c.put(i, "val_" + i);
+
+ awaitPartitionMapExchange();
+ }
+
+ /**
+ * Check query with specified limit and offset (or without its when the
arguments are negative),
+ *
+ * @param rows Rows count.
+ * @param lim Limit.
+ * @param off Offset.
+ * @param param If {@code false} place limit/offset as literals. Otherwise
they are plase as parameters.
+ * @param sorted Use sorted query (adds ORDER BY).
+ */
+ void checkQuery(int rows, int lim, int off, boolean param, boolean sorted)
{
+ QueryEngine engine = Commons.lookupComponent(grid(0).context(),
QueryEngine.class);
+
+ String sql = createSql(lim, off, param, sorted);
+
+ Object[] params;
+ if (lim >= 0 && off >= 0)
+ params = new Object[]{off, lim};
+ else if (lim >= 0)
+ params = new Object[]{lim};
+ else if (off >= 0)
+ params = new Object[]{off};
+ else
+ params = X.EMPTY_OBJECT_ARRAY;
+
+ log.info("SQL: " + sql + (param ? "params=" + Arrays.toString(params)
: ""));
+
+ List<FieldsQueryCursor<List<?>>> cursors =
+ engine.query(null, "PUBLIC", sql, param ? params :
X.EMPTY_OBJECT_ARRAY);
+
+ List<List<?>> res = cursors.get(0).getAll();
+
+ assertEquals("Invalid results size. [rows=" + rows + ", limit=" + lim
+ ", off=" + off
+ + ", res=" + res.size() + ']', expectedSize(rows, lim, off),
res.size());
+ }
+
+ /**
+ * Calculates expected result set size by limit and offset.
+ */
+ private int expectedSize(int rows, int lim, int off) {
+ if (off < 0)
+ off = 0;
+
+ if (lim == 0)
+ return 0;
+ else if (lim < 0)
+ return rows - off;
+ else if (lim + off < rows)
+ return lim;
+ else if (off > rows)
+ return 0;
+ else
+ return rows - off;
+ }
+
+ /**
+ * @param lim Limit.
+ * @param off Offset.
+ * @param param Flag to place limit/offset by parameter or literal.
+ * @return SQL query string.
+ */
+ private String createSql(int lim, int off, boolean param, boolean sorted) {
+ StringBuilder sb = new StringBuilder("SELECT * FROM TEST_REPL ");
+
+ if (sorted)
+ sb.append("ORDER BY ID ");
+
+ if (off >= 0)
+ sb.append("OFFSET ").append(param ? "?" :
Integer.toString(off)).append(" ROWS ");
+
+ if (lim >= 0)
+ sb.append("FETCH FIRST ").append(param ? "?" :
Integer.toString(lim)).append(" ROWS ONLY ");
+
+ return sb.toString();
+ }
+}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
index d36600f..abfc643 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -87,7 +88,9 @@ import
org.apache.ignite.internal.processors.query.calcite.prepare.QueryTemplate
import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter;
import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableSpool;
import
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalIndexScan;
import
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
@@ -115,7 +118,6 @@ import org.jetbrains.annotations.Nullable;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import static org.apache.calcite.tools.Frameworks.createRootSchema;
@@ -2690,9 +2692,6 @@ public class PlannerTest extends GridCommonAbstractTest {
publicSchema.addTable("T0", t0);
publicSchema.addTable("T1", t1);
- SchemaPlus schema = createRootSchema(false)
- .add("PUBLIC", publicSchema);
-
String sql = "select * " +
"from t0 " +
"join t1 on t0.jid = t1.jid";
@@ -2755,9 +2754,6 @@ public class PlannerTest extends GridCommonAbstractTest {
publicSchema.addTable("T0", t0);
publicSchema.addTable("T1", t1);
- SchemaPlus schema = createRootSchema(false)
- .add("PUBLIC", publicSchema);
-
String sql = "select * " +
"from t0 " +
"join t1 on t0.jid = t1.jid";
@@ -2782,6 +2778,104 @@ public class PlannerTest extends GridCommonAbstractTest
{
assertEquals(1, spoolCnt.get());
}
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testLimit() throws Exception {
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ TestTable testTbl = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("VAL", f.createJavaType(String.class))
+ .build()) {
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.broadcast();
+ }
+ };
+
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+ publicSchema.addTable("TEST", testTbl);
+
+ String sql = "SELECT * FROM TEST OFFSET 10 ROWS FETCH FIRST 10 ROWS
ONLY";
+
+ {
+ RelNode phys = physicalPlan(sql, publicSchema);
+
+ assertNotNull(phys);
+
+ AtomicInteger limit = new AtomicInteger();
+ AtomicBoolean sort = new AtomicBoolean();
+
+ relTreeVisit(phys, (node, ordinal, parent) -> {
+ if (node instanceof IgniteLimit)
+ limit.incrementAndGet();
+
+ if (node instanceof IgniteSort)
+ sort.set(true);
+ }
+ );
+
+ assertEquals("Invalid plan: \n" + RelOptUtil.toString(phys), 1,
limit.get());
+ assertFalse("Invalid plan: \n" + RelOptUtil.toString(phys),
sort.get());
+ }
+
+ sql = "SELECT * FROM TEST ORDER BY ID OFFSET 10 ROWS FETCH FIRST 10
ROWS ONLY";
+
+ {
+ RelNode phys = physicalPlan(sql, publicSchema);
+
+ assertNotNull(phys);
+
+ AtomicInteger limit = new AtomicInteger();
+ AtomicBoolean sort = new AtomicBoolean();
+
+ relTreeVisit(phys, (node, ordinal, parent) -> {
+ if (node instanceof IgniteLimit)
+ limit.incrementAndGet();
+
+ if (node instanceof IgniteSort)
+ sort.set(true);
+ }
+ );
+
+ assertEquals("Invalid plan: \n" + RelOptUtil.toString(phys), 1,
limit.get());
+ assertFalse("Invalid plan: \n" + RelOptUtil.toString(phys),
sort.get());
+ }
+ }
+
+ /** */
+ interface TestVisitor {
+ public void visit(RelNode node, int ordinal, RelNode parent);
+ }
+
+ /** */
+ private static class TestRelVisitor extends RelVisitor {
+ /** */
+ final TestVisitor v;
+
+ /** */
+ TestRelVisitor(TestVisitor v) {
+ this.v = v;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void visit(RelNode node, int ordinal, RelNode parent)
{
+ v.visit(node, ordinal, parent);
+
+ super.visit(node, ordinal, parent);
+ }
+ }
+
+ /** */
+ protected static void relTreeVisit(RelNode n, TestVisitor v) {
+ v.visit(n, -1, null);
+
+ n.childrenAccept(new TestRelVisitor(v));
+ }
+
/** */
private IgniteRel physicalPlan(String sql, IgniteSchema publicSchema,
String... disabledRules) throws Exception {
SchemaPlus schema = createRootSchema(false)
@@ -2894,7 +2988,6 @@ public class PlannerTest extends GridCommonAbstractTest {
this(type, RewindabilityTrait.REWINDABLE);
}
-
/** */
private TestTable(RelDataType type, RewindabilityTrait rewindable) {
this(type, rewindable, 100.0);
@@ -2996,8 +3089,12 @@ public class PlannerTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override public boolean rolledUpColumnValidInsideAgg(String column,
SqlCall call, SqlNode parent,
- CalciteConnectionConfig config) {
+ @Override public boolean rolledUpColumnValidInsideAgg(
+ String column,
+ SqlCall call,
+ SqlNode parent,
+ CalciteConnectionConfig config
+ ) {
throw new AssertionError();
}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
index d5da323..8e2fb07 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
@@ -21,6 +21,7 @@ import
org.apache.ignite.internal.processors.query.calcite.CalciteBasicSecondary
import
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessorTest;
import org.apache.ignite.internal.processors.query.calcite.CancelTest;
import org.apache.ignite.internal.processors.query.calcite.DateTimeTest;
+import org.apache.ignite.internal.processors.query.calcite.LimitOffsetTest;
import org.apache.ignite.internal.processors.query.calcite.PlannerTest;
import org.apache.ignite.internal.processors.query.calcite.QueryCheckerTest;
import org.apache.ignite.internal.processors.query.calcite.TableSpoolTest;
@@ -51,6 +52,7 @@ import org.junit.runners.Suite;
QueryCheckerTest.class,
DateTimeTest.class,
TableSpoolTest.class,
+ LimitOffsetTest.class
})
public class IgniteCalciteTestSuite {
}