This is an automated email from the ASF dual-hosted git repository.

tledkov pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-12248 by this push:
     new 88807bc  IGNITE-13025 Calcite integration. Limit and offset support 
(closes #7903, #8493)
88807bc is described below

commit 88807bc4d25c4959233e5504b46182c21cfa5234
Author: Stanilovsky Evgeny <[email protected]>
AuthorDate: Wed Nov 25 12:41:38 2020 +0300

    IGNITE-13025 Calcite integration. Limit and offset support (closes #7903, 
#8493)
---
 .../query/calcite/exec/LogicalRelImplementor.java  |  16 +
 .../query/calcite/exec/exp/ExpressionFactory.java  |   6 +
 .../calcite/exec/exp/ExpressionFactoryImpl.java    |  28 ++
 .../query/calcite/exec/rel/AbstractNode.java       |   2 +-
 .../query/calcite/exec/rel/LimitNode.java          | 145 ++++++++++
 .../processors/query/calcite/exec/rel/Node.java    |   1 -
 .../query/calcite/exec/rel/ScanNode.java           |   2 +-
 .../query/calcite/exec/rel/SortNode.java           |   6 +-
 .../processors/query/calcite/prepare/Cloner.java   |   6 +
 .../query/calcite/prepare/IdGenerator.java         |  17 ++
 .../query/calcite/prepare/IgnitePlanner.java       |   4 +-
 .../query/calcite/prepare/IgniteRelShuttle.java    |   6 +
 .../query/calcite/prepare/IgniteSqlValidator.java  |  52 +++-
 .../query/calcite/prepare/PlannerPhase.java        |   9 +-
 .../processors/query/calcite/rel/IgniteLimit.java  | 161 +++++++++++
 .../query/calcite/rel/IgniteRelVisitor.java        |   5 +
 .../processors/query/calcite/rel/IgniteSort.java   |  18 +-
 .../query/calcite/rel/IgniteTrimExchange.java      |   2 +-
 .../rel/logical/IgniteLogicalIndexScan.java        |   1 -
 .../query/calcite/rule/SortConverterRule.java      |  52 +++-
 .../processors/query/calcite/trait/TraitUtils.java |   2 +-
 .../query/calcite/util/IgniteResource.java         |   5 +
 .../processors/query/calcite/LimitOffsetTest.java  | 322 +++++++++++++++++++++
 .../processors/query/calcite/PlannerTest.java      | 117 +++++++-
 .../ignite/testsuites/IgniteCalciteTestSuite.java  |   2 +
 25 files changed, 943 insertions(+), 44 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index 3a94f0b..a4ff6cd 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -38,6 +38,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.exec.rel.AggregateNod
 import 
org.apache.ignite.internal.processors.query.calcite.exec.rel.CorrelatedNestedLoopJoinNode;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.FilterNode;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.LimitNode;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.ModifyNode;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.rel.NestedLoopJoinNode;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
@@ -54,6 +55,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedN
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapAggregate;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteNestedLoopJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
@@ -299,6 +301,20 @@ public class LogicalRelImplementor<Row> implements 
IgniteRelVisitor<Node<Row>> {
     }
 
     /** {@inheritDoc} */
+    @Override public Node<Row> visit(IgniteLimit rel) {
+        Supplier<Integer> offset = (rel.offset() == null) ? null : 
expressionFactory.execute(rel.offset());
+        Supplier<Integer> fetch = (rel.fetch() == null) ? null : 
expressionFactory.execute(rel.fetch());
+
+        LimitNode<Row> node = new LimitNode<>(ctx, rel.getRowType(), offset, 
fetch);
+
+        Node<Row> input = visit(rel.getInput());
+
+        node.register(input);
+
+        return node;
+    }
+
+    /** {@inheritDoc} */
     @Override public Node<Row> visit(IgniteSort rel) {
         RelCollation collation = rel.getCollation();
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactory.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactory.java
index e731601..2672e5b 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactory.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactory.java
@@ -81,6 +81,12 @@ public interface ExpressionFactory<Row> {
      */
     Supplier<Row> rowSource(List<RexNode> values);
 
+
+    /**
+     * Creates a supplier that evaluates the given expression each time {@code get()} is called.
+     */
+    <T> Supplier<T> execute(RexNode node);
+
     /**
      * Creates {@link Scalar}, a code-generated expressions evaluator.
      *
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java
index 27670d2..0c79666 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java
@@ -157,6 +157,11 @@ public class ExpressionFactoryImpl<Row> implements 
ExpressionFactory<Row> {
     }
 
     /** {@inheritDoc} */
+    @Override public <T> Supplier<T> execute(RexNode node) {
+        return new ValueImpl<T>(scalar(node, null), 
ctx.rowHandler().factory(typeFactory.getJavaClass(node.getType())));
+    }
+
+    /** {@inheritDoc} */
     @Override public Iterable<Row> values(List<RexLiteral> values, RelDataType 
rowType) {
         RowHandler<Row> handler = ctx.rowHandler();
         RowFactory<Row> factory = handler.factory(typeFactory, rowType);
@@ -337,6 +342,29 @@ public class ExpressionFactoryImpl<Row> implements 
ExpressionFactory<Row> {
     }
 
     /** */
+    private class ValueImpl<T> implements Supplier<T> {
+        /** */
+        private final Scalar scalar;
+
+        /** */
+        private final RowFactory<Row> factory;
+
+        /** */
+        private ValueImpl(Scalar scalar, RowFactory<Row> factory) {
+            this.scalar = scalar;
+            this.factory = factory;
+        }
+
+        /** {@inheritDoc} */
+        @Override public T get() {
+            Row res = factory.create();
+            scalar.execute(ctx, null, res);
+
+            return (T)ctx.rowHandler().get(0, res);
+        }
+    }
+
+    /** */
     private class FieldGetter implements InputGetter {
         /** */
         private final Expression hnd_;
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
index ab410a6..41d2181 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
@@ -51,7 +51,7 @@ public abstract class AbstractNode<Row> implements Node<Row> {
     /**
      * {@link Inbox} node may not have proper context at creation time in case 
it
      * creates on first message received from a remote source. This case the 
context
-     * sets in scope of {@link Inbox#init(ExecutionContext, Collection, 
Comparator)} method call.
+     * sets in scope of {@link Inbox#init(ExecutionContext, RelDataType, 
Collection, Comparator)} method call.
      */
     private ExecutionContext<Row> ctx;
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/LimitNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/LimitNode.java
new file mode 100644
index 0000000..842a44b
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/LimitNode.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.function.Supplier;
+import org.apache.calcite.rel.type.RelDataType;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
+
+/** Offset, fetch|limit support node. */
+public class LimitNode<Row> extends AbstractNode<Row> implements 
SingleNode<Row>, Downstream<Row> {
+    /** Offset if it is present, otherwise 0. */
+    private final int offset;
+
+    /** Fetch if it is present, otherwise 0. */
+    private final int fetch;
+
+    /** Number of rows received from the source so far (not only those pushed to downstream). */
+    private int rowsProcessed;
+
+    /** Fetch supplier, {@code null} if fetch is unset, in this case we need all rows. */
+    private @Nullable Supplier<Integer> fetchNode;
+
+    /** Waiting results counter, {@code -1} means that this node has already ended. */
+    private int waiting;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Execution context.
+     * @param rowType Row type.
+     */
+    public LimitNode(
+        ExecutionContext<Row> ctx,
+        RelDataType rowType,
+        Supplier<Integer> offsetNode,
+        Supplier<Integer> fetchNode
+    ) {
+        super(ctx, rowType);
+
+        offset = offsetNode == null ? 0 : offsetNode.get();
+        fetch = fetchNode == null ? 0 : fetchNode.get();
+        this.fetchNode = fetchNode;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void request(int rowsCnt) {
+        assert !F.isEmpty(sources()) && sources().size() == 1;
+        assert rowsCnt > 0;
+
+        if (fetchNone()) {
+            end();
+
+            return;
+        }
+
+        if (offset > 0 && rowsProcessed == 0)
+            rowsCnt = offset + rowsCnt;
+
+        try {
+            checkState();
+
+            source().request(waiting = rowsCnt);
+        }
+        catch (Exception e) {
+            onError(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void push(Row row) {
+        if (waiting == -1)
+            return;
+
+        ++rowsProcessed;
+
+        --waiting;
+
+        try {
+            checkState();
+        }
+        catch (Throwable e) {
+            onError(e);
+        }
+
+        if (rowsProcessed > offset) {
+            if (fetchNode == null || (fetchNode != null && rowsProcessed <= 
fetch + offset))
+                downstream().push(row);
+        }
+
+        if (fetch > 0 && rowsProcessed == fetch + offset && waiting > 0)
+            end();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void end() {
+        try {
+            if (waiting == -1)
+                return;
+
+            assert downstream() != null;
+
+            waiting = -1;
+
+            downstream().end();
+        }
+        catch (Exception e) {
+            onError(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void rewindInternal() {
+        rowsProcessed = 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Downstream<Row> requestDownstream(int idx) {
+        if (idx != 0)
+            throw new IndexOutOfBoundsException();
+
+        return this;
+    }
+
+    /** {@code True} if fetch is set to 0, or all {@code offset + fetch} rows are already processed. */
+    private boolean fetchNone() {
+        return (fetchNode != null && fetch == 0) || (fetch > 0 && 
rowsProcessed == fetch + offset);
+    }
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
index 028baa6..b9833af 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
@@ -73,7 +73,6 @@ public interface Node<Row> extends AutoCloseable {
      */
     void request(int rowsCnt);
 
-
     /**
      * Rewinds upstream.
      */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
index 2006666..d83267b 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
@@ -52,7 +52,7 @@ public class ScanNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row>
 
     /** {@inheritDoc} */
     @Override public void request(int rowsCnt) {
-        assert rowsCnt > 0 && requested == 0;
+        assert rowsCnt > 0 && requested == 0 : "rowsCnt=" + rowsCnt + ", 
requested=" + requested;
 
         try {
             checkState();
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
index 782b012..cb30293 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
@@ -155,8 +155,10 @@ public class SortNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row>,
                 }
             }
 
-            if (requested >= 0) {
-                downstream().end();
+            if (rows.isEmpty()) {
+                if (requested > 0)
+                    downstream().end();
+
                 requested = 0;
             }
         }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
index 13c2b60..bbda712 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
@@ -24,6 +24,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedN
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapAggregate;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteNestedLoopJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
@@ -150,6 +151,11 @@ class Cloner implements IgniteRelVisitor<IgniteRel> {
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteLimit rel) {
+        return rel.clone(cluster, F.asList(visit((IgniteRel) rel.getInput())));
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteRel visit(IgniteReceiver rel) {
         return collect((IgniteReceiver)rel.clone(cluster, F.asList()));
     }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IdGenerator.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IdGenerator.java
index f9f5318..9808c66 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IdGenerator.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IdGenerator.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.ignite.internal.processors.query.calcite.prepare;
 
 import java.util.concurrent.atomic.AtomicLong;
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
index bf0709d..bd584b9 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
@@ -233,7 +233,7 @@ public class IgnitePlanner implements Planner, 
RelOptTable.ViewExpander {
         }
 
         CalciteCatalogReader catalogReader = 
this.catalogReader.withSchemaPath(schemaPath);
-        SqlValidator validator = new IgniteSqlValidator(operatorTbl, 
catalogReader, typeFactory, validatorCfg);
+        SqlValidator validator = new IgniteSqlValidator(operatorTbl, 
catalogReader, typeFactory, validatorCfg, ctx.parameters());
         SqlToRelConverter sqlToRelConverter = sqlToRelConverter(validator, 
catalogReader, sqlToRelConverterCfg);
         RelRoot root = sqlToRelConverter.convertQuery(sqlNode, true, false);
         root = root.withRel(sqlToRelConverter.decorrelate(sqlNode, root.rel));
@@ -289,7 +289,7 @@ public class IgnitePlanner implements Planner, 
RelOptTable.ViewExpander {
     /** */
     private SqlValidator validator() {
         if (validator == null)
-            validator = new IgniteSqlValidator(operatorTbl, catalogReader, 
typeFactory, validatorCfg);
+            validator = new IgniteSqlValidator(operatorTbl, catalogReader, 
typeFactory, validatorCfg, ctx.parameters());
 
         return validator;
     }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
index 3b3e873..2e5dfc7 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
@@ -23,6 +23,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedN
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapAggregate;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteNestedLoopJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
@@ -108,6 +109,11 @@ public class IgniteRelShuttle implements 
IgniteRelVisitor<IgniteRel> {
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteLimit rel) {
+        return processNode(rel);
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteRel visit(IgniteIndexScan rel) {
         return processNode(rel);
     }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteSqlValidator.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteSqlValidator.java
index d34a74a..b593eb9 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteSqlValidator.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteSqlValidator.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.calcite.prepare;
 
+import java.math.BigDecimal;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -29,9 +30,11 @@ import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.sql.SqlAggFunction;
 import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlDelete;
+import org.apache.calcite.sql.SqlDynamicParam;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlInsert;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.SqlOperatorTable;
@@ -50,11 +53,18 @@ import 
org.apache.ignite.internal.processors.query.QueryUtils;
 import 
org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor;
 import 
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.processors.query.calcite.util.IgniteResource;
+import org.apache.ignite.internal.util.typedef.F;
 
 import static org.apache.calcite.util.Static.RESOURCE;
 
 /** Validator. */
 public class IgniteSqlValidator extends SqlValidatorImpl {
+    /** Decimal of Integer.MAX_VALUE for fetch/offset bounding. */
+    private static final BigDecimal DEC_INT_MAX = 
BigDecimal.valueOf(Integer.MAX_VALUE);
+
+    /** Dynamic parameters. */
+    Object[] parameters;
+
     /**
      * Creates a validator.
      *
@@ -62,10 +72,13 @@ public class IgniteSqlValidator extends SqlValidatorImpl {
      * @param catalogReader Catalog reader
      * @param typeFactory   Type factory
      * @param config        Config
+     * @param parameters    Dynamic parameters
      */
     public IgniteSqlValidator(SqlOperatorTable opTab, CalciteCatalogReader 
catalogReader,
-        IgniteTypeFactory typeFactory, SqlValidator.Config config) {
+        IgniteTypeFactory typeFactory, SqlValidator.Config config, Object[] 
parameters) {
         super(opTab, catalogReader, typeFactory, config);
+
+        this.parameters = parameters;
     }
 
     /** {@inheritDoc} */
@@ -126,6 +139,43 @@ public class IgniteSqlValidator extends SqlValidatorImpl {
     }
 
     /** {@inheritDoc} */
+    @Override protected void validateSelect(SqlSelect select, RelDataType 
targetRowType) {
+        checkIntegerLimit(select.getFetch(), "fetch / limit");
+        checkIntegerLimit(select.getOffset(), "offset");
+
+        super.validateSelect(select, targetRowType);
+    }
+
+
+    /**
+     * @param n Fetch or offset node to validate (literals and {@code Integer} dynamic parameters are checked).
+     * @param nodeName Node name.
+     */
+    private void checkIntegerLimit(SqlNode n, String nodeName) {
+        if (n instanceof SqlLiteral) {
+            BigDecimal offFetchLimit = ((SqlLiteral)n).bigDecimalValue();
+
+            if (offFetchLimit.compareTo(DEC_INT_MAX) > 0 || 
offFetchLimit.compareTo(BigDecimal.ZERO) < 0)
+                throw newValidationError(n, 
IgniteResource.INSTANCE.correctIntegerLimit(nodeName));
+        }
+        else if (n instanceof SqlDynamicParam) {
+            // No parameters are provided, the query will fail in params check.
+            if (F.isEmpty(parameters))
+                return;
+
+            int idx = ((SqlDynamicParam) n).getIndex();
+
+            if (idx < parameters.length) {
+                Object param = parameters[idx];
+                if (parameters[idx] instanceof Integer) {
+                    if ((Integer)param < 0)
+                        throw newValidationError(n, 
IgniteResource.INSTANCE.correctIntegerLimit(nodeName));
+                }
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public void validateCall(SqlCall call, SqlValidatorScope scope) {
         if (call.getKind() == SqlKind.AS) {
             final String alias = deriveAlias(call, 0);
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
index fde1c7c..6690bbd 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
@@ -34,6 +34,7 @@ import org.apache.calcite.rel.rules.JoinPushThroughJoinRule;
 import org.apache.calcite.rel.rules.ProjectFilterTransposeRule;
 import org.apache.calcite.rel.rules.ProjectMergeRule;
 import org.apache.calcite.rel.rules.ProjectRemoveRule;
+import org.apache.calcite.rel.rules.PruneEmptyRules;
 import org.apache.calcite.rel.rules.SortRemoveRule;
 import org.apache.calcite.tools.Program;
 import org.apache.calcite.tools.RuleSet;
@@ -137,8 +138,14 @@ public enum PlannerPhase {
                     CoreRules.AGGREGATE_REMOVE,
                     CoreRules.AGGREGATE_REDUCE_FUNCTIONS,
 
-                    ExposeIndexRule.INSTANCE,
+                    PruneEmptyRules.SortFetchZeroRuleConfig.EMPTY
+                        .withOperandSupplier(b ->
+                            b.operand(LogicalSort.class).anyInputs())
+                        .withDescription("PruneSortLimit0")
+                        .as(PruneEmptyRules.SortFetchZeroRuleConfig.class)
+                        .toRule(),
 
+                    ExposeIndexRule.INSTANCE,
                     ProjectScanMergeRule.TABLE_SCAN,
                     ProjectScanMergeRule.INDEX_SCAN,
                     FilterScanMergeRule.TABLE_SCAN,
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteLimit.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteLimit.java
new file mode 100644
index 0000000..2e25e23
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteLimit.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel;
+
+import java.util.List;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.util.Pair;
+
+/** Relational operator that holds the offset and fetch (limit) expressions for its single input. */
+public class IgniteLimit extends SingleRel implements IgniteRel {
+    /** Offset. */
+    private final RexNode offset;
+
+    /** Fetch (limit) rows expression. */
+    private final RexNode fetch;
+
+    /**
+     * Constructor.
+     *
+     * @param cluster Cluster.
+     * @param traits Trait set.
+     * @param child Input relational expression.
+     * @param offset Offset.
+     * @param fetch Limit.
+     */
+    public IgniteLimit(
+        RelOptCluster cluster,
+        RelTraitSet traits,
+        RelNode child,
+        RexNode offset,
+        RexNode fetch
+    ) {
+        super(cluster, traits, child);
+        this.offset = offset;
+        this.fetch = fetch;
+    }
+
+    /** {@inheritDoc} */
+    @Override public final IgniteLimit copy(RelTraitSet traitSet, 
List<RelNode> inputs) {
+        return new IgniteLimit(getCluster(), traitSet, sole(inputs), offset, 
fetch);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelWriter explainTerms(RelWriter pw) {
+        super.explainTerms(pw);
+        pw.itemIf("offset", offset, offset != null);
+        pw.itemIf("fetch", fetch, fetch != null);
+        return pw;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughTraits(RelTraitSet required) {
+        if (required.getConvention() != IgniteConvention.INSTANCE)
+            return null;
+
+        return Pair.of(required, ImmutableList.of(required));
+    }
+
+    /** {@inheritDoc} */
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
deriveTraits(RelTraitSet childTraits, int childId) {
+        assert childId == 0;
+
+        if (childTraits.getConvention() != IgniteConvention.INSTANCE)
+            return null;
+
+        return Pair.of(childTraits, ImmutableList.of(childTraits));
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, 
RelMetadataQuery mq) {
+        double rows = estimateRowCount(mq);
+
+        return planner.getCostFactory().makeCost(rows, 0, 0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public double estimateRowCount(RelMetadataQuery mq) {
+        Integer lim = intFromRex(fetch);
+        Integer off = intFromRex(offset);
+
+        if (lim == null) {
+            // No fetch: take 1% of input rows if there is no offset, otherwise input rows minus offset (at least 0).
+            if (off == null)
+                return getInput().estimateRowCount(mq) * 0.01;
+
+            return Math.max(0, getInput().estimateRowCount(mq) - off);
+        }
+        else {
+            // probably we can process DYNAMIC_PARAM here too.
+            if (off != null)
+                return lim + off;
+            else
+                return lim / 2.;
+        }
+    }
+
+    /**
+     * @return Integer value of the literal expression, {@code null} if it is not a literal or cannot be read.
+     */
+    private Integer intFromRex(RexNode n) {
+        try {
+            if (n.isA(SqlKind.LITERAL))
+                return ((RexLiteral)n).getValueAs(Integer.class);
+            else
+                return null;
+        }
+        catch (Exception e) {
+            return null;
+        }
+    }
+
+    /**
+     * @return Offset.
+     */
+    public RexNode offset() {
+        return offset;
+    }
+
+    /**
+     * @return Fetch (limit) rows expression.
+     */
+    public RexNode fetch() {
+        return fetch;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> 
inputs) {
+        return new IgniteLimit(cluster, getTraitSet(), sole(inputs), offset, 
fetch);
+    }
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
index e43b43c..223d3cf 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
@@ -112,6 +112,11 @@ public interface IgniteRelVisitor<T> {
     T visit(IgniteTableSpool rel);
 
     /**
+     * See {@link IgniteRelVisitor#visit(IgniteRel)}
+     */
+    T visit(IgniteLimit rel);
+
+    /**
      * Visits a relational node and calculates a result on the basis of node 
meta information.
      * @param rel Relational node.
      * @return Visit result.
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSort.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSort.java
index 29d6a2d..838574d 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSort.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSort.java
@@ -42,19 +42,16 @@ public class IgniteSort extends Sort implements IgniteRel {
      * @param traits Trait set.
      * @param child Input node.
      * @param collation Collation.
-     * @param offset Offset.
-     * @param fetch Limit.
      */
     public IgniteSort(
         RelOptCluster cluster,
         RelTraitSet traits,
         RelNode child,
-        RelCollation collation,
-        RexNode offset,
-        RexNode fetch) {
-        super(cluster, traits, child, collation, offset, fetch);
+        RelCollation collation) {
+        super(cluster, traits, child, collation);
     }
 
+    /** */
     public IgniteSort(RelInput input) {
         super(changeTraits(input, IgniteConvention.INSTANCE));
     }
@@ -65,8 +62,11 @@ public class IgniteSort extends Sort implements IgniteRel {
         RelNode newInput,
         RelCollation newCollation,
         RexNode offset,
-        RexNode fetch) {
-        return new IgniteSort(getCluster(), traitSet, newInput,newCollation, 
offset, fetch);
+        RexNode fetch
+    ) {
+        assert offset == null && fetch == null;
+
+        return new IgniteSort(getCluster(), traitSet, newInput, newCollation);
     }
 
     /** {@inheritDoc} */
@@ -101,6 +101,6 @@ public class IgniteSort extends Sort implements IgniteRel {
 
     /** {@inheritDoc} */
     @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> 
inputs) {
-        return new IgniteSort(cluster, getTraitSet(), sole(inputs), collation, 
offset, fetch);
+        return new IgniteSort(cluster, getTraitSet(), sole(inputs), collation);
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTrimExchange.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTrimExchange.java
index cc159b6..4bd3a17 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTrimExchange.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTrimExchange.java
@@ -58,7 +58,7 @@ public class IgniteTrimExchange extends Exchange implements 
SourceAwareIgniteRel
         this.sourceId = sourceId;
     }
 
-
+    /** */
     public IgniteTrimExchange(RelInput input) {
         super(changeTraits(input, IgniteConvention.INSTANCE));
         sourceId = ((Number)input.get("sourceId")).longValue();
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalIndexScan.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalIndexScan.java
index d510188..08c1609 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalIndexScan.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalIndexScan.java
@@ -39,7 +39,6 @@ import org.jetbrains.annotations.Nullable;
 
 /** */
 public class IgniteLogicalIndexScan extends AbstractIndexScan {
-
     /** Creates a IgniteLogicalIndexScan. */
     public static IgniteLogicalIndexScan create(
         RelOptCluster cluster,
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SortConverterRule.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SortConverterRule.java
index 822ea54..242d516 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SortConverterRule.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SortConverterRule.java
@@ -17,35 +17,61 @@
 package org.apache.ignite.internal.processors.query.calcite.rule;
 
 import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
 import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.PhysicalNode;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.logical.LogicalSort;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
 
 /**
  * Converter rule for sort operator.
  */
-public class SortConverterRule extends 
AbstractIgniteConverterRule<LogicalSort> {
+public class SortConverterRule extends RelRule<SortConverterRule.Config> {
     /** */
-    public static final RelOptRule INSTANCE = new SortConverterRule();
+    public static final RelOptRule INSTANCE =
+        SortConverterRule.Config.DEFAULT
+            .as(SortConverterRule.Config.class).toRule();
 
-    /** */
-    public SortConverterRule() {
-        super(LogicalSort.class, "SortConverterRule");
+    /** Creates a SortConverterRule. */
+    protected SortConverterRule(SortConverterRule.Config config) {
+        super(config);
+    }
+
+    /** Rule configuration. */
+    public interface Config extends RelRule.Config {
+        SortConverterRule.Config DEFAULT = EMPTY
+            .withOperandSupplier(b ->
+                b.operand(LogicalSort.class).anyInputs())
+            .as(SortConverterRule.Config.class);
+
+        /** {@inheritDoc} */
+        @Override default SortConverterRule toRule() {
+            return new SortConverterRule(this);
+        }
     }
 
     /** {@inheritDoc} */
-    @Override protected PhysicalNode convert(RelOptPlanner planner, 
RelMetadataQuery mq, LogicalSort rel) {
-        RelOptCluster cluster = rel.getCluster();
-        RelTraitSet outTraits = 
cluster.traitSetOf(IgniteConvention.INSTANCE).replace(rel.getCollation());
+    @Override public void onMatch(RelOptRuleCall call) {
+        final Sort sort = call.rel(0);
+        RelOptCluster cluster = sort.getCluster();
+        RelTraitSet outTraits = 
cluster.traitSetOf(IgniteConvention.INSTANCE).replace(sort.getCollation());
         RelTraitSet inTraits = cluster.traitSetOf(IgniteConvention.INSTANCE);
-        RelNode input = convert(rel.getInput(), inTraits);
+        RelNode input = convert(sort.getInput(), inTraits);
+
+        if (sort.fetch != null || sort.offset != null) {
+            RelTraitSet traits = 
cluster.traitSetOf(IgniteConvention.INSTANCE).replace(sort.getCollation());
+
+            call.transformTo(new IgniteLimit(cluster, traits, 
convert(sort.getInput(), traits), sort.offset,
+                sort.fetch));
+
+            return;
+        }
 
-        return new IgniteSort(cluster, outTraits, input, rel.getCollation(), 
rel.offset, rel.fetch);
+        call.transformTo(new IgniteSort(cluster, outTraits, input, 
sort.getCollation()));
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
index 0589516..48601fd 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
@@ -123,7 +123,7 @@ public class TraitUtils {
 
         RelTraitSet traits = rel.getTraitSet().replace(toTrait);
 
-        return new IgniteSort(rel.getCluster(), traits, rel, toTrait, null, 
null);
+        return new IgniteSort(rel.getCluster(), traits, rel, toTrait);
     }
 
     /** */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteResource.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteResource.java
index 8a5f02b..a0dfd8e 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteResource.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteResource.java
@@ -38,4 +38,9 @@ public interface IgniteResource {
     /** */
     @Resources.BaseMessage("Illegal aggregate function. {0} is unsupported at 
the moment.")
     Resources.ExInst<SqlValidatorException> 
unsupportedAggregationFunction(String a0);
+
+    /** */
+    @Resources.BaseMessage("Illegal value of {0}. The value must be positive 
and less than Integer.MAX_VALUE " +
+        "(" + Integer.MAX_VALUE + ")." )
+    Resources.ExInst<SqlValidatorException> correctIntegerLimit(String a0);
 }
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/LimitOffsetTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/LimitOffsetTest.java
new file mode 100644
index 0000000..e722f31
--- /dev/null
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/LimitOffsetTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.QueryEngine;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.rel.AbstractNode;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * Limit / offset tests.
+ */
+public class LimitOffsetTest extends GridCommonAbstractTest {
+    /** */
+    private static IgniteCache<Integer, String> cacheRepl;
+
+    /** */
+    private static IgniteCache<Integer, String> cachePart;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrids(2);
+
+        cacheRepl = grid(0).cache("TEST_REPL");
+        cachePart = grid(0).cache("TEST_PART");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        QueryEntity eRepl = new QueryEntity()
+            .setTableName("TEST_REPL")
+            .setKeyType(Integer.class.getName())
+            .setValueType(String.class.getName())
+            .setKeyFieldName("id")
+            .setValueFieldName("val")
+            .addQueryField("id", Integer.class.getName(), null)
+            .addQueryField("val", String.class.getName(), null);
+
+        QueryEntity ePart = new QueryEntity()
+            .setTableName("TEST_PART")
+            .setKeyType(Integer.class.getName())
+            .setValueType(String.class.getName())
+            .setKeyFieldName("id")
+            .setValueFieldName("val")
+            .addQueryField("id", Integer.class.getName(), null)
+            .addQueryField("val", String.class.getName(), null);;
+
+        return super.getConfiguration(igniteInstanceName)
+            .setCacheConfiguration(
+                new CacheConfiguration<>(eRepl.getTableName())
+                    .setCacheMode(CacheMode.REPLICATED)
+                    .setQueryEntities(singletonList(eRepl))
+                    .setSqlSchema("PUBLIC"),
+                new CacheConfiguration<>(ePart.getTableName())
+                    .setCacheMode(CacheMode.PARTITIONED)
+                    .setQueryEntities(singletonList(ePart))
+                    .setSqlSchema("PUBLIC"));
+    }
+
+    /** Tests correctness of fetch / offset params. */
+    @Test
+    public void testInvalidLimitOffset() {
+        QueryEngine engine = Commons.lookupComponent(grid(0).context(), 
QueryEngine.class);
+
+        String bigInt = BigDecimal.valueOf(10000000000L).toString();
+
+        try {
+            List<FieldsQueryCursor<List<?>>> cursors =
+                engine.query(null, "PUBLIC",
+                    "SELECT * FROM TEST_REPL OFFSET " + bigInt + " ROWS");
+            cursors.get(0).getAll();
+
+            fail();
+        }
+        catch (Throwable e) {
+            assertTrue(e.toString(), X.hasCause(e, "Illegal value of offset", 
SqlValidatorException.class));
+        }
+
+        try {
+            List<FieldsQueryCursor<List<?>>> cursors =
+                engine.query(null, "PUBLIC",
+                    "SELECT * FROM TEST_REPL FETCH FIRST " + bigInt + " ROWS 
ONLY");
+            cursors.get(0).getAll();
+
+            fail();
+        }
+        catch (Throwable e) {
+            assertTrue(e.toString(), X.hasCause(e, "Illegal value of fetch / 
limit", SqlValidatorException.class));
+        }
+
+        try {
+            List<FieldsQueryCursor<List<?>>> cursors =
+                engine.query(null, "PUBLIC", "SELECT * FROM TEST_REPL LIMIT " 
+ bigInt);
+            cursors.get(0).getAll();
+
+            fail();
+        }
+        catch (Throwable e) {
+            assertTrue(e.toString(), X.hasCause(e, "Illegal value of fetch / 
limit", SqlValidatorException.class));
+        }
+
+        try {
+            List<FieldsQueryCursor<List<?>>> cursors =
+                engine.query(null, "PUBLIC",
+                    "SELECT * FROM TEST_REPL OFFSET -1 ROWS FETCH FIRST -1 
ROWS ONLY");
+            cursors.get(0).getAll();
+
+            fail();
+        }
+        catch (Throwable e) {
+            assertTrue(e.toString(), X.hasCause(e, IgniteSQLException.class));
+        }
+
+        try {
+            List<FieldsQueryCursor<List<?>>> cursors =
+                engine.query(null, "PUBLIC",
+                    "SELECT * FROM TEST_REPL OFFSET -1 ROWS");
+            cursors.get(0).getAll();
+
+            fail();
+        }
+        catch (Throwable e) {
+            assertTrue(e.toString(), X.hasCause(e, IgniteSQLException.class));
+        }
+
+        try {
+            List<FieldsQueryCursor<List<?>>> cursors =
+                engine.query(null, "PUBLIC",
+                    "SELECT * FROM TEST_REPL OFFSET 2+1 ROWS");
+            cursors.get(0).getAll();
+
+            fail();
+        }
+        catch (Throwable e) {
+            assertTrue(e.toString(), X.hasCause(e, IgniteSQLException.class));
+        }
+
+        // Check with parameters
+        try {
+            List<FieldsQueryCursor<List<?>>> cursors =
+                engine.query(null, "PUBLIC",
+                    "SELECT * FROM TEST_REPL OFFSET ? ROWS FETCH FIRST ? ROWS 
ONLY", -1, -1);
+            cursors.get(0).getAll();
+
+            fail();
+        }
+        catch (Throwable e) {
+            assertTrue(e.toString(), X.hasCause(e, "Illegal value of fetch / 
limit", SqlValidatorException.class));
+        }
+
+        try {
+            List<FieldsQueryCursor<List<?>>> cursors =
+                engine.query(null, "PUBLIC",
+                    "SELECT * FROM TEST_REPL OFFSET ? ROWS", -1);
+            cursors.get(0).getAll();
+
+            fail();
+        }
+        catch (Throwable e) {
+            assertTrue(e.toString(), X.hasCause(e, "Illegal value of offset", 
SqlValidatorException.class));
+        }
+
+        try {
+            List<FieldsQueryCursor<List<?>>> cursors =
+                engine.query(null, "PUBLIC",
+                    "SELECT * FROM TEST_REPL FETCH FIRST ? ROWS ONLY", -1);
+            cursors.get(0).getAll();
+
+            fail();
+        }
+        catch (Throwable e) {
+            assertTrue(e.toString(), X.hasCause(e, "Illegal value of fetch / 
limit", SqlValidatorException.class));
+        }
+    }
+
+    /**
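+     * Checks result sizes for all row count / limit / offset combinations, with literals or parameters, sorted or not.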
+     */
+    @Test
+    public void testLimitOffset() throws Exception {
+        int inBufSize = U.field(AbstractNode.class, "IN_BUFFER_SIZE");
+
+        int[] rowsArr = {10, inBufSize, (2 * inBufSize) - 1};
+
+        for (int rows : rowsArr) {
+            fillCache(cacheRepl, rows);
+
+            int[] limits = {-1, 0, 10, rows / 2 - 1, rows / 2, rows / 2 + 1, 
rows - 1, rows};
+            int[] offsets = {-1, 0, 10, rows / 2 - 1, rows / 2, rows / 2 + 1, 
rows - 1, rows};
+
+            for (int lim : limits) {
+                for (int off : offsets) {
+                    log.info("+++ Check [rows=" + rows + ", limit=" + lim + ", 
off=" + off + ']');
+
+                    checkQuery(rows, lim, off, false, false);
+                    checkQuery(rows, lim, off, true, false);
+                    checkQuery(rows, lim, off, false, true);
+                    checkQuery(rows, lim, off, true, true);
+                }
+            }
+        }
+    }
+
+    /**
+     * @param c Cache.
+     * @param rows Rows count.
+     */
+    private void fillCache(IgniteCache c, int rows) throws 
InterruptedException {
+        c.clear();
+
+        for (int i = 0; i < rows; ++i)
+            c.put(i, "val_" + i);
+
+        awaitPartitionMapExchange();
+    }
+
+    /**
+     * Check query with specified limit and offset (or without them when the 
arguments are negative).
+     *
+     * @param rows Rows count.
+     * @param lim Limit.
+     * @param off Offset.
+     * @param param If {@code false} place limit/offset as literals. Otherwise 
they are placed as parameters.
+     * @param sorted Use sorted query (adds ORDER BY).
+     */
+    void checkQuery(int rows, int lim, int off, boolean param, boolean sorted) 
{
+        QueryEngine engine = Commons.lookupComponent(grid(0).context(), 
QueryEngine.class);
+
+        String sql = createSql(lim, off, param, sorted);
+
+        Object[] params;
+        if (lim >= 0 && off >= 0)
+            params = new Object[]{off, lim};
+        else if (lim >= 0)
+            params = new Object[]{lim};
+        else if (off >= 0)
+            params = new Object[]{off};
+        else
+            params = X.EMPTY_OBJECT_ARRAY;
+
+        log.info("SQL: " + sql + (param ? "params=" + Arrays.toString(params) 
: ""));
+
+        List<FieldsQueryCursor<List<?>>> cursors =
+            engine.query(null, "PUBLIC", sql, param ? params : 
X.EMPTY_OBJECT_ARRAY);
+
+        List<List<?>> res = cursors.get(0).getAll();
+
+        assertEquals("Invalid results size. [rows=" + rows + ", limit=" + lim 
+ ", off=" + off
+            + ", res=" + res.size() + ']', expectedSize(rows, lim, off), 
res.size());
+    }
+
+    /**
+     * Calculates expected result set size by limit and offset.
+     */
+    private int expectedSize(int rows, int lim, int off) {
+        if (off < 0)
+            off = 0;
+
+        if (lim == 0)
+            return 0;
+        else if (lim < 0)
+            return rows - off;
+        else if (lim + off < rows)
+            return lim;
+        else if (off > rows)
+            return 0;
+        else
+            return rows - off;
+    }
+
+    /**
+     * @param lim Limit.
+     * @param off Offset.
+     * @param param Flag to place limit/offset by parameter or literal.
+     * @return SQL query string.
+     */
+    private String createSql(int lim, int off, boolean param, boolean sorted) {
+        StringBuilder sb = new StringBuilder("SELECT * FROM TEST_REPL ");
+
+        if (sorted)
+            sb.append("ORDER BY ID ");
+
+        if (off >= 0)
+            sb.append("OFFSET ").append(param ? "?" : 
Integer.toString(off)).append(" ROWS ");
+
+        if (lim >= 0)
+            sb.append("FETCH FIRST ").append(param ? "?" : 
Integer.toString(lim)).append(" ROWS ONLY ");
+
+        return sb.toString();
+    }
+}
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
index d36600f..abfc643 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -87,7 +88,9 @@ import 
org.apache.ignite.internal.processors.query.calcite.prepare.QueryTemplate
 import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableSpool;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalIndexScan;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
@@ -115,7 +118,6 @@ import org.jetbrains.annotations.Nullable;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import static org.apache.calcite.tools.Frameworks.createRootSchema;
@@ -2690,9 +2692,6 @@ public class PlannerTest extends GridCommonAbstractTest {
         publicSchema.addTable("T0", t0);
         publicSchema.addTable("T1", t1);
 
-        SchemaPlus schema = createRootSchema(false)
-            .add("PUBLIC", publicSchema);
-
         String sql = "select * " +
             "from t0 " +
             "join t1 on t0.jid = t1.jid";
@@ -2755,9 +2754,6 @@ public class PlannerTest extends GridCommonAbstractTest {
         publicSchema.addTable("T0", t0);
         publicSchema.addTable("T1", t1);
 
-        SchemaPlus schema = createRootSchema(false)
-            .add("PUBLIC", publicSchema);
-
         String sql = "select * " +
             "from t0 " +
             "join t1 on t0.jid = t1.jid";
@@ -2782,6 +2778,104 @@ public class PlannerTest extends GridCommonAbstractTest 
{
         assertEquals(1, spoolCnt.get());
     }
 
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testLimit() throws Exception {
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        TestTable testTbl = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("VAL", f.createJavaType(String.class))
+                .build()) {
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.broadcast();
+            }
+        };
+
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+        publicSchema.addTable("TEST", testTbl);
+
+        String sql = "SELECT * FROM TEST OFFSET 10 ROWS FETCH FIRST 10 ROWS 
ONLY";
+
+        {
+            RelNode phys = physicalPlan(sql, publicSchema);
+
+            assertNotNull(phys);
+
+            AtomicInteger limit = new AtomicInteger();
+            AtomicBoolean sort = new AtomicBoolean();
+
+            relTreeVisit(phys, (node, ordinal, parent) -> {
+                    if (node instanceof IgniteLimit)
+                        limit.incrementAndGet();
+
+                    if (node instanceof IgniteSort)
+                        sort.set(true);
+                }
+            );
+
+            assertEquals("Invalid plan: \n" + RelOptUtil.toString(phys), 1, 
limit.get());
+            assertFalse("Invalid plan: \n" + RelOptUtil.toString(phys), 
sort.get());
+        }
+
+        sql = "SELECT * FROM TEST ORDER BY ID OFFSET 10 ROWS FETCH FIRST 10 
ROWS ONLY";
+
+        {
+            RelNode phys = physicalPlan(sql, publicSchema);
+
+            assertNotNull(phys);
+
+            AtomicInteger limit = new AtomicInteger();
+            AtomicBoolean sort = new AtomicBoolean();
+
+            relTreeVisit(phys, (node, ordinal, parent) -> {
+                    if (node instanceof IgniteLimit)
+                        limit.incrementAndGet();
+
+                    if (node instanceof IgniteSort)
+                        sort.set(true);
+                }
+            );
+
+            assertEquals("Invalid plan: \n" + RelOptUtil.toString(phys), 1, 
limit.get());
+            assertFalse("Invalid plan: \n" + RelOptUtil.toString(phys), 
sort.get());
+        }
+    }
+
+    /** */
+    interface TestVisitor {
+        public void visit(RelNode node, int ordinal, RelNode parent);
+    }
+
+    /** */
+    private static class TestRelVisitor extends RelVisitor {
+        /** */
+        final TestVisitor v;
+
+        /** */
+        TestRelVisitor(TestVisitor v) {
+            this.v = v;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void visit(RelNode node, int ordinal, RelNode parent) 
{
+            v.visit(node, ordinal, parent);
+
+            super.visit(node, ordinal, parent);
+        }
+    }
+
+    /** */
+    protected static void relTreeVisit(RelNode n, TestVisitor v) {
+        v.visit(n, -1, null);
+
+        n.childrenAccept(new TestRelVisitor(v));
+    }
+
     /** */
     private IgniteRel physicalPlan(String sql, IgniteSchema publicSchema, 
String... disabledRules) throws Exception {
         SchemaPlus schema = createRootSchema(false)
@@ -2894,7 +2988,6 @@ public class PlannerTest extends GridCommonAbstractTest {
             this(type, RewindabilityTrait.REWINDABLE);
         }
 
-
         /** */
         private TestTable(RelDataType type, RewindabilityTrait rewindable) {
             this(type, rewindable, 100.0);
@@ -2996,8 +3089,12 @@ public class PlannerTest extends GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override public boolean rolledUpColumnValidInsideAgg(String column, 
SqlCall call, SqlNode parent,
-            CalciteConnectionConfig config) {
+        @Override public boolean rolledUpColumnValidInsideAgg(
+            String column,
+            SqlCall call,
+            SqlNode parent,
+            CalciteConnectionConfig config
+        ) {
             throw new AssertionError();
         }
 
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
 
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
index d5da323..8e2fb07 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
@@ -21,6 +21,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.CalciteBasicSecondary
 import 
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessorTest;
 import org.apache.ignite.internal.processors.query.calcite.CancelTest;
 import org.apache.ignite.internal.processors.query.calcite.DateTimeTest;
+import org.apache.ignite.internal.processors.query.calcite.LimitOffsetTest;
 import org.apache.ignite.internal.processors.query.calcite.PlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.QueryCheckerTest;
 import org.apache.ignite.internal.processors.query.calcite.TableSpoolTest;
@@ -51,6 +52,7 @@ import org.junit.runners.Suite;
     QueryCheckerTest.class,
     DateTimeTest.class,
     TableSpoolTest.class,
+    LimitOffsetTest.class
 })
 public class IgniteCalciteTestSuite {
 }

Reply via email to