This is an automated email from the ASF dual-hosted git repository. chenyz pushed a commit to branch udtf in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b9dd299cbe8799ced4f0314dd0c3f682e552f323 Author: Chen YZ <[email protected]> AuthorDate: Wed Feb 5 18:06:09 2025 +0800 adjust api --- .../iotdb/udf/api/relational/TableFunction.java | 51 ++++++- .../relational/table/TableFunctionAnalysis.java | 4 +- .../api/relational/table/argument/Descriptor.java | 95 ------------- .../table/argument/DescriptorArgument.java | 57 -------- .../processor/TableFunctionDataProcessor.java | 32 +++-- .../processor/TableFunctionLeafProcessor.java | 5 + .../DescriptorParameterSpecification.java | 57 -------- .../specification/ReturnTypeSpecification.java | 68 --------- .../specification/TableParameterSpecification.java | 9 -- .../function/table/ExcludeColumnFunction.java | 27 +--- .../execution/function/table/HOPTableFunction.java | 47 ++----- .../execution/function/table/SplitFunction.java | 2 +- .../process/function/PartitionRecognizer.java | 124 +++-------------- .../process/function/TableFunctionOperator.java | 73 ++++++++-- .../process/function/partition/PartitionState.java | 22 ++- .../operator/process/function/partition/Slice.java | 153 +++++++++++++++++++++ .../process/function/partition/SliceCache.java | 55 ++++++++ .../plan/planner/TableOperatorGenerator.java | 7 + .../plan/planner/plan/node/PlanGraphPrinter.java | 22 +-- .../relational/analyzer/StatementAnalyzer.java | 108 ++++----------- .../plan/relational/sql/ast/AstVisitor.java | 13 -- .../plan/relational/sql/ast/Descriptor.java | 80 ----------- .../plan/relational/sql/ast/DescriptorField.java | 88 ------------ .../relational/sql/ast/TableFunctionArgument.java | 5 +- .../sql/ast/TableFunctionDescriptorArgument.java | 87 ------------ .../plan/relational/sql/parser/AstBuilder.java | 24 ---- .../plan/relational/sql/util/SqlFormatter.java | 23 ---- .../process/tvf/TableFunctionOperatorTest.java | 57 ++++---- .../db/relational/grammar/sql/RelationalSql.g4 | 11 +- 29 files changed, 466 insertions(+), 940 deletions(-) diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/TableFunction.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/TableFunction.java index fb0e89f9557..cf74cbd748f 100644 --- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/TableFunction.java +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/TableFunction.java @@ -23,16 +23,65 @@ import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; +import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument; +import org.apache.iotdb.udf.api.relational.table.argument.TableArgument; import org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification; +import org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification; +import org.apache.iotdb.udf.api.relational.table.specification.TableParameterSpecification; import java.util.List; import java.util.Map; public interface TableFunction { - List<ParameterSpecification> getArgumentsSpecification(); + /** + * This method is used to define the specification of the arguments required by the table + * function. Each argument is described using a {@link ParameterSpecification} object, which + * encapsulates details such as the argument name, whether it is required, its default value (if + * any), etc. + * + * <p>The {@link ParameterSpecification} class is abstract and has two concrete implementations: + * + * <ul> + * <li>{@link TableParameterSpecification}: Used for parameters specific to table functions. + * <li>{@link ScalarParameterSpecification}: Used for parameters specific to scalar functions. + * </ul> + * + * @return a list of {@link ParameterSpecification} objects describing the function's arguments. + * The list should include all parameters that the table function expects, along with their + * constraints and default values (if applicable). + */ + List<ParameterSpecification> getArgumentsSpecifications(); + /** + * This method is responsible for analyzing the provided arguments and constructing a {@link + * TableFunctionAnalysis} object in runtime. During analysis, the method should: + * + * <ul> + * <li>Extract values from scalar arguments (instances of {@link ScalarArgument}) or extract + * table descriptions from table arguments (instances of {@link TableArgument}). + * <li>Construct a {@link TableFunctionAnalysis} object that contains: + * <ul> + * <li>A description of proper columns. + * <li>A map indicating which columns from the input table arguments are required for the + * function to execute. + * </ul> + * </ul> + * + * @param arguments a map of argument names to their corresponding {@link Argument} values. The + * keys should match the parameter names defined in {@link #getArgumentsSpecifications()}. + * @throws UDFException if any argument is invalid or if an error occurs during analysis + * @return a {@link TableFunctionAnalysis} object containing the analysis result + */ TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws UDFException; + /** + * This method is used to obtain a {@link TableFunctionProcessorProvider} that will be responsible + * for creating processors to handle the transformation of input data into output table. The + * provider is initialized with the validated arguments. + * + * @param arguments a map of argument names to their corresponding {@link Argument} values + * @return a {@link TableFunctionProcessorProvider} for creating processors + */ TableFunctionProcessorProvider getProcessorProvider(Map<String, Argument> arguments); } diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionAnalysis.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionAnalysis.java index d535518cbbd..b33eb8c8ac7 100644 --- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionAnalysis.java +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionAnalysis.java @@ -35,8 +35,8 @@ import static java.util.Objects.requireNonNull; public class TableFunctionAnalysis { /** - * The `returnedSchema` field is used to inform the Analyzer of the proper columns returned by the - * Table Function, that is, the columns produced by the function. + * The `properColumnSchema` field is used to inform the Analyzer of the proper columns returned by + * the Table Function, that is, the columns produced by the function. */ private final Optional<DescribedSchema> properColumnSchema; diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/argument/Descriptor.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/argument/Descriptor.java deleted file mode 100644 index ae9624f8f0b..00000000000 --- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/argument/Descriptor.java +++ /dev/null @@ -1,95 +0,0 @@ -package org.apache.iotdb.udf.api.relational.table.argument; - -import org.apache.iotdb.udf.api.type.Type; - -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; -import java.util.Optional; - -import static java.util.Objects.requireNonNull; - -public class Descriptor { - private final List<Field> fields; - - public Descriptor(List<Field> fields) { - requireNonNull(fields, "fields is null"); - if (fields.isEmpty()) { - throw new IllegalArgumentException("descriptor has no fields"); - } - this.fields = fields; - } - - public static Descriptor descriptor(List<String> names, List<Type> types) { - requireNonNull(names, "names is null"); - requireNonNull(types, "types is null"); - if (names.size() != types.size()) { - throw new IllegalArgumentException("names and types lists do not match"); - } - List<Field> fields = new ArrayList<>(); - for (int i = 0; i < names.size(); i++) { - fields.add(new Field(names.get(i), Optional.of(types.get(i)))); - } - return new Descriptor(fields); - } - - public List<Field> getFields() { - return fields; - } - - public boolean isTyped() { - return fields.stream().allMatch(field -> field.type.isPresent()); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - Descriptor that = (Descriptor) o; - return fields.equals(that.fields); - } - - @Override - public int hashCode() { - return Objects.hash(fields); - } - - public static class Field { - private final String name; - private final Optional<Type> type; - - public Field(String name, Optional<Type> type) { - this.name = name; - this.type = type; - } - - public String getName() { - return name; - } - - public Optional<Type> getType() { - return type; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - Field field = (Field) o; - return name.equals(field.name) && type.equals(field.type); - } - - @Override - public int hashCode() { - return Objects.hash(name, type); - } - } -} diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/argument/DescriptorArgument.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/argument/DescriptorArgument.java deleted file mode 100644 index 30127511df1..00000000000 --- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/argument/DescriptorArgument.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.udf.api.relational.table.argument; - -import java.util.Objects; -import java.util.Optional; - -import static java.util.Objects.requireNonNull; - -public class DescriptorArgument implements Argument { - public static final DescriptorArgument NULL_DESCRIPTOR = new DescriptorArgument(Optional.empty()); - - // it is optional because it is possible that the descriptor is not present(null descriptor) - private final Optional<Descriptor> descriptor; - - public DescriptorArgument(Optional<Descriptor> descriptor) { - this.descriptor = requireNonNull(descriptor, "descriptor is null"); - } - - public Optional<Descriptor> getDescriptor() { - return descriptor; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - DescriptorArgument that = (DescriptorArgument) o; - return descriptor.equals(that.descriptor); - } - - @Override - public int hashCode() { - return Objects.hash(descriptor); - } -} diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionDataProcessor.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionDataProcessor.java index d238d0abcb1..c3cd5483b64 100644 --- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionDataProcessor.java +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionDataProcessor.java @@ -19,7 +19,6 @@ package org.apache.iotdb.udf.api.relational.table.processor; -import org.apache.iotdb.udf.api.relational.TableFunction; import org.apache.iotdb.udf.api.relational.access.Record; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; @@ -30,23 +29,36 @@ import java.util.List; /** Each instance of TableFunctionDataProcessor processes one partition of data. */ public interface TableFunctionDataProcessor { + default void beforeStart() { + // do nothing + } + /** * This method processes a portion of data. It is called multiple times until the partition is * fully processed. * * @param input {@link Record} including a portion of one partition for each table function's - * input table. Input is ordered according to the corresponding argument specifications in - * {@link TableFunction}. A page for an argument consists of columns requested during analysis - * (see {@link TableFunctionAnalysis#getRequiredColumns}). - * @param columnBuilders a list of {@link ColumnBuilder} for each column in the output table. + * input table. A Record consists of columns requested during analysis (see {@link + * TableFunctionAnalysis#getRequiredColumns}). + * @param properColumnBuilders A list of {@link ColumnBuilder} for each column in the output + * table. + * @param passThroughIndexBuilder A {@link ColumnBuilder} for pass through columns. Index is + * started from 0 of the whole partition. */ - void process(Record input, List<ColumnBuilder> columnBuilders); + void process( + Record input, + List<ColumnBuilder> properColumnBuilders, + ColumnBuilder passThroughIndexBuilder); /** - * This method is called after all data is processed. It is used to finalize the output table. All - * remaining data should be written to the columnBuilders. + * This method is called after all data is processed. It is used to finalize the output table and + * close resource. All remaining data should be written to the columnBuilders. * - * @param columnBuilders a list of {@link ColumnBuilder} for each column in the output table. + * @param columnBuilders A list of {@link ColumnBuilder} for each column in the output table. + * @param passThroughIndexBuilder A {@link ColumnBuilder} for pass through columns. Index is + * started from 0 of the whole partition. */ - void finish(List<ColumnBuilder> columnBuilders); + default void finish(List<ColumnBuilder> columnBuilders, ColumnBuilder passThroughIndexBuilder) { + // do nothing + } } diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionLeafProcessor.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionLeafProcessor.java index a8a49af3fa8..41750fc3b53 100644 --- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionLeafProcessor.java +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionLeafProcessor.java @@ -24,6 +24,11 @@ import org.apache.tsfile.block.column.ColumnBuilder; import java.util.List; public interface TableFunctionLeafProcessor { + + default void beforeStart() { + // do nothing + } + /** * This method processes a portion of data. It is called multiple times until the processor is * fully processed. diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/specification/DescriptorParameterSpecification.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/specification/DescriptorParameterSpecification.java deleted file mode 100644 index 06a0fcdee62..00000000000 --- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/specification/DescriptorParameterSpecification.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.udf.api.relational.table.specification; - -import org.apache.iotdb.udf.api.relational.table.argument.Descriptor; - -import java.util.Optional; - -public class DescriptorParameterSpecification extends ParameterSpecification { - private DescriptorParameterSpecification(String name, boolean required, Descriptor defaultValue) { - super(name, required, Optional.ofNullable(defaultValue)); - } - - public static Builder builder() { - return new Builder(); - } - - public static final class Builder { - private String name; - private boolean required = true; - private Descriptor defaultValue; - - private Builder() {} - - public Builder name(String name) { - this.name = name; - return this; - } - - public Builder defaultValue(Descriptor defaultValue) { - this.required = false; - this.defaultValue = defaultValue; - return this; - } - - public DescriptorParameterSpecification build() { - return new DescriptorParameterSpecification(name, required, defaultValue); - } - } -} diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/specification/ReturnTypeSpecification.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/specification/ReturnTypeSpecification.java deleted file mode 100644 index 03e4f7fa7da..00000000000 --- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/specification/ReturnTypeSpecification.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.udf.api.relational.table.specification; - -import org.apache.iotdb.udf.api.relational.table.argument.Descriptor; - -import static java.util.Objects.requireNonNull; - -/** - * The return type declaration refers to the proper columns of the table function. These are the - * columns produced by the table function as opposed to the columns of input relations passed - * through by the table function. - */ -public abstract class ReturnTypeSpecification { - /** - * The proper columns of the table function are not known at function declaration time. They must - * be determined at query analysis time based on the actual call arguments. - */ - public static class GenericTable extends ReturnTypeSpecification { - public static final GenericTable GENERIC_TABLE = new GenericTable(); - - private GenericTable() {} - } - - /** The table function has no proper columns. */ - public static class OnlyPassThrough extends ReturnTypeSpecification { - public static final OnlyPassThrough ONLY_PASS_THROUGH = new OnlyPassThrough(); - - private OnlyPassThrough() {} - } - - /** - * The proper columns of the table function are known at function declaration time. They do not - * depend on the actual call arguments. - */ - public static class DescribedTable extends ReturnTypeSpecification { - private final Descriptor descriptor; - - public DescribedTable(Descriptor descriptor) { - requireNonNull(descriptor, "descriptor is null"); - if (!descriptor.isTyped()) { - throw new IllegalArgumentException("field types not specified"); - } - this.descriptor = descriptor; - } - - public Descriptor getDescriptor() { - return descriptor; - } - } -} diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/specification/TableParameterSpecification.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/specification/TableParameterSpecification.java index 7348ec54355..eabd6d61a5b 100644 --- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/specification/TableParameterSpecification.java +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/specification/TableParameterSpecification.java @@ -32,10 +32,6 @@ public class TableParameterSpecification extends ParameterSpecification { String name, boolean rowSemantics, boolean pruneWhenEmpty, boolean passThroughColumns) { // table arguments are always required super(name, true, Optional.empty()); - // if (rowSemantics && !pruneWhenEmpty) { - // throw new IllegalArgumentException( - // "Cannot set the KEEP WHEN EMPTY property for a table argument with row semantics"); - // } this.rowSemantics = rowSemantics; this.pruneWhenEmpty = pruneWhenEmpty; this.passThroughColumns = passThroughColumns; @@ -80,11 +76,6 @@ public class TableParameterSpecification extends ParameterSpecification { return this; } - public Builder pruneWhenEmpty() { - this.pruneWhenEmpty = true; - return this; - } - public Builder keepWhenEmpty() { this.pruneWhenEmpty = false; return this; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/ExcludeColumnFunction.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/ExcludeColumnFunction.java index e01428e4736..63b727e836c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/ExcludeColumnFunction.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/ExcludeColumnFunction.java @@ -21,7 +21,6 @@ package org.apache.iotdb.db.queryengine.execution.function.table; import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.TableFunction; -import org.apache.iotdb.udf.api.relational.access.Record; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; @@ -35,7 +34,6 @@ import org.apache.iotdb.udf.api.relational.table.specification.TableParameterSpe import org.apache.iotdb.udf.api.type.Type; import com.google.common.collect.ImmutableList; -import org.apache.tsfile.block.column.ColumnBuilder; import java.util.Arrays; import java.util.List; @@ -47,7 +45,7 @@ public class ExcludeColumnFunction implements TableFunction { private final String COL_PARAM = "EXCLUDE"; @Override - public List<ParameterSpecification> getArgumentsSpecification() { + public List<ParameterSpecification> getArgumentsSpecifications() { return Arrays.asList( TableParameterSpecification.builder().name(TBL_PARAM).rowSemantics().build(), ScalarParameterSpecification.builder().name(COL_PARAM).type(Type.STRING).build()); @@ -56,9 +54,6 @@ public class ExcludeColumnFunction implements TableFunction { @Override public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws UDFException { TableArgument tableArgument = (TableArgument) arguments.get(TBL_PARAM); - if (tableArgument == null) { - throw new UDFException("Table argument is missing"); - } String excludeColumn = (String) ((ScalarArgument) arguments.get(COL_PARAM)).getValue(); ImmutableList.Builder<Integer> requiredColumns = ImmutableList.builder(); DescribedSchema.Builder schemaBuilder = DescribedSchema.builder(); @@ -80,22 +75,14 @@ public class ExcludeColumnFunction implements TableFunction { return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { - return new TableFunctionDataProcessor() { - @Override - public void process(Record input, List<ColumnBuilder> columnBuilders) { - for (int i = 0; i < input.size(); i++) { - if (input.isNull(i)) { - columnBuilders.get(i).appendNull(); - } else { - columnBuilders.get(i).writeObject(input.getObject(i)); - } + return (input, properColumnBuilders, passThroughIndexBuilder) -> { + for (int i = 0; i < input.size(); i++) { + if (input.isNull(i)) { + properColumnBuilders.get(i).appendNull(); + } else { + properColumnBuilders.get(i).writeObject(input.getObject(i)); } } - - @Override - public void finish(List<ColumnBuilder> columnBuilders) { - // do nothing - } }; } }; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/HOPTableFunction.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/HOPTableFunction.java index 4da85a53f12..470e4aa987b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/HOPTableFunction.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/HOPTableFunction.java @@ -41,8 +41,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.stream.Collectors; -import java.util.stream.IntStream; public class HOPTableFunction implements TableFunction { @@ -53,7 +51,7 @@ public class HOPTableFunction implements TableFunction { private static final String START_PARAMETER_NAME = "START"; @Override - public List<ParameterSpecification> getArgumentsSpecification() { + public List<ParameterSpecification> getArgumentsSpecifications() { return Arrays.asList( TableParameterSpecification.builder() .name(DATA_PARAMETER_NAME) @@ -85,7 +83,6 @@ public class HOPTableFunction implements TableFunction { return requiredIndex; } - // TODO: ImmutableMap @Override public TableFunctionAnalysis analyze(Map<String, Argument> arguments) { TableArgument tableArgument = (TableArgument) arguments.get(DATA_PARAMETER_NAME); @@ -104,29 +101,19 @@ public class HOPTableFunction implements TableFunction { // outputColumnSchema return TableFunctionAnalysis.builder() .properColumnSchema(properColumnSchema) - .requiredColumns( - DATA_PARAMETER_NAME, - IntStream.range(0, tableArgument.getFieldTypes().size()) - .boxed() - .collect(Collectors.toList())) + .requiredColumns(DATA_PARAMETER_NAME, Collections.singletonList(requiredIndex)) .build(); } @Override public TableFunctionProcessorProvider getProcessorProvider(Map<String, Argument> arguments) { - TableArgument tableArgument = (TableArgument) arguments.get(DATA_PARAMETER_NAME); - String expectedFieldName = - (String) ((ScalarArgument) arguments.get(TIMECOL_PARAMETER_NAME)).getValue(); - int requiredIndex = findTimeColumnIndex(tableArgument, expectedFieldName); - return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { return new HOPDataProcessor( (Long) ((ScalarArgument) arguments.get(START_PARAMETER_NAME)).getValue(), (Long) ((ScalarArgument) arguments.get(SLIDE_PARAMETER_NAME)).getValue(), - (Long) ((ScalarArgument) arguments.get(SIZE_PARAMETER_NAME)).getValue(), - requiredIndex); + (Long) ((ScalarArgument) arguments.get(SIZE_PARAMETER_NAME)).getValue()); } }; } @@ -135,19 +122,21 @@ public class HOPTableFunction implements TableFunction { private final long slide; private final long size; - private final int timeColumnIndex; private long curTime; + private long curIndex = 0; - public HOPDataProcessor(long startTime, long slide, long size, int timeColumnIndex) { + public HOPDataProcessor(long startTime, long slide, long size) { this.slide = slide; this.size = size; this.curTime = startTime; - this.timeColumnIndex = timeColumnIndex; } @Override - public void process(Record input, List<ColumnBuilder> columnBuilders) { - long timeValue = input.getLong(timeColumnIndex); + public void process( + Record input, + List<ColumnBuilder> properColumnBuilders, + ColumnBuilder passThroughIndexBuilder) { + long timeValue = input.getLong(0); if (curTime == Long.MIN_VALUE) { curTime = timeValue; } @@ -158,20 +147,12 @@ public class HOPTableFunction implements TableFunction { } long slideTime = curTime; while (slideTime <= timeValue && slideTime + size > timeValue) { - for (int i = 0; i < input.size(); i++) { - if (input.isNull(i)) { - columnBuilders.get(i + 2).appendNull(); - } else { - columnBuilders.get(i + 2).writeObject(input.getObject(i)); - } - } - columnBuilders.get(0).writeLong(slideTime); - columnBuilders.get(1).writeLong(slideTime + size); + properColumnBuilders.get(0).writeLong(slideTime); + properColumnBuilders.get(1).writeLong(slideTime + size); + passThroughIndexBuilder.writeLong(curIndex); slideTime += slide; } + curIndex++; } - - @Override - public void finish(List<ColumnBuilder> columnBuilders) {} } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/SplitFunction.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/SplitFunction.java index 76880270f37..d000940678a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/SplitFunction.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/SplitFunction.java @@ -44,7 +44,7 @@ public class SplitFunction implements TableFunction { private final String SPLIT_PARAMETER_NAME = "SPLIT"; @Override - public List<ParameterSpecification> getArgumentsSpecification() { + public List<ParameterSpecification> getArgumentsSpecifications() { return Arrays.asList( ScalarParameterSpecification.builder().name(INPUT_PARAMETER_NAME).type(Type.STRING).build(), ScalarParameterSpecification.builder() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/PartitionRecognizer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/PartitionRecognizer.java index 09be3664354..6068aeadc39 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/PartitionRecognizer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/PartitionRecognizer.java @@ -21,21 +21,14 @@ package org.apache.iotdb.db.queryengine.execution.operator.process.function; import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer; import org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.PartitionState; -import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.Slice; import org.apache.iotdb.udf.api.type.Type; -import org.apache.tsfile.block.column.Column; -import org.apache.tsfile.common.conf.TSFileConfig; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.block.TsBlock; -import org.apache.tsfile.utils.Binary; -import org.apache.tsfile.utils.DateUtils; -import java.time.LocalDate; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; -import java.util.NoSuchElementException; import java.util.Objects; public class PartitionRecognizer { @@ -43,7 +36,7 @@ public class PartitionRecognizer { private final List<Integer> partitionChannels; private final List<Object> partitionValues; private final int[] requiredChannels; - private final List<Type> outputDataTypes; + private final List<Type> inputDataTypes; private TsBlock currentTsBlock = null; private boolean noMoreData = false; private int currentIndex = 0; @@ -52,14 +45,14 @@ public class PartitionRecognizer { public PartitionRecognizer( List<Integer> partitionChannels, List<Integer> requiredChannels, - List<TSDataType> outputDataTypes) { + List<TSDataType> inputDataTypes) { this.partitionChannels = partitionChannels; this.partitionValues = new ArrayList<>(partitionChannels.size()); for (int i = 0; i < partitionChannels.size(); i++) { partitionValues.add(null); } this.requiredChannels = requiredChannels.stream().mapToInt(i -> i).toArray(); - this.outputDataTypes = UDFDataTypeTransformer.transformToUDFDataTypeList(outputDataTypes); + this.inputDataTypes = UDFDataTypeTransformer.transformToUDFDataTypeList(inputDataTypes); } // TsBlock is sorted by partition columns already @@ -76,7 +69,7 @@ public class PartitionRecognizer { noMoreData = true; } - public PartitionState getState() { + public PartitionState nextState() { updateState(); return state; } @@ -109,9 +102,9 @@ public class PartitionRecognizer { return PartitionState.INIT_STATE; } int endPartitionIndex = findNextDifferentRowIndex(); - Iterator<Record> recordIterator = getRecordIterator(currentIndex, endPartitionIndex); + Slice slice = getSlice(currentIndex, endPartitionIndex); currentIndex = endPartitionIndex; - return PartitionState.newPartitionState(recordIterator); + return PartitionState.newPartitionState(slice); } private PartitionState handleNewPartitionState() { @@ -119,9 +112,9 @@ public class PartitionRecognizer { return PartitionState.NEED_MORE_DATA_STATE; } else { int endPartitionIndex = findNextDifferentRowIndex(); - Iterator<Record> recordIterator = getRecordIterator(currentIndex, endPartitionIndex); + Slice slice = getSlice(currentIndex, endPartitionIndex); currentIndex = endPartitionIndex; - return PartitionState.newPartitionState(recordIterator); + return PartitionState.newPartitionState(slice); } } @@ -133,15 +126,15 @@ public class PartitionRecognizer { } int endPartitionIndex = findNextDifferentRowIndex(); if (endPartitionIndex != 0) { - Iterator<Record> recordIterator = getRecordIterator(currentIndex, endPartitionIndex); + Slice slice = getSlice(currentIndex, endPartitionIndex); currentIndex = endPartitionIndex; - return PartitionState.iteratingState(recordIterator); + return PartitionState.iteratingState(slice); } else { currentIndex = endPartitionIndex; endPartitionIndex = findNextDifferentRowIndex(); - Iterator<Record> recordIterator = getRecordIterator(currentIndex, endPartitionIndex); + Slice slice = getSlice(currentIndex, endPartitionIndex); currentIndex = endPartitionIndex; - return PartitionState.newPartitionState(recordIterator); + return PartitionState.newPartitionState(slice); } } @@ -150,9 +143,9 @@ public class PartitionRecognizer { return PartitionState.NEED_MORE_DATA_STATE; } else { int endPartitionIndex = findNextDifferentRowIndex(); - Iterator<Record> recordIterator = getRecordIterator(currentIndex, endPartitionIndex); + Slice slice = getSlice(currentIndex, endPartitionIndex); currentIndex = endPartitionIndex; - return PartitionState.newPartitionState(recordIterator); + return PartitionState.newPartitionState(slice); } } @@ -179,85 +172,12 @@ public class PartitionRecognizer { return i; } - private Iterator<Record> getRecordIterator(int startPartitionIndex, int endPartitionIndex) { - return new Iterator<Record>() { - private int curIndex = startPartitionIndex; - private final int endIndex = endPartitionIndex; - private final Column[] columns = currentTsBlock.getColumns(requiredChannels); - - @Override - public boolean hasNext() { - return curIndex < endIndex; - } - - @Override - public Record next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - final int idx = curIndex++; - return new Record() { - @Override - public int getInt(int columnIndex) { - return columns[columnIndex].getInt(idx); - } - - @Override - public long getLong(int columnIndex) { - return columns[columnIndex].getLong(idx); - } - - @Override - public float getFloat(int columnIndex) { - return columns[columnIndex].getFloat(idx); - } - - @Override - public double getDouble(int columnIndex) { - return columns[columnIndex].getDouble(idx); - } - - @Override - public boolean getBoolean(int columnIndex) { - return columns[columnIndex].getBoolean(idx); - } - - @Override - public Binary getBinary(int columnIndex) { - return columns[columnIndex].getBinary(idx); - } - - @Override - public String getString(int columnIndex) { - return columns[columnIndex].getBinary(idx).getStringValue(TSFileConfig.STRING_CHARSET); - } - - @Override - public LocalDate getLocalDate(int columnIndex) { - return DateUtils.parseIntToLocalDate(columns[columnIndex].getInt(idx)); - } - - @Override - public Object getObject(int columnIndex) { - return columns[columnIndex].getObject(idx); - } - - @Override - public Type getDataType(int columnIndex) { - return outputDataTypes.get(columnIndex); - } - - @Override - public boolean isNull(int columnIndex) { - return columns[columnIndex].isNull(idx); - } - - @Override - public int size() { - return columns.length; - } - }; - } - }; + private Slice getSlice(int startPartitionIndex, int endPartitionIndex) { + return new Slice( + startPartitionIndex, + endPartitionIndex, + currentTsBlock.getValueColumns(), + requiredChannels, + inputDataTypes); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java index 0ad55973e38..58ba5bdac45 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java @@ -23,16 +23,20 @@ import org.apache.iotdb.db.queryengine.execution.operator.Operator; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; import org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.PartitionState; +import org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.Slice; +import org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.SliceCache; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionNode; import org.apache.iotdb.udf.api.relational.access.Record; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor; import com.google.common.util.concurrent.ListenableFuture; +import org.apache.tsfile.block.column.Column; import org.apache.tsfile.block.column.ColumnBuilder; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.block.TsBlock; import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.read.common.block.column.LongColumnBuilder; import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; import java.util.Arrays; @@ -49,16 +53,22 @@ public class TableFunctionOperator implements ProcessOperator { private final TableFunctionProcessorProvider processorProvider; private final PartitionRecognizer partitionRecognizer; private final TsBlockBuilder blockBuilder; + private final int properChannelCount; + private final boolean needPassThrough; private TableFunctionDataProcessor processor; private PartitionState partitionState; private ListenableFuture<?> isBlocked; private boolean finished = false; + private ColumnBuilder passThroughIndexBuilder; + + private SliceCache sliceCache; public TableFunctionOperator( OperatorContext operatorContext, TableFunctionProcessorProvider processorProvider, Operator inputOperator, + List<TSDataType> inputDataTypes, List<TSDataType> outputDataTypes, int properChannelCount, List<Integer> requiredChannels, @@ -66,11 +76,14 @@ public class TableFunctionOperator implements ProcessOperator { List<Integer> partitionChannels) { this.operatorContext = operatorContext; this.inputOperator = inputOperator; + this.properChannelCount = properChannelCount; this.processorProvider = processorProvider; this.partitionRecognizer = - new PartitionRecognizer(partitionChannels, requiredChannels, outputDataTypes); + new PartitionRecognizer(partitionChannels, requiredChannels, inputDataTypes); + this.needPassThrough = properChannelCount != outputDataTypes.size(); this.partitionState = null; this.blockBuilder = new TsBlockBuilder(outputDataTypes); + this.sliceCache = new SliceCache(); } @Override @@ -107,42 +120,72 @@ public class TableFunctionOperator implements ProcessOperator { @Override public TsBlock next() throws Exception { PartitionState.StateType stateType = partitionState.getStateType(); - Iterator<Record> recordIterator = partitionState.getRecordIterator(); + Slice slice = partitionState.getSlice(); partitionState = null; if (stateType == PartitionState.StateType.INIT || stateType == PartitionState.StateType.NEED_MORE_DATA) { isBlocked = null; return null; } else { - List<ColumnBuilder> columnBuilders = getOutputColumnBuilders(); + List<ColumnBuilder> properColumnBuilders = getProperColumnBuilders(); + ColumnBuilder passThroughIndexBuilder = getPassThroughIndexBuilder(); if (stateType == PartitionState.StateType.FINISHED) { if (processor != null) { - processor.finish(columnBuilders); + processor.finish(properColumnBuilders, passThroughIndexBuilder); } finished = true; - return buildTsBlock(columnBuilders); + return buildTsBlock(properColumnBuilders, passThroughIndexBuilder); } if (stateType == PartitionState.StateType.NEW_PARTITION) { if (processor != null) { - processor.finish(columnBuilders); + processor.finish(properColumnBuilders, passThroughIndexBuilder); } + sliceCache.clear(); processor = processorProvider.getDataProcessor(); } + sliceCache.addSlice(slice); + Iterator<Record> recordIterator = slice.getRequiredRecordIterator(); while (recordIterator.hasNext()) { - processor.process(recordIterator.next(), columnBuilders); + processor.process(recordIterator.next(), properColumnBuilders, passThroughIndexBuilder); } - return buildTsBlock(columnBuilders); + return buildTsBlock(properColumnBuilders, passThroughIndexBuilder); } } - private List<ColumnBuilder> getOutputColumnBuilders() { + private List<ColumnBuilder> getProperColumnBuilders() { blockBuilder.reset(); - return Arrays.asList(blockBuilder.getValueColumnBuilders()); + return Arrays.asList(blockBuilder.getValueColumnBuilders()).subList(0, properChannelCount); + } + + private ColumnBuilder getPassThroughIndexBuilder() { + if (needPassThrough) { + passThroughIndexBuilder = new LongColumnBuilder(null, 1); + } + return passThroughIndexBuilder; } - private TsBlock buildTsBlock(List<ColumnBuilder> columnBuilders) { - int positionCount = columnBuilders.get(0).getPositionCount(); + private TsBlock buildTsBlock( + List<ColumnBuilder> properColumnBuilders, ColumnBuilder passThroughIndexBuilder) { + List<ColumnBuilder> passThroughColumnBuilders = + Arrays.asList(blockBuilder.getValueColumnBuilders()) + .subList(properChannelCount, blockBuilder.getValueColumnBuilders().length); + int positionCount = properColumnBuilders.get(0).getPositionCount(); blockBuilder.declarePositions(positionCount); + if (needPassThrough) { + // handle pass through column only if needed + Column passThroughIndex = passThroughIndexBuilder.build(); + for (int i = 0; i < passThroughIndex.getPositionCount(); i++) { + long index = passThroughIndex.getLong(i); + Record row = sliceCache.getOriginalRecord((int) index); + for (int j = 0; j < row.size(); j++) { + if (row.isNull(j)) { + passThroughColumnBuilders.get(j).appendNull(); + } else { + passThroughColumnBuilders.get(j).writeObject(row.getObject(j)); + } + } + } + } return blockBuilder.build(new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, positionCount)); } @@ -150,13 +193,15 @@ public class TableFunctionOperator implements ProcessOperator { public boolean hasNext() throws Exception { if (partitionState == null) { isBlocked().get(); // wait for the next TsBlock - partitionState = partitionRecognizer.getState(); + partitionState = partitionRecognizer.nextState(); } return !finished; } @Override - public void close() throws Exception {} + public void close() throws Exception { + sliceCache.close(); + } @Override public boolean isFinished() throws Exception { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/PartitionState.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/PartitionState.java index 979fec9b611..1c3c6e8686a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/PartitionState.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/PartitionState.java @@ -19,10 +19,6 @@ package org.apache.iotdb.db.queryengine.execution.operator.process.function.partition; -import org.apache.iotdb.udf.api.relational.access.Record; - -import java.util.Iterator; - public class PartitionState { public static final PartitionState NEED_MORE_DATA_STATE = @@ -30,12 +26,12 @@ public class PartitionState { public static final PartitionState FINISHED_STATE = new PartitionState(StateType.FINISHED, null); public static final PartitionState INIT_STATE = new PartitionState(StateType.INIT, null); - public static PartitionState newPartitionState(Iterator<Record> recordIterator) { - return new PartitionState(StateType.NEW_PARTITION, recordIterator); + public static PartitionState newPartitionState(Slice slice) { + return new PartitionState(StateType.NEW_PARTITION, slice); } - public static PartitionState iteratingState(Iterator<Record> recordIterator) { - return new PartitionState(StateType.ITERATING, recordIterator); + public static PartitionState iteratingState(Slice slice) { + return new PartitionState(StateType.ITERATING, slice); } public enum StateType { @@ -48,11 +44,11 @@ public class PartitionState { private final StateType stateType; // Nullable - private final Iterator<Record> recordIterator; + private final Slice slice; - protected PartitionState(StateType stateType, Iterator<Record> recordIterator) { + protected PartitionState(StateType stateType, Slice slice) { this.stateType = stateType; - this.recordIterator = recordIterator; + this.slice = slice; } public StateType getStateType() { @@ -60,7 +56,7 @@ public class PartitionState { } // Nullable - public Iterator<Record> getRecordIterator() { - return recordIterator; + public Slice getSlice() { + return slice; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java new file mode 100644 index 00000000000..19b7d47f4e9 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.function.partition; + +import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.type.Type; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.DateUtils; + +import java.time.LocalDate; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +/** Parts of partition. */ +public class Slice { + + private final int startIndex; + private final int endIndex; + private final Column[] requiredColumns; + private final Column[] originalColumns; + private final List<Type> dataTypes; + + public Slice( + int startIndex, + int endIndex, + Column[] columns, + int[] requiredChannels, + List<Type> dataTypes) { + this.startIndex = startIndex; + this.endIndex = endIndex; + this.originalColumns = columns; + this.requiredColumns = new Column[requiredChannels.length]; + for (int i = 0; i < requiredChannels.length; i++) { + requiredColumns[i] = columns[requiredChannels[i]]; + } + this.dataTypes = dataTypes; + } + + public int getSize() { + return endIndex - startIndex; + } + + public Record getOriginalRecord(int offset) { + return getRecord(startIndex + offset, originalColumns); + } + + public Iterator<Record> getRequiredRecordIterator() { + return new Iterator<Record>() { + private int curIndex = startIndex; + + @Override + public boolean hasNext() { + return curIndex < endIndex; + } + + @Override + public Record next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + final int idx = curIndex++; + return getRecord(idx, requiredColumns); + } + }; + } + + private Record getRecord(int offset, Column[] originalColumns) { + return new Record() { + @Override + public int getInt(int columnIndex) { + return originalColumns[columnIndex].getInt(offset); + } + + @Override + public long getLong(int columnIndex) { + return originalColumns[columnIndex].getLong(offset); + } + + @Override + public float getFloat(int columnIndex) { + return originalColumns[columnIndex].getFloat(offset); + } + + @Override + public double getDouble(int columnIndex) { + return originalColumns[columnIndex].getDouble(offset); + } + + @Override + public boolean getBoolean(int columnIndex) { + return originalColumns[columnIndex].getBoolean(offset); + } + + @Override + public Binary getBinary(int columnIndex) { + return originalColumns[columnIndex].getBinary(offset); + } + + @Override + public String getString(int columnIndex) { + return originalColumns[columnIndex] + .getBinary(offset) + .getStringValue(TSFileConfig.STRING_CHARSET); + } + + @Override + public LocalDate getLocalDate(int columnIndex) { + return DateUtils.parseIntToLocalDate(originalColumns[columnIndex].getInt(offset)); + } + + @Override + public Object getObject(int columnIndex) { + return originalColumns[columnIndex].getObject(offset); + } + + @Override + public Type getDataType(int columnIndex) { + return dataTypes.get(columnIndex); + } + + @Override + public boolean isNull(int columnIndex) { + return originalColumns[columnIndex].isNull(offset); + } + + @Override + public int size() { + return originalColumns.length; + } + }; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/SliceCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/SliceCache.java new file mode 100644 index 00000000000..993289fd1a9 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/SliceCache.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.function.partition; + +import org.apache.iotdb.udf.api.relational.access.Record; + +import java.util.ArrayList; +import java.util.List; + +/** Used to manage the slices of the partition. It is all in memory now. */ +public class SliceCache { + + private final List<Slice> slices = new ArrayList<>(); + + public Record getOriginalRecord(long index) { + long previousSize = 0; + for (Slice slice : slices) { + long currentSize = slice.getSize(); + if (index < previousSize + currentSize) { + return slice.getOriginalRecord((int) (index - previousSize)); + } + previousSize += currentSize; + } + throw new IndexOutOfBoundsException("Index out of bound"); + } + + public void addSlice(Slice slice) { + slices.add(slice); + } + + public void clear() { + slices.clear(); + } + + public void close() { + // do nothing + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index f28c338b6ac..6abe443616c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -2353,6 +2353,12 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution node.getPlanNodeId(), TableFunctionOperator.class.getSimpleName()); + List<TSDataType> inputDataTypes = + node.getChild().getOutputSymbols().stream() + .map(context.getTypeProvider()::getTableModelType) + .map(InternalTypeManager::getTSDataType) + .collect(Collectors.toList()); + List<TSDataType> outputDataTypes = node.getOutputSymbols().stream() .map(context.getTypeProvider()::getTableModelType) @@ -2380,6 +2386,7 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution operatorContext, processorProvider, operator, + inputDataTypes, outputDataTypes, properChannelCount, requiredChannels, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java index 441b31b2c42..3fab6cfd6ea 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java @@ -78,7 +78,6 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNod import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeDeviceViewScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ValueFillNode; import org.apache.iotdb.udf.api.relational.table.argument.Argument; -import org.apache.iotdb.udf.api.relational.table.argument.DescriptorArgument; import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument; import org.apache.iotdb.udf.api.relational.table.argument.TableArgument; @@ -96,7 +95,6 @@ import java.util.Optional; import static com.google.common.base.Preconditions.checkArgument; import static java.lang.String.format; -import static java.util.stream.Collectors.joining; import static org.apache.iotdb.db.utils.DateTimeUtils.TIMESTAMP_PRECISION; public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter.GraphContext> { @@ -989,11 +987,10 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter private String formatArgument(String argumentName, Argument argument) { if (argument instanceof ScalarArgument) { return formatScalarArgument(argumentName, (ScalarArgument) argument); - } - if (argument instanceof DescriptorArgument) { - return formatDescriptorArgument(argumentName, (DescriptorArgument) argument); - } else { + } else if (argument instanceof TableArgument) { return formatTableArgument(argumentName, (TableArgument) argument); + } else { + return argumentName + " => " + argument; } } @@ -1003,19 +1000,6 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter argumentName, argument.getType(), argument.getValue()); } - private String formatDescriptorArgument(String argumentName, DescriptorArgument argument) { - String descriptor; - if (argument.getDescriptor().isPresent()) { - descriptor = - argument.getDescriptor().get().getFields().stream() - .map(field -> field.getName() + field.getType().map(type -> " " + type).orElse("")) - .collect(joining(", ", "(", ")")); - } else { - descriptor = "NULL"; - } - return format("%s => DescriptorArgument{%s}", argumentName, descriptor); - } - private String formatTableArgument(String argumentName, TableArgument argument) { return format("%s => TableArgument", argumentName); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java index d79fcca4476..81ec55af5b0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java @@ -140,7 +140,6 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SubqueryExpressio import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SymbolReference; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Table; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.TableFunctionArgument; -import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.TableFunctionDescriptorArgument; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.TableFunctionInvocation; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.TableFunctionTableArgument; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.TableSubquery; @@ -154,7 +153,6 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WithQuery; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WrappedInsertStatement; import org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager; import org.apache.iotdb.db.queryengine.plan.relational.type.TypeManager; -import org.apache.iotdb.db.queryengine.plan.relational.type.TypeNotFoundException; import org.apache.iotdb.db.queryengine.plan.statement.component.FillPolicy; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; @@ -167,11 +165,8 @@ import org.apache.iotdb.udf.api.relational.TableFunction; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; -import org.apache.iotdb.udf.api.relational.table.argument.Descriptor; -import org.apache.iotdb.udf.api.relational.table.argument.DescriptorArgument; import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument; import org.apache.iotdb.udf.api.relational.table.argument.TableArgument; -import org.apache.iotdb.udf.api.relational.table.specification.DescriptorParameterSpecification; import org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification; import org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification; import org.apache.iotdb.udf.api.relational.table.specification.TableParameterSpecification; @@ -238,10 +233,8 @@ import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Join.Type. import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Join.Type.LEFT; import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Join.Type.RIGHT; import static org.apache.iotdb.db.queryengine.plan.relational.sql.util.AstUtil.preOrder; -import static org.apache.iotdb.db.queryengine.plan.relational.type.TypeSignatureTranslator.toTypeSignature; import static org.apache.iotdb.db.queryengine.plan.relational.utils.NodeUtils.getSortItemsFromOrderBy; import static org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet.ANALYSIS; -import static org.apache.iotdb.udf.api.relational.table.argument.DescriptorArgument.NULL_DESCRIPTOR; import static org.apache.tsfile.read.common.type.BooleanType.BOOLEAN; public class StatementAnalyzer { @@ -3139,12 +3132,16 @@ public class StatementAnalyzer { ArgumentsAnalysis argumentsAnalysis = analyzeArguments( - function.getArgumentsSpecification(), node.getArguments(), scope, errorLocation); + function.getArgumentsSpecifications(), node.getArguments(), scope, errorLocation); TableFunctionAnalysis functionAnalysis = function.analyze(argumentsAnalysis.getPassedArguments()); - if (node.getCopartitioning().size() != 0) { - // TODO(UDF): support copartition in future + // At most one table argument can be passed to a table function now + if (argumentsAnalysis.getTableArgumentAnalyses().size() > 1) { + throw new SemanticException("At most one table argument can be passed to a table function"); + } + // Copartition is not supported now + if (!node.getCopartitioning().isEmpty()) { throw new SemanticException("Copartitioning is not supported now."); } @@ -3163,15 +3160,19 @@ public class StatementAnalyzer { "Table function %s specifies required columns from table argument %s which cannot be found", node.getName(), name)); } - // TODO(UDF): 定义的时候需要确保 columns 不是empty且不为负数,不在这里二次确认了 - // if (columns.isEmpty()) { - // throw new TrinoException( - // FUNCTION_IMPLEMENTATION_ERROR, - // format( - // "Table function %s specifies empty list of required columns from - // table argument %s", - // node.getName(), name)); - // } + // make sure the required columns are not empty and positive + if (columns.isEmpty()) { + throw new SemanticException( + String.format( + "Table function %s specifies empty list of required columns from table argument %s", + node.getName(), name)); + } + if (columns.stream().anyMatch(column -> column < 0)) { + throw new SemanticException( + String.format( + "Table function %s specifies negative index of required column from table argument %s", + node.getName(), name)); + } // the scope is recorded, because table arguments are already analyzed Scope inputScope = analysis.getScope(tableArgumentsByName.get(name).getRelation()); columns.stream() @@ -3189,8 +3190,7 @@ public class StatementAnalyzer { .map(inputScope.getRelationType()::getFieldByIndex) .forEach(this::recordColumnAccess); }); - // TODO(UDF):我觉得requiredInputs应该是一定等于tableArgumentNameSet的。可以考虑让用户在定义的时候,一定要为每个table - // argument指定required columns?这个应该在注册的时候做检查。 + // check that all required inputs are specified Set<String> requiredInputs = ImmutableSet.copyOf(requiredColumns.keySet()); tableArgumentNameSet.stream() .filter(input -> !requiredInputs.contains(input)) @@ -3224,7 +3224,7 @@ public class StatementAnalyzer { // next, columns derived from table arguments, in order of argument declarations List<String> tableArgumentNames = - function.getArgumentsSpecification().stream() + function.getArgumentsSpecifications().stream() .filter(TableParameterSpecification.class::isInstance) .map(ParameterSpecification::getName) .collect(toImmutableList()); @@ -3288,7 +3288,6 @@ public class StatementAnalyzer { ImmutableMap.Builder<String, Argument> passedArguments = ImmutableMap.builder(); ImmutableList.Builder<TableArgumentAnalysis> tableArgumentAnalyses = ImmutableList.builder(); // TODO(UDF): args passed positionally - can one only pass some prefix of args? - // TODO(UDF): check at most one table argument passed if (argumentsPassedByName) { Map<String, ParameterSpecification> argumentSpecificationsByName = new HashMap<>(); for (ParameterSpecification parameterSpecification : parameterSpecifications) { @@ -3344,7 +3343,6 @@ public class StatementAnalyzer { analyzeDefault(parameterSpecification, errorLocation)); } } - return new ArgumentsAnalysis(passedArguments.buildOrThrow(), tableArgumentAnalyses.build()); } @@ -3355,8 +3353,6 @@ public class StatementAnalyzer { String actualType; if (argument.getValue() instanceof TableFunctionTableArgument) { actualType = "table"; - } else if (argument.getValue() instanceof TableFunctionDescriptorArgument) { - actualType = "descriptor"; } else if (argument.getValue() instanceof Expression) { actualType = "expression"; } else { @@ -3377,14 +3373,6 @@ public class StatementAnalyzer { (TableFunctionTableArgument) argument.getValue(), (TableParameterSpecification) parameterSpecification, scope); - } else if (parameterSpecification instanceof DescriptorParameterSpecification) { - if (!(argument.getValue() instanceof TableFunctionDescriptorArgument)) { - throw new SemanticException( - String.format( - "Invalid argument %s. Expected descriptor argument, got %s", - parameterSpecification.getName(), actualType)); - } - return analyzeDescriptorArgument((TableFunctionDescriptorArgument) argument.getValue()); } else if (parameterSpecification instanceof ScalarParameterSpecification) { if (!(argument.getValue() instanceof Expression)) { throw new SemanticException( @@ -3392,15 +3380,6 @@ public class StatementAnalyzer { "Invalid argument %s. Expected scalar argument, got %s", parameterSpecification.getName(), actualType)); } - // TODO(UDF): - // 如果functionName是descriptor的话,会被解析成FunctionCall还是TableFunctionDescriptorArgument? - // // 'descriptor' as a function name is not allowed in this context - // if (expression instanceof FunctionCall && ((FunctionCall) - // expression).getName().hasSuffix(QualifiedName.of("descriptor"))) { // function name is - // always compared case-insensitive - // throw semanticException(INVALID_FUNCTION_ARGUMENT, argument, "'descriptor' - // function is not allowed as a table function argument"); - // } return analyzeScalarArgument( (Expression) argument.getValue(), (ScalarParameterSpecification) parameterSpecification); @@ -3543,40 +3522,6 @@ public class StatementAnalyzer { Optional.of(analysisBuilder.build())); } - private ArgumentAnalysis analyzeDescriptorArgument(TableFunctionDescriptorArgument argument) { - return new ArgumentAnalysis( - argument - .getDescriptor() - .map( - descriptor -> - new DescriptorArgument( - Optional.of( - new Descriptor( - descriptor.getFields().stream() - .map( - field -> - new Descriptor.Field( - field.getName().getCanonicalValue(), - field - .getType() - .map( - type -> { - try { - return UDFDataTypeTransformer - .transformReadTypeToUDFDataType( - typeManager.getType( - toTypeSignature(type))); - } catch (TypeNotFoundException e) { - throw new SemanticException( - String.format( - "Unknown type: %s", type)); - } - }))) - .collect(toImmutableList()))))) - .orElse(NULL_DESCRIPTOR), - Optional.empty()); - } - private ArgumentAnalysis analyzeScalarArgument( Expression expression, ScalarParameterSpecification argumentSpecification) { // currently, only constant arguments are supported @@ -3609,14 +3554,7 @@ public class StatementAnalyzer { !(parameterSpecification instanceof TableParameterSpecification), "Table argument specification cannot have a default value."); - if (parameterSpecification instanceof DescriptorParameterSpecification) { - if (parameterSpecification.getDefaultValue().isPresent()) { - return new DescriptorArgument( - Optional.of((Descriptor) parameterSpecification.getDefaultValue().get())); - } else { - return NULL_DESCRIPTOR; - } - } else if (parameterSpecification instanceof ScalarParameterSpecification) { + if (parameterSpecification instanceof ScalarParameterSpecification) { checkArgument( parameterSpecification.getDefaultValue().isPresent(), String.format( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java index 344dad521a7..42ca79d3a1c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java @@ -633,14 +633,6 @@ public abstract class AstVisitor<R, C> { return visitNode(tableFunctionTableArgument, context); } - public R visitDescriptorField(DescriptorField descriptorField, C context) { - return visitNode(descriptorField, context); - } - - public R visitDescriptor(Descriptor descriptor, C context) { - return visitNode(descriptor, context); - } - public R visitEmptyTableTreatment(EmptyTableTreatment emptyTableTreatment, C context) { return visitNode(emptyTableTreatment, context); } @@ -649,11 +641,6 @@ public abstract class AstVisitor<R, C> { return visitNode(tableFunctionArgument, context); } - public R visitDescriptorArgument( - TableFunctionDescriptorArgument tableFunctionDescriptorArgument, C context) { - return visitNode(tableFunctionDescriptorArgument, context); - } - public R visitTableFunctionInvocation( TableFunctionInvocation tableFunctionInvocation, C context) { return visitNode(tableFunctionInvocation, context); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Descriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Descriptor.java deleted file mode 100644 index d86c86b1b50..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Descriptor.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; - -import java.util.List; -import java.util.Objects; -import java.util.stream.Collectors; - -import static com.google.common.base.Preconditions.checkArgument; -import static java.util.Objects.requireNonNull; - -public class Descriptor extends Node { - private final List<DescriptorField> fields; - - public Descriptor(NodeLocation location, List<DescriptorField> fields) { - super(location); - requireNonNull(fields, "fields is null"); - checkArgument(!fields.isEmpty(), "fields list is empty"); - this.fields = fields; - } - - public List<DescriptorField> getFields() { - return fields; - } - - @Override - public <R, C> R accept(AstVisitor<R, C> visitor, C context) { - return visitor.visitDescriptor(this, context); - } - - @Override - public List<? extends Node> getChildren() { - return fields; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - return Objects.equals(fields, ((Descriptor) o).fields); - } - - @Override - public int hashCode() { - return Objects.hash(fields); - } - - @Override - public String toString() { - return fields.stream() - .map(DescriptorField::toString) - .collect(Collectors.joining(", ", "DESCRIPTOR(", ")")); - } - - @Override - public boolean shallowEquals(Node o) { - return sameClass(this, o); - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DescriptorField.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DescriptorField.java deleted file mode 100644 index 09c8f6acdb2..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DescriptorField.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; - -import com.google.common.collect.ImmutableList; - -import java.util.List; -import java.util.Objects; -import java.util.Optional; - -import static java.util.Objects.requireNonNull; - -public class DescriptorField extends Node { - private final Identifier name; - private final Optional<DataType> type; - - public DescriptorField(NodeLocation location, Identifier name, Optional<DataType> type) { - super(location); - this.name = requireNonNull(name, "name is null"); - this.type = requireNonNull(type, "type is null"); - } - - public Identifier getName() { - return name; - } - - public Optional<DataType> getType() { - return type; - } - - @Override - public <R, C> R accept(AstVisitor<R, C> visitor, C context) { - return visitor.visitDescriptorField(this, context); - } - - @Override - public List<? extends Node> getChildren() { - return type.map(ImmutableList::of).orElse(ImmutableList.of()); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - DescriptorField field = (DescriptorField) o; - return Objects.equals(name, field.name) && Objects.equals(type, field.type); - } - - @Override - public int hashCode() { - return Objects.hash(name, type); - } - - @Override - public String toString() { - return type.map(dataType -> name + " " + dataType).orElseGet(name::toString); - } - - @Override - public boolean shallowEquals(Node o) { - if (!sameClass(this, o)) { - return false; - } - - return Objects.equals(name, ((DescriptorField) o).name); - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/TableFunctionArgument.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/TableFunctionArgument.java index 01642378f60..5fac31abc43 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/TableFunctionArgument.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/TableFunctionArgument.java @@ -36,10 +36,7 @@ public class TableFunctionArgument extends Node { super(location); this.name = requireNonNull(name, "name is null"); requireNonNull(value, "value is null"); - checkArgument( - value instanceof TableFunctionTableArgument - || value instanceof TableFunctionDescriptorArgument - || value instanceof Expression); + checkArgument(value instanceof TableFunctionTableArgument || value instanceof Expression); this.value = value; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/TableFunctionDescriptorArgument.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/TableFunctionDescriptorArgument.java deleted file mode 100644 index 65ac6d32391..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/TableFunctionDescriptorArgument.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; - -import com.google.common.collect.ImmutableList; - -import java.util.List; -import java.util.Objects; -import java.util.Optional; - -import static java.util.Objects.requireNonNull; - -public class TableFunctionDescriptorArgument extends Node { - private final Optional<Descriptor> descriptor; - - public static TableFunctionDescriptorArgument descriptorArgument( - NodeLocation location, Descriptor descriptor) { - requireNonNull(descriptor, "descriptor is null"); - return new TableFunctionDescriptorArgument(location, Optional.of(descriptor)); - } - - public static TableFunctionDescriptorArgument nullDescriptorArgument(NodeLocation location) { - return new TableFunctionDescriptorArgument(location, Optional.empty()); - } - - private TableFunctionDescriptorArgument(NodeLocation location, Optional<Descriptor> descriptor) { - super(location); - this.descriptor = descriptor; - } - - public Optional<Descriptor> getDescriptor() { - return descriptor; - } - - @Override - public <R, C> R accept(AstVisitor<R, C> visitor, C context) { - return visitor.visitDescriptorArgument(this, context); - } - - @Override - public List<? extends Node> getChildren() { - return descriptor.map(ImmutableList::of).orElse(ImmutableList.of()); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - return Objects.equals(descriptor, ((TableFunctionDescriptorArgument) o).descriptor); - } - - @Override - public int hashCode() { - return Objects.hash(descriptor); - } - - @Override - public String toString() { - return descriptor.map(Descriptor::toString).orElse("CAST (NULL AS DESCRIPTOR)"); - } - - @Override - public boolean shallowEquals(Node o) { - return sameClass(this, o); - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java index 26251e8a117..83351aa3562 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java @@ -67,8 +67,6 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Delete; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DeleteDevice; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DereferenceExpression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DescribeTable; -import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Descriptor; -import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DescriptorField; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DoubleLiteral; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropColumn; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropDB; @@ -246,8 +244,6 @@ import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.GroupingSe import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.GroupingSets.Type.EXPLICIT; import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.GroupingSets.Type.ROLLUP; import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QualifiedName.mapIdentifier; -import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.TableFunctionDescriptorArgument.descriptorArgument; -import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.TableFunctionDescriptorArgument.nullDescriptorArgument; import static org.apache.iotdb.db.utils.TimestampPrecisionUtils.currPrecision; import static org.apache.iotdb.db.utils.constant.SqlConstant.FIRST_AGGREGATION; import static org.apache.iotdb.db.utils.constant.SqlConstant.FIRST_BY_AGGREGATION; @@ -1864,8 +1860,6 @@ public class AstBuilder extends RelationalSqlBaseVisitor<Node> { Node value; if (context.tableArgument() != null) { value = visit(context.tableArgument()); - } else if (context.descriptorArgument() != null) { - value = visit(context.descriptorArgument()); } else { value = visit(context.scalarArgument()); } @@ -1946,24 +1940,6 @@ public class AstBuilder extends RelationalSqlBaseVisitor<Node> { return relation; } - @Override - public Node visitDescriptorArgument(RelationalSqlParser.DescriptorArgumentContext context) { - if (context.NULL() != null) { - return nullDescriptorArgument(getLocation(context)); - } - List<DescriptorField> fields = visit(context.descriptorField(), DescriptorField.class); - return descriptorArgument( - getLocation(context), new Descriptor(getLocation(context.DESCRIPTOR()), fields)); - } - - @Override - public Node visitDescriptorField(RelationalSqlParser.DescriptorFieldContext context) { - return new DescriptorField( - getLocation(context), - (Identifier) visit(context.identifier()), - visitIfPresent(context.type(), DataType.class)); - } - @Override public Node visitScalarArgument(RelationalSqlParser.ScalarArgumentContext ctx) { if (ctx.expression() != null) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/SqlFormatter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/SqlFormatter.java index 5f85e585db1..eed5429e773 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/SqlFormatter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/SqlFormatter.java @@ -88,7 +88,6 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.StartPipe; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.StopPipe; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Table; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.TableFunctionArgument; -import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.TableFunctionDescriptorArgument; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.TableFunctionInvocation; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.TableFunctionTableArgument; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.TableSubquery; @@ -1315,27 +1314,5 @@ public final class SqlFormatter { return null; } - - @Override - public Void visitDescriptorArgument(TableFunctionDescriptorArgument node, Integer indent) { - if (node.getDescriptor().isPresent()) { - builder.append( - node.getDescriptor().get().getFields().stream() - .map( - field -> { - String formattedField = formatName(field.getName()); - if (field.getType().isPresent()) { - formattedField = - formattedField + " " + formatExpression(field.getType().get()); - } - return formattedField; - }) - .collect(joining(", ", "DESCRIPTOR(", ")"))); - } else { - builder.append("CAST (NULL AS DESCRIPTOR)"); - } - - return null; - } } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/tvf/TableFunctionOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/tvf/TableFunctionOperatorTest.java index 70fe70eaf85..90dfcb673fb 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/tvf/TableFunctionOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/tvf/TableFunctionOperatorTest.java @@ -30,6 +30,7 @@ import org.apache.iotdb.db.queryengine.execution.operator.Operator; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; import org.apache.iotdb.db.queryengine.execution.operator.process.function.PartitionRecognizer; import org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.PartitionState; +import org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.Slice; import org.apache.iotdb.udf.api.relational.access.Record; import org.apache.tsfile.common.conf.TSFileConfig; @@ -187,7 +188,7 @@ public class TableFunctionOperatorTest { } @Test - public void PartitionRecognizerTest() { + public void testPartitionRecognizer() { QueryId queryId = new QueryId("stub_query"); FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); @@ -202,91 +203,97 @@ public class TableFunctionOperatorTest { Collections.singletonList(1), Arrays.asList(0, 1, 3), Arrays.asList(TSDataType.TIMESTAMP, TSDataType.STRING, TSDataType.INT32)); - PartitionState state = partitionRecognizer.getState(); + PartitionState state = partitionRecognizer.nextState(); Assert.assertEquals(PartitionState.INIT_STATE, state); - Assert.assertEquals(PartitionState.INIT_STATE, partitionRecognizer.getState()); + Assert.assertEquals(PartitionState.INIT_STATE, partitionRecognizer.nextState()); // 1. add first TsBlock, expected NEW_PARTITION and NEED_MORE_DATA Assert.assertTrue(childOperator.hasNext()); partitionRecognizer.addTsBlock(childOperator.next()); - state = partitionRecognizer.getState(); + state = partitionRecognizer.nextState(); Assert.assertEquals(PartitionState.StateType.NEW_PARTITION, state.getStateType()); - checkIteratorSimply(state.getRecordIterator(), Collections.emptyList()); - state = partitionRecognizer.getState(); + checkIteratorSimply(state.getSlice(), Collections.emptyList()); + state = partitionRecognizer.nextState(); Assert.assertEquals(PartitionState.StateType.NEW_PARTITION, state.getStateType()); checkIteratorSimply( - state.getRecordIterator(), + state.getSlice(), Arrays.asList( Arrays.asList(1717171200000L, D1_BINARY, 60), Arrays.asList(1719763200000L, D1_BINARY, 60))); - state = partitionRecognizer.getState(); + state = partitionRecognizer.nextState(); Assert.assertEquals(PartitionState.StateType.NEED_MORE_DATA, state.getStateType()); // 2. add second TsBlock, expected ITERATING, NEW_PARTITION and NEED_MORE_DATA Assert.assertTrue(childOperator.hasNext()); partitionRecognizer.addTsBlock(childOperator.next()); - state = partitionRecognizer.getState(); + state = partitionRecognizer.nextState(); Assert.assertEquals(PartitionState.StateType.ITERATING, state.getStateType()); checkIteratorSimply( - state.getRecordIterator(), + state.getSlice(), Collections.singletonList(Arrays.asList(1722441600000L, D1_BINARY, 61))); - state = partitionRecognizer.getState(); + state = partitionRecognizer.nextState(); Assert.assertEquals(PartitionState.StateType.NEW_PARTITION, state.getStateType()); checkIteratorSimply( - state.getRecordIterator(), + state.getSlice(), Arrays.asList( Arrays.asList(1725120000000L, D2_BINARY, 62), Arrays.asList(1725130000000L, D2_BINARY, 62), Arrays.asList(1725140000000L, D2_BINARY, 63))); - state = partitionRecognizer.getState(); + state = partitionRecognizer.nextState(); Assert.assertEquals(PartitionState.StateType.NEED_MORE_DATA, state.getStateType()); // 3. add third and fourth TsBlock, expected NEED_MORE_DATA Assert.assertTrue(childOperator.hasNext()); partitionRecognizer.addTsBlock(childOperator.next()); - state = partitionRecognizer.getState(); + state = partitionRecognizer.nextState(); Assert.assertEquals(PartitionState.StateType.NEED_MORE_DATA, state.getStateType()); Assert.assertTrue(childOperator.hasNext()); partitionRecognizer.addTsBlock(childOperator.next()); - state = partitionRecognizer.getState(); + state = partitionRecognizer.nextState(); Assert.assertEquals(PartitionState.StateType.NEED_MORE_DATA, state.getStateType()); // 4. add fifth TsBlock, expected NEW_PARTITION, NEW_PARTITION and NEED_MORE_DATA Assert.assertTrue(childOperator.hasNext()); partitionRecognizer.addTsBlock(childOperator.next()); - state = partitionRecognizer.getState(); + state = partitionRecognizer.nextState(); Assert.assertEquals(PartitionState.StateType.NEW_PARTITION, state.getStateType()); checkIteratorSimply( - state.getRecordIterator(), + state.getSlice(), Arrays.asList( Arrays.asList(1722441600000L, D3_BINARY, 63), Arrays.asList(1722441800000L, D3_BINARY, 64))); - state = partitionRecognizer.getState(); + state = partitionRecognizer.nextState(); Assert.assertEquals(PartitionState.StateType.NEW_PARTITION, state.getStateType()); checkIteratorSimply( - state.getRecordIterator(), + state.getSlice(), Collections.singletonList(Arrays.asList(1722551600000L, D5_BINARY, 64))); - state = partitionRecognizer.getState(); + state = partitionRecognizer.nextState(); Assert.assertEquals(PartitionState.StateType.NEED_MORE_DATA, state.getStateType()); // 5. add sixth TsBlock, expected ITERATING and NEED_MORE_DATA Assert.assertTrue(childOperator.hasNext()); partitionRecognizer.addTsBlock(childOperator.next()); - state = partitionRecognizer.getState(); + state = partitionRecognizer.nextState(); Assert.assertEquals(PartitionState.StateType.ITERATING, state.getStateType()); checkIteratorSimply( - state.getRecordIterator(), + state.getSlice(), Arrays.asList( Arrays.asList(1722552800000L, D5_BINARY, 65), Arrays.asList(1722553000000L, D5_BINARY, 65))); - state = partitionRecognizer.getState(); + state = partitionRecognizer.nextState(); Assert.assertEquals(PartitionState.StateType.NEED_MORE_DATA, state.getStateType()); // no more data, expected FINISHED Assert.assertFalse(childOperator.hasNext()); partitionRecognizer.noMoreData(); - state = partitionRecognizer.getState(); + state = partitionRecognizer.nextState(); Assert.assertEquals(PartitionState.StateType.FINISHED, state.getStateType()); } catch (Exception e) { fail(e.getMessage()); } } - private void checkIteratorSimply(Iterator<Record> recordIterable, List<List<Object>> expected) { + @Test + public void testConstructResult() { + // TODO(UDF) + } + + private void checkIteratorSimply(Slice slice, List<List<Object>> expected) { + Iterator<Record> recordIterable = slice.getRequiredRecordIterator(); int i = 0; while (recordIterable.hasNext()) { Record record = recordIterable.next(); diff --git a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 index 11bb0ffe28d..656894aba19 100644 --- a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 +++ b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 @@ -740,7 +740,7 @@ tableFunctionCall ; tableFunctionArgument - : (identifier '=>')? (tableArgument | descriptorArgument | scalarArgument) // descriptor before expression to avoid parsing descriptor as a function call + : (identifier '=>')? (tableArgument | scalarArgument) // descriptor before expression to avoid parsing descriptor as a function call ; tableArgument @@ -755,15 +755,6 @@ tableArgumentRelation | TABLE '(' query ')' (AS? identifier columnAliases?)? #tableArgumentQuery ; -descriptorArgument - : DESCRIPTOR '(' descriptorField (',' descriptorField)* ')' - | CAST '(' NULL AS DESCRIPTOR ')' - ; - -descriptorField - : identifier type? - ; - scalarArgument : expression | timeDuration
