This is an automated email from the ASF dual-hosted git repository.
ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3cb6be93ca IGNITE-20294 Sql. Using UDF as a place for system_range
function (#3729)
3cb6be93ca is described below
commit 3cb6be93ca905aac0028be5e83d2b58db74179cc
Author: Max Zhuravkov <[email protected]>
AuthorDate: Wed May 15 11:59:35 2024 +0300
IGNITE-20294 Sql. Using UDF as a place for system_range function (#3729)
---
.../internal/sql/engine/ItFunctionsTest.java | 15 +++
.../internal/sql/engine/SqlQueryProcessor.java | 4 +
.../sql/engine/exec/ExecutionServiceImpl.java | 12 +-
.../sql/engine/exec/LogicalRelImplementor.java | 20 +--
.../sql/engine/exec/TableFunctionScan.java | 51 --------
.../sql/engine/exec/exp/IgniteSqlFunctions.java | 136 ---------------------
.../internal/sql/engine/exec/exp/RexImpTable.java | 14 +--
.../exec/exp/func/IterableTableFunction.java | 60 +++++++++
.../exec/exp/func/SystemRangeTableFunction.java | 114 +++++++++++++++++
.../sql/engine/exec/exp/func/TableFunction.java | 31 +++++
.../exec/exp/func/TableFunctionInstance.java | 49 ++++++++
.../exec/exp/func/TableFunctionRegistry.java | 35 ++++++
.../exec/exp/func/TableFunctionRegistryImpl.java | 65 ++++++++++
.../internal/sql/engine/exec/rel/ScanNode.java | 53 +++++---
.../internal/sql/engine/util/IgniteMethod.java | 4 -
.../sql/engine/exec/ExecutionServiceImplTest.java | 11 +-
.../sql/engine/exec/rel/ScanNodeExecutionTest.java | 135 ++++++++++++++++++++
.../exec/rel/SystemRangeTableFunctionTest.java | 114 +++++++++++++++++
.../internal/sql/engine/framework/TestNode.java | 4 +
19 files changed, 691 insertions(+), 236 deletions(-)
diff --git
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItFunctionsTest.java
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItFunctionsTest.java
index 868253f5a3..2e3356084e 100644
---
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItFunctionsTest.java
+++
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItFunctionsTest.java
@@ -169,10 +169,25 @@ public class ItFunctionsTest extends
BaseSqlIntegrationTest {
assertEquals(0, sql("SELECT * FROM table(system_range(null,
1))").size());
+ assertEquals(0, sql("SELECT * FROM table(system_range(1,
null))").size());
+
assertThrowsSqlException(
Sql.RUNTIME_ERR,
"Increment can't be 0",
() -> sql("SELECT * FROM table(system_range(1, 1, 0))"));
+
+ assertQuery("SELECT (SELECT * FROM table(system_range(4, 1)))")
+ .returns(null)
+ .check();
+
+ assertQuery("SELECT (SELECT * FROM table(system_range(1, 1)))")
+ .returns(1L)
+ .check();
+
+ assertThrowsSqlException(
+ Sql.RUNTIME_ERR,
+ "Subquery returned more than 1 value",
+ () -> sql("SELECT (SELECT * FROM table(system_range(1,
10)))"));
}
@Test
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 1f5b373c73..f19fbe2609 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -90,6 +90,7 @@ import
org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
import org.apache.ignite.internal.sql.engine.exec.SqlRowHandler;
import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
+import
org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionRegistryImpl;
import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
import
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory;
import
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetProvider;
@@ -323,6 +324,8 @@ public class SqlQueryProcessor implements QueryProcessor {
tableManager, schemaManager, sqlSchemaManager, replicaService,
clockService, TABLE_CACHE_SIZE
);
+ var tableFunctionRegistry = new TableFunctionRegistryImpl();
+
var dependencyResolver = new ExecutionDependencyResolverImpl(
executableTableRegistry,
view -> () -> systemViewManager.scanView(view.name())
@@ -380,6 +383,7 @@ public class SqlQueryProcessor implements QueryProcessor {
mappingService,
executableTableRegistry,
dependencyResolver,
+ tableFunctionRegistry,
clockService,
EXECUTION_SERVICE_SHUTDOWN_TIMEOUT
));
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 21e109ec52..8936c00b1c 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -74,6 +74,7 @@ import
org.apache.ignite.internal.sql.engine.QueryCancelledException;
import org.apache.ignite.internal.sql.engine.QueryPrefetchCallback;
import org.apache.ignite.internal.sql.engine.SqlQueryType;
import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
+import
org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionRegistry;
import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentDescription;
import org.apache.ignite.internal.sql.engine.exec.mapping.MappedFragment;
@@ -210,6 +211,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
/**
* Creates the execution services.
*
+ * @param <RowT> Type of the sql row.
* @param topSrvc Topology service.
* @param msgSrvc Message service.
* @param sqlSchemaManager Schema manager.
@@ -218,7 +220,10 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
* @param handler Row handler.
* @param mailboxRegistry Mailbox registry.
* @param exchangeSrvc Exchange service.
- * @param <RowT> Type of the sql row.
+ * @param mappingService Nodes mapping calculation service.
+ * @param tableRegistry Table registry.
+ * @param dependencyResolver Dependency resolver.
+ * @param tableFunctionRegistry Table function registry.
* @return An execution service.
*/
public static <RowT> ExecutionServiceImpl<RowT> create(
@@ -233,6 +238,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
MappingService mappingService,
ExecutableTableRegistry tableRegistry,
ExecutionDependencyResolver dependencyResolver,
+ TableFunctionRegistry tableFunctionRegistry,
ClockService clockService,
long shutdownTimeout
) {
@@ -250,7 +256,9 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
ctx,
mailboxRegistry,
exchangeSrvc,
- deps),
+ deps,
+ tableFunctionRegistry
+ ),
clockService,
shutdownTimeout
);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
index 4a09b90dc6..96893bae04 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
@@ -38,6 +38,7 @@ import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.Minus;
import org.apache.calcite.rel.core.Spool;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
@@ -51,6 +52,8 @@ import
org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactory;
import org.apache.ignite.internal.sql.engine.exec.exp.RangeIterable;
import org.apache.ignite.internal.sql.engine.exec.exp.agg.AccumulatorWrapper;
import org.apache.ignite.internal.sql.engine.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunction;
+import
org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionRegistry;
import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
import org.apache.ignite.internal.sql.engine.exec.rel.AbstractSetOpNode;
import
org.apache.ignite.internal.sql.engine.exec.rel.CorrelatedNestedLoopJoinNode;
@@ -147,6 +150,8 @@ public class LogicalRelImplementor<RowT> implements
IgniteRelVisitor<Node<RowT>>
private final ResolvedDependencies resolvedDependencies;
+ private final TableFunctionRegistry tableFunctionRegistry;
+
/**
* Constructor.
*
@@ -154,16 +159,20 @@ public class LogicalRelImplementor<RowT> implements
IgniteRelVisitor<Node<RowT>>
* @param mailboxRegistry Mailbox registry.
* @param exchangeSvc Exchange service.
* @param resolvedDependencies Dependencies required to execute this query.
+ * @param tableFunctionRegistry Table function registry.
*/
public LogicalRelImplementor(
ExecutionContext<RowT> ctx,
MailboxRegistry mailboxRegistry,
ExchangeService exchangeSvc,
- ResolvedDependencies resolvedDependencies) {
+ ResolvedDependencies resolvedDependencies,
+ TableFunctionRegistry tableFunctionRegistry
+ ) {
this.mailboxRegistry = mailboxRegistry;
this.exchangeSvc = exchangeSvc;
this.ctx = ctx;
this.resolvedDependencies = resolvedDependencies;
+ this.tableFunctionRegistry = tableFunctionRegistry;
expressionFactory = ctx.expressionFactory();
destinationFactory = new DestinationFactory<>(ctx.rowHandler(),
resolvedDependencies);
@@ -667,14 +676,9 @@ public class LogicalRelImplementor<RowT> implements
IgniteRelVisitor<Node<RowT>>
/** {@inheritDoc} */
@Override
public Node<RowT> visit(IgniteTableFunctionScan rel) {
- Supplier<Iterable<Object[]>> dataSupplier =
expressionFactory.execute(rel.getCall());
-
- RelDataType rowType = rel.getRowType();
-
- RowSchema rowSchema =
rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(rowType));
- RowFactory<RowT> rowFactory = ctx.rowHandler().factory(rowSchema);
+ TableFunction<RowT> tableFunction =
tableFunctionRegistry.getTableFunction(ctx, (RexCall) rel.getCall());
- return new ScanNode<>(ctx, new TableFunctionScan<>(dataSupplier,
rowFactory));
+ return new ScanNode<>(ctx, tableFunction);
}
/** {@inheritDoc} */
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableFunctionScan.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableFunctionScan.java
deleted file mode 100644
index 45c1da50c8..0000000000
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableFunctionScan.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.sql.engine.exec;
-
-import java.util.Iterator;
-import java.util.function.Supplier;
-import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
-import org.apache.ignite.internal.util.TransformingIterator;
-
-/**
- * TableFunctionScan.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
- */
-public class TableFunctionScan<RowT> implements Iterable<RowT> {
- private final Supplier<Iterable<Object[]>> dataSupplier;
-
- private final RowFactory<RowT> rowFactory;
-
- /**
- * Constructor.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
- */
- public TableFunctionScan(
- Supplier<Iterable<Object[]>> dataSupplier,
- RowFactory<RowT> rowFactory
- ) {
- this.dataSupplier = dataSupplier;
- this.rowFactory = rowFactory;
- }
-
- /** {@inheritDoc} */
- @Override
- public Iterator<RowT> iterator() {
- return new TransformingIterator<>(dataSupplier.get().iterator(),
rowFactory::create);
- }
-}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/IgniteSqlFunctions.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/IgniteSqlFunctions.java
index bdff76032c..7aa41bb3b1 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/IgniteSqlFunctions.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/IgniteSqlFunctions.java
@@ -37,20 +37,8 @@ import java.util.UUID;
import org.apache.calcite.DataContext;
import org.apache.calcite.avatica.util.ByteString;
import org.apache.calcite.avatica.util.DateTimeUtils;
-import org.apache.calcite.config.CalciteConnectionConfig;
-import org.apache.calcite.linq4j.AbstractEnumerable;
-import org.apache.calcite.linq4j.Enumerable;
-import org.apache.calcite.linq4j.Enumerator;
-import org.apache.calcite.linq4j.Linq4j;
import org.apache.calcite.linq4j.function.NonDeterministic;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.runtime.SqlFunctions;
-import org.apache.calcite.schema.ScannableTable;
-import org.apache.calcite.schema.Schema;
-import org.apache.calcite.schema.Statistic;
-import org.apache.calcite.sql.SqlCall;
-import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.ignite.internal.lang.IgniteStringBuilder;
import org.apache.ignite.internal.sql.engine.sql.fun.IgniteSqlOperatorTable;
@@ -86,16 +74,6 @@ public class IgniteSqlFunctions {
// No-op.
}
- /** SQL SYSTEM_RANGE(start, end) table function. */
- public static ScannableTable systemRange(Object rangeStart, Object
rangeEnd) {
- return new RangeTable(rangeStart, rangeEnd, 1L);
- }
-
- /** SQL SYSTEM_RANGE(start, end, increment) table function. */
- public static ScannableTable systemRange(Object rangeStart, Object
rangeEnd, Object increment) {
- return new RangeTable(rangeStart, rangeEnd, increment);
- }
-
/** Just a stub. Validates Date\Time literal, still use calcite
implementation for numeric representation.
* Otherwise need to fix {@code DateTimeUtils#unixTimestampToString} usage
additionally.
*/
@@ -659,120 +637,6 @@ public class IgniteSqlFunctions {
}
}
- /**
- * Dummy table to implement the SYSTEM_RANGE function.
- */
- private static class RangeTable implements ScannableTable {
- /** Start of the range. */
- private final Object rangeStart;
-
- /** End of the range. */
- private final Object rangeEnd;
-
- /** Increment. */
- private final Object increment;
-
- /**
- * Note: {@code Object} arguments required here due to: 1. {@code
NULL} arguments need to be supported, so we
- * can't use {@code long} arguments type. 2. {@code Integer} and other
numeric classes can be converted to
- * {@code long} type by java, but can't be converted to {@code Long}
type, so we can't use {@code Long}
- * arguments type either. Instead, we accept {@code Object} arguments
type and try to convert valid types to
- * {@code long}.
- */
- RangeTable(Object rangeStart, Object rangeEnd, Object increment) {
- this.rangeStart = rangeStart;
- this.rangeEnd = rangeEnd;
- this.increment = increment;
- }
-
- /** {@inheritDoc} */
- @Override
- public RelDataType getRowType(RelDataTypeFactory typeFactory) {
- return typeFactory.builder().add("X", SqlTypeName.BIGINT).build();
- }
-
- /** {@inheritDoc} */
- @Override
- public Enumerable<@Nullable Object[]> scan(DataContext root) {
- if (rangeStart == null || rangeEnd == null || increment == null) {
- return Linq4j.emptyEnumerable();
- }
-
- long rangeStart = convertToLongArg(this.rangeStart, "rangeStart");
- long rangeEnd = convertToLongArg(this.rangeEnd, "rangeEnd");
- long increment = convertToLongArg(this.increment, "increment");
-
- if (increment == 0L) {
- throw new IllegalArgumentException("Increment can't be 0");
- }
-
- return new AbstractEnumerable<>() {
- @Override
- public Enumerator<@Nullable Object[]> enumerator() {
- return new Enumerator<>() {
- long cur = rangeStart - increment;
-
- @Override
- public Object[] current() {
- return new Object[]{cur};
- }
-
- @Override
- public boolean moveNext() {
- cur += increment;
-
- return increment > 0L ? cur <= rangeEnd : cur >=
rangeEnd;
- }
-
- @Override
- public void reset() {
- cur = rangeStart - increment;
- }
-
- @Override
- public void close() {
- // No-op.
- }
- };
- }
- };
- }
-
- private long convertToLongArg(Object val, String name) {
- if (val instanceof Byte || val instanceof Short || val instanceof
Integer || val instanceof Long) {
- return ((Number) val).longValue();
- }
-
- throw new IllegalArgumentException("Unsupported argument type
[arg=" + name
- + ", type=" + val.getClass().getSimpleName() + ']');
- }
-
- /** {@inheritDoc} */
- @Override
- public Statistic getStatistic() {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override
- public Schema.TableType getJdbcTableType() {
- return Schema.TableType.TABLE;
- }
-
- /** {@inheritDoc} */
- @Override
- public boolean isRolledUp(String column) {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override
- public boolean rolledUpColumnValidInsideAgg(String column, SqlCall
call,
- SqlNode parent, CalciteConnectionConfig cfg) {
- return true;
- }
- }
-
private static long divide(long p, long q, RoundingMode mode) {
// Stripped down version of guava's LongMath::divide.
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RexImpTable.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RexImpTable.java
index 334446bbea..ef62472443 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RexImpTable.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RexImpTable.java
@@ -333,7 +333,6 @@ import static
org.apache.calcite.sql.fun.SqlStdOperatorTable.UPPER;
import static org.apache.calcite.sql.fun.SqlStdOperatorTable.USER;
import static org.apache.calcite.util.ReflectUtil.isStatic;
import static
org.apache.ignite.internal.sql.engine.sql.fun.IgniteSqlOperatorTable.SUBSTR;
-import static
org.apache.ignite.internal.sql.engine.sql.fun.IgniteSqlOperatorTable.SYSTEM_RANGE;
import static
org.apache.ignite.internal.sql.engine.sql.fun.IgniteSqlOperatorTable.TYPEOF;
import static
org.apache.ignite.internal.sql.engine.util.IgniteMethod.GEN_RANDOM_UUID;
import static
org.apache.ignite.internal.sql.engine.util.IgniteMethod.GREATEST2;
@@ -1020,7 +1019,6 @@ public class RexImpTable {
defineMethod(IgniteSqlOperatorTable.DECIMAL_DIVIDE,
IgniteMethod.DECIMAL_DIVIDE.method(), NullPolicy.ARG0);
map.put(TYPEOF, systemFunctionImplementor);
- map.put(SYSTEM_RANGE, systemFunctionImplementor);
return this;
}
@@ -1034,17 +1032,7 @@ public class RexImpTable {
Expression implementSafe(final RexToLixTranslator translator,
final RexCall call, final List<Expression> argValueList) {
final SqlOperator op = call.getOperator();
- if (op == SYSTEM_RANGE) {
- if (call.getOperands().size() == 2) {
- return
createTableFunctionImplementor(IgniteMethod.SYSTEM_RANGE2.method())
- .implement(translator, call, NullAs.NULL);
- }
-
- if (call.getOperands().size() == 3) {
- return
createTableFunctionImplementor(IgniteMethod.SYSTEM_RANGE3.method())
- .implement(translator, call, NullAs.NULL);
- }
- } else if (op == TYPEOF) {
+ if (op == TYPEOF) {
if (call.getOperands().size() == 1) {
CallImplementor implementor = createTypeOfImplementor();
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/IterableTableFunction.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/IterableTableFunction.java
new file mode 100644
index 0000000000..3e38287fd6
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/IterableTableFunction.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.exp.func;
+
+import java.util.Iterator;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+
+/**
+ * {@link TableFunction} from an arbitrary iterable.
+ *
+ * @param <RowT> Row type.
+ */
+public final class IterableTableFunction<RowT> implements TableFunction<RowT> {
+
+ private final Iterable<RowT> src;
+
+ /** Constructor. */
+ public IterableTableFunction(Iterable<RowT> src) {
+ this.src = src;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public TableFunctionInstance<RowT> createInstance(ExecutionContext<RowT>
ctx) {
+ return new TableFunctionInstance<>() {
+
+ private final Iterator<RowT> it = src.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return it.hasNext();
+ }
+
+ @Override
+ public RowT next() {
+ return it.next();
+ }
+
+ @Override
+ public void close() {
+
+ }
+ };
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/SystemRangeTableFunction.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/SystemRangeTableFunction.java
new file mode 100644
index 0000000000..908ab5cab9
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/SystemRangeTableFunction.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.exp.func;
+
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
+import org.apache.ignite.internal.sql.engine.sql.fun.IgniteSqlOperatorTable;
+import org.apache.ignite.internal.type.NativeTypes;
+import org.jetbrains.annotations.Nullable;
+
+/** Implementation of {@link IgniteSqlOperatorTable#SYSTEM_RANGE system range
function}. */
+public final class SystemRangeTableFunction<RowT> implements
TableFunction<RowT> {
+
+ private final RowSchema rowSchema = RowSchema.builder()
+ .addField(NativeTypes.INT64)
+ .build();
+
+ private final Supplier<Long> startExpr;
+
+ private final Supplier<Long> endExpr;
+
+ private final Supplier<Long> incrementExpr;
+
+ /** Constructor. */
+ public SystemRangeTableFunction(Supplier<Long> startExpr, Supplier<Long>
endExpr, @Nullable Supplier<Long> incrementExpr) {
+ this.startExpr = Objects.requireNonNull(startExpr, "startExpr");
+ this.endExpr = Objects.requireNonNull(endExpr, "endExpr");
+ this.incrementExpr = incrementExpr != null ? incrementExpr : () -> 1L;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public TableFunctionInstance<RowT> createInstance(ExecutionContext<RowT>
ctx) {
+ RowFactory<RowT> factory = ctx.rowHandler().factory(rowSchema);
+
+ Long start = startExpr.get();
+ Long end = endExpr.get();
+ Long increment = incrementExpr.get();
+
+ if (increment == null) {
+ increment = 1L;
+ } else if (increment == 0) {
+ throw new IllegalArgumentException("Increment can't be 0");
+ }
+
+ if (start == null || end == null) {
+ return TableFunctionInstance.empty();
+ } else {
+ return new SystemRangeInstance<>(factory, start, end, increment);
+ }
+ }
+
+ private static class SystemRangeInstance<RowT> implements
TableFunctionInstance<RowT> {
+
+ private final long end;
+
+ private final long increment;
+
+ private long current;
+
+ private final RowFactory<RowT> factory;
+
+ SystemRangeInstance(RowFactory<RowT> factory, long start, long end,
long increment) {
+ this.factory = factory;
+ this.end = end;
+ this.increment = increment;
+ this.current = start;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return increment > 0 ? current <= end : current >= end;
+ }
+
+ @Override
+ public RowT next() {
+ if (increment > 0 && current > end) {
+ throw new NoSuchElementException();
+ } else if (increment < 0 && current < end) {
+ throw new NoSuchElementException();
+ }
+
+ RowT row = factory.create(current);
+
+ current += increment;
+
+ return row;
+ }
+
+ @Override
+ public void close() {
+
+ }
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/TableFunction.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/TableFunction.java
new file mode 100644
index 0000000000..cc1d53cf56
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/TableFunction.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.exp.func;
+
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+
+/**
+ * Table function implementation.
+ *
+ * @param <RowT> Row type.
+ */
+public interface TableFunction<RowT> {
+
+ /** Creates an instance of a table function. */
+ TableFunctionInstance<RowT> createInstance(ExecutionContext<RowT> ctx);
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/TableFunctionInstance.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/TableFunctionInstance.java
new file mode 100644
index 0000000000..b3d9af9e5b
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/TableFunctionInstance.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.exp.func;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Represents an instance of a table function.
+ *
+ * @param <RowT> Row type.
+ */
+public interface TableFunctionInstance<RowT> extends Iterator<RowT>,
AutoCloseable {
+
+ /** Returns a table function instance that always produces no results. */
+ static <RowT> TableFunctionInstance<RowT> empty() {
+ return new TableFunctionInstance<>() {
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public RowT next() {
+ throw new NoSuchElementException();
+ }
+ };
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/TableFunctionRegistry.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/TableFunctionRegistry.java
new file mode 100644
index 0000000000..7edf0309b9
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/TableFunctionRegistry.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.exp.func;
+
+import org.apache.calcite.rex.RexCall;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+
+/** Registry of available table functions. */
+public interface TableFunctionRegistry {
+
+ /**
+ * Returns implementation of the given table function call.
+ *
+ * @param <RowT> Row type.
+ * @param ctx Context.
+ * @param rexCall Table function call.
+ * @return Table function implementation.
+ */
+ <RowT> TableFunction<RowT> getTableFunction(ExecutionContext<RowT> ctx,
RexCall rexCall);
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/TableFunctionRegistryImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/TableFunctionRegistryImpl.java
new file mode 100644
index 0000000000..4f6edc75b0
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/TableFunctionRegistryImpl.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.exp.func;
+
+import java.util.function.Supplier;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactory;
+import org.apache.ignite.internal.sql.engine.sql.fun.IgniteSqlOperatorTable;
+import org.jetbrains.annotations.Nullable;
+
+/** Implementation of {@link TableFunctionRegistry}. */
+public class TableFunctionRegistryImpl implements TableFunctionRegistry {
+
+ /** {@inheritDoc} */
+ @Override
+ public <RowT> TableFunction<RowT> getTableFunction(ExecutionContext<RowT>
ctx, RexCall rexCall) {
+ if (rexCall.getOperator() == IgniteSqlOperatorTable.SYSTEM_RANGE) {
+ Supplier<Long> start =
implementGetLongExpr(ctx.expressionFactory(), rexCall.operands.get(0));
+ Supplier<Long> end = implementGetLongExpr(ctx.expressionFactory(),
rexCall.operands.get(1));
+ Supplier<Long> increment;
+
+ if (rexCall.operands.size() > 2) {
+ increment = implementGetLongExpr(ctx.expressionFactory(),
rexCall.operands.get(2));
+ } else {
+ increment = null;
+ }
+
+ return new SystemRangeTableFunction<>(start, end, increment);
+ } else {
+ throw new IllegalArgumentException("Unsupported table function: "
+ rexCall.getOperator());
+ }
+ }
+
+ private static <RowT> @Nullable Supplier<Long>
implementGetLongExpr(ExpressionFactory<RowT> expressionFactory, RexNode expr) {
+ if (expr == null) {
+ return null;
+ }
+
+ Supplier<Object> value = expressionFactory.execute(expr);
+ return () -> {
+ Number num = (Number) value.get();
+ if (num == null) {
+ return null;
+ }
+ return num.longValue();
+ };
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ScanNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ScanNode.java
index 59c8b607b3..f8a2a3e660 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ScanNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ScanNode.java
@@ -17,34 +17,47 @@
package org.apache.ignite.internal.sql.engine.exec.rel;
-import java.util.Iterator;
import java.util.List;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import
org.apache.ignite.internal.sql.engine.exec.exp.func.IterableTableFunction;
+import org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunction;
+import
org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionInstance;
import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.lang.ErrorGroups.Sql;
+import org.apache.ignite.sql.SqlException;
/**
* Scan node.
*/
public class ScanNode<RowT> extends AbstractNode<RowT> implements
SingleNode<RowT> {
- private final Iterable<RowT> src;
+ private final TableFunction<RowT> func;
- private Iterator<RowT> it;
+ private TableFunctionInstance<RowT> inst;
private int requested;
private boolean inLoop;
/**
- * Constructor.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Constructor for a scan that returns rows from the given iterable.
*
* @param ctx Execution context.
- * @param src Source.
+ * @param src Source iterable.
*/
public ScanNode(ExecutionContext<RowT> ctx, Iterable<RowT> src) {
+ this(ctx, new IterableTableFunction<>(src));
+ }
+
+ /**
+ * Constructor for a scan that returns rows produced by the given table
function.
+ *
+ * @param ctx Execution context.
+ * @param src Table function.
+ */
+ public ScanNode(ExecutionContext<RowT> ctx, TableFunction<RowT> src) {
super(ctx);
- this.src = src;
+ this.func = src;
}
/** {@inheritDoc} */
@@ -66,16 +79,16 @@ public class ScanNode<RowT> extends AbstractNode<RowT>
implements SingleNode<Row
public void closeInternal() {
super.closeInternal();
- Commons.closeQuiet(it);
- it = null;
- Commons.closeQuiet(src);
+ Commons.closeQuiet(inst);
+ inst = null;
+ Commons.closeQuiet(func);
}
/** {@inheritDoc} */
@Override
protected void rewindInternal() {
- Commons.closeQuiet(it);
- it = null;
+ Commons.closeQuiet(inst);
+ inst = null;
requested = 0;
}
@@ -100,16 +113,16 @@ public class ScanNode<RowT> extends AbstractNode<RowT>
implements SingleNode<Row
inLoop = true;
try {
- if (it == null) {
- it = src.iterator();
+ if (inst == null) {
+ inst = func.createInstance(context());
}
int processed = 0;
- while (requested > 0 && it.hasNext()) {
+ while (requested > 0 && inst.hasNext()) {
checkState();
requested--;
- downstream().push(it.next());
+ downstream().push(inst.next());
if (++processed == inBufSize && requested > 0) {
// allow others to do their job
@@ -118,13 +131,15 @@ public class ScanNode<RowT> extends AbstractNode<RowT>
implements SingleNode<Row
return;
}
}
+ } catch (Exception e) {
+ throw new SqlException(Sql.RUNTIME_ERR, e);
} finally {
inLoop = false;
}
- if (requested > 0 && !it.hasNext()) {
- Commons.closeQuiet(it);
- it = null;
+ if (requested > 0 && !inst.hasNext()) {
+ Commons.closeQuiet(inst);
+ inst = null;
requested = 0;
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IgniteMethod.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IgniteMethod.java
index 926509f62d..6ed1ff9d43 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IgniteMethod.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IgniteMethod.java
@@ -68,10 +68,6 @@ public enum IgniteMethod {
/** See {@link BiScalar#execute(ExecutionContext, Object, Object,
RowBuilder)}. */
BI_SCALAR_EXECUTE(BiScalar.class, "execute", ExecutionContext.class,
Object.class, Object.class, RowBuilder.class),
- SYSTEM_RANGE2(IgniteSqlFunctions.class, "systemRange", Object.class,
Object.class),
-
- SYSTEM_RANGE3(IgniteSqlFunctions.class, "systemRange", Object.class,
Object.class, Object.class),
-
STRING_TO_TIMESTAMP(IgniteSqlFunctions.class, "timestampStringToNumeric",
String.class),
/** See {@link IgniteSqlFunctions#subtractTimeZoneOffset(long, TimeZone)}.
**/
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 5c5e170041..98a3de4d51 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -92,6 +92,8 @@ import
org.apache.ignite.internal.sql.engine.QueryPrefetchCallback;
import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
import
org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImplTest.TestCluster.TestNode;
import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
+import
org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionRegistry;
+import
org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionRegistryImpl;
import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
import
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory;
import
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetProvider;
@@ -902,6 +904,7 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
var partitionPruner = new PartitionPrunerImpl();
var mappingService = new MappingServiceImpl(nodeName, targetProvider,
EmptyCacheFactory.INSTANCE, 0, partitionPruner, taskExecutor);
+ var tableFunctionRegistry = new TableFunctionRegistryImpl();
List<LogicalNode> logicalNodes = nodeNames.stream()
.map(name -> new LogicalNode(name, name,
NetworkAddress.from("127.0.0.1:10000")))
@@ -919,7 +922,7 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
ArrayRowHandler.INSTANCE,
executableTableRegistry,
dependencyResolver,
- (ctx, deps) -> node.implementor(ctx, mailboxRegistry,
exchangeService, deps),
+ (ctx, deps) -> node.implementor(ctx, mailboxRegistry,
exchangeService, deps, tableFunctionRegistry),
clockService,
SHUTDOWN_TIMEOUT
);
@@ -1090,8 +1093,10 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
ExecutionContext<Object[]> ctx,
MailboxRegistry mailboxRegistry,
ExchangeService exchangeService,
- ResolvedDependencies deps) {
- return new LogicalRelImplementor<>(ctx, mailboxRegistry,
exchangeService, deps) {
+ ResolvedDependencies deps,
+ TableFunctionRegistry tableFunctionRegistry
+ ) {
+ return new LogicalRelImplementor<>(ctx, mailboxRegistry,
exchangeService, deps, tableFunctionRegistry) {
@Override
public Node<Object[]> visit(IgniteTableScan rel) {
return new ScanNode<>(ctx, dataset) {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScanNodeExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScanNodeExecutionTest.java
new file mode 100644
index 0000000000..1f95f13d66
--- /dev/null
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScanNodeExecutionTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.rel;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunction;
+import
org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionInstance;
+import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link ScanNode}.
+ */
+public class ScanNodeExecutionTest extends AbstractExecutionTest<Object[]> {
+
+ @Test
+ public void testIterableSource() {
+ ExecutionContext<Object[]> ctx = executionContext(true);
+
+ List<Object[]> data = IntStream.range(0, 5)
+ .mapToObj(i -> new Object[]{i})
+ .collect(Collectors.toList());
+
+ RootNode<Object[]> rootNode = new RootNode<>(ctx);
+ ScanNode<Object[]> srcNode = new ScanNode<>(ctx, data);
+
+ rootNode.register(srcNode);
+
+ collectResults(rootNode, data);
+ }
+
+ @Test
+ public void testTableFunctionSource() {
+ ExecutionContext<Object[]> ctx = executionContext(true);
+
+ List<Object[]> data = IntStream.range(0, 5)
+ .mapToObj(i -> new Object[]{i})
+ .collect(Collectors.toList());
+
+ TestFunctionInstance<Object[]> instance = new
TestFunctionInstance<>(data.iterator());
+ TestFunction<Object[]> testFunction = new TestFunction<>(instance);
+
+ try (RootNode<Object[]> rootNode = new RootNode<>(ctx)) {
+ ScanNode<Object[]> srcNode = new ScanNode<>(ctx, testFunction);
+
+ rootNode.register(srcNode);
+
+ collectResults(rootNode, data);
+
+ assertEquals(1, instance.closeCounter.get());
+ }
+ }
+
+ @Override
+ protected RowHandler<Object[]> rowHandler() {
+ return ArrayRowHandler.INSTANCE;
+ }
+
+ private static void collectResults(RootNode<Object[]> rootNode,
List<Object[]> data) {
+ List<Object[]> actual = new ArrayList<>();
+
+ while (rootNode.hasNext()) {
+ actual.add(rootNode.next());
+ }
+
+ assertEquals(
+
actual.stream().map(Arrays::asList).collect(Collectors.toList()),
+ data.stream().map(Arrays::asList).collect(Collectors.toList())
+ );
+ }
+
+ private static class TestFunction<RowT> implements TableFunction<RowT> {
+
+ private final TableFunctionInstance<RowT> instance;
+
+ private TestFunction(TableFunctionInstance<RowT> instance) {
+ this.instance = instance;
+ }
+
+ @Override
+ public TableFunctionInstance<RowT>
createInstance(ExecutionContext<RowT> ctx) {
+ return instance;
+ }
+ }
+
+ private static class TestFunctionInstance<RowT> implements
TableFunctionInstance<RowT> {
+ final Iterator<RowT> it;
+
+ final AtomicInteger closeCounter = new AtomicInteger();
+
+ private TestFunctionInstance(Iterator<RowT> it) {
+ this.it = it;
+ }
+
+ @Override
+ public void close() {
+ closeCounter.incrementAndGet();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return it.hasNext();
+ }
+
+ @Override
+ public RowT next() {
+ return it.next();
+ }
+ }
+}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/SystemRangeTableFunctionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/SystemRangeTableFunctionTest.java
new file mode 100644
index 0000000000..519d8fa4b6
--- /dev/null
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/SystemRangeTableFunctionTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.rel;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import
org.apache.ignite.internal.sql.engine.exec.exp.func.SystemRangeTableFunction;
+import org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunction;
+import
org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionInstance;
+import
org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionRegistry;
+import
org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionRegistryImpl;
+import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
+import org.apache.ignite.internal.sql.engine.sql.fun.IgniteSqlOperatorTable;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.junit.jupiter.api.Test;
+
+/** Tests for {@link SystemRangeTableFunction}. */
+public class SystemRangeTableFunctionTest extends
AbstractExecutionTest<Object[]> {
+
+ private final TableFunctionRegistry registry = new
TableFunctionRegistryImpl();
+
+ private final RexBuilder rexBuilder = Commons.rexBuilder();
+
+ @Test
+ public void testSystemRangeNoIncrement() throws Exception {
+ RexCall call = (RexCall)
rexBuilder.makeCall(IgniteSqlOperatorTable.SYSTEM_RANGE,
+ rexBuilder.makeBigintLiteral(BigDecimal.ONE),
+ rexBuilder.makeBigintLiteral(BigDecimal.TEN)
+ );
+
+ List<Object[]> expected = IntStream.rangeClosed(1, 10)
+ .mapToObj(i -> new Object[]{i})
+ .collect(Collectors.toList());
+
+ checkFunction(call, expected);
+ }
+
+ @Test
+ public void testSystemRangeWithIncrement() throws Exception {
+ RexCall call = (RexCall)
rexBuilder.makeCall(IgniteSqlOperatorTable.SYSTEM_RANGE,
+ rexBuilder.makeBigintLiteral(BigDecimal.ONE),
+ rexBuilder.makeBigintLiteral(BigDecimal.TEN),
+
rexBuilder.makeBigintLiteral(BigDecimal.ONE.add(BigDecimal.ONE))
+ );
+
+ List<Object[]> expected = IntStream.rangeClosed(1, 10)
+ .filter(i -> i % 2 == 1)
+ .mapToObj(i -> new Object[]{i})
+ .collect(Collectors.toList());
+
+ checkFunction(call, expected);
+ }
+
+ @Test
+ public void testSystemRangeReverse() throws Exception {
+ RexCall call = (RexCall)
rexBuilder.makeCall(IgniteSqlOperatorTable.SYSTEM_RANGE,
+ rexBuilder.makeBigintLiteral(BigDecimal.TEN),
+ rexBuilder.makeBigintLiteral(BigDecimal.ONE),
+ rexBuilder.makeBigintLiteral(BigDecimal.ONE.negate())
+ );
+
+ List<Object[]> expected = IntStream.rangeClosed(1, 10)
+ .boxed()
+ .sorted(Comparator.reverseOrder())
+ .map(i -> new Object[]{i})
+ .collect(Collectors.toList());
+
+ checkFunction(call, expected);
+ }
+
+ private void checkFunction(RexCall call, List<Object[]> expected) throws
Exception {
+ ExecutionContext<Object[]> executionContext = executionContext();
+ TableFunction<Object[]> tableFunction =
registry.getTableFunction(executionContext, call);
+
+ try (TableFunctionInstance<Object[]> instance =
tableFunction.createInstance(executionContext)) {
+ List<Object[]> actual = new ArrayList<>();
+ while (instance.hasNext()) {
+ actual.add(instance.next());
+ }
+
+ assertEquals(expected.size(), actual.size());
+ }
+ }
+
+ @Override
+ protected RowHandler<Object[]> rowHandler() {
+ return ArrayRowHandler.INSTANCE;
+ }
+}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
index 130f5c7e92..77652b2df4 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
@@ -55,6 +55,7 @@ import
org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
+import
org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionRegistryImpl;
import org.apache.ignite.internal.sql.engine.exec.mapping.MappingService;
import org.apache.ignite.internal.sql.engine.message.MessageService;
import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl;
@@ -138,6 +139,8 @@ public class TestNode implements LifecycleAware {
tableRegistry, view -> () ->
systemViewManager.scanView(view.name())
);
+ TableFunctionRegistryImpl tableFunctionRegistry = new
TableFunctionRegistryImpl();
+
executionService = registerService(ExecutionServiceImpl.create(
topologyService,
messageService,
@@ -150,6 +153,7 @@ public class TestNode implements LifecycleAware {
mappingService,
tableRegistry,
dependencyResolver,
+ tableFunctionRegistry,
clockService,
5_000
));