This is an automated email from the ASF dual-hosted git repository. yuxia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 78aa02f106cfe261200d4efce3c09d2a2882ebec Author: luoyuxia <[email protected]> AuthorDate: Mon Jun 19 16:59:57 2023 +0800 [FLINK-32354][table] Planner supports converting call procedure statement to CallOperation --- .../table/catalog/ContextResolvedProcedure.java | 49 ++++ .../flink/table/catalog/FunctionCatalog.java | 19 ++ .../table/operations/CallProcedureOperation.java | 25 ++ .../table/planner/calcite/SqlToRexConverter.java | 5 + .../catalog/FunctionCatalogOperatorTable.java | 22 +- .../functions/bridging/BridgingSqlProcedure.java | 93 ++++++++ .../planner/functions/bridging/BridgingUtils.java | 16 ++ .../operations/PlannerCallProcedureOperation.java | 67 ++++++ .../planner/operations/SqlNodeConvertContext.java | 59 ++++- .../operations/converters/SqlNodeConverter.java | 12 + .../operations/converters/SqlNodeConverters.java | 1 + .../converters/SqlProcedureCallConverter.java | 141 ++++++++++++ .../table/planner/calcite/FlinkPlannerImpl.scala | 11 +- .../operations/SqlNodeToCallOperationTest.java | 252 +++++++++++++++++++++ 14 files changed, 756 insertions(+), 16 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedProcedure.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedProcedure.java new file mode 100644 index 00000000000..4eb36fe2717 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedProcedure.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.functions.FunctionIdentifier; +import org.apache.flink.table.procedures.Procedure; + +/** + * This class contains information about a procedure and its relationship with a {@link Catalog}. + */ +@Internal +public class ContextResolvedProcedure { + + // we reuse FunctionIdentifier as the identifier for procedure, + // since procedure and function are almost same + private final FunctionIdentifier procedureIdentifier; + + private final Procedure procedure; + + public ContextResolvedProcedure(FunctionIdentifier procedureIdentifier, Procedure procedure) { + this.procedureIdentifier = procedureIdentifier; + this.procedure = procedure; + } + + public FunctionIdentifier getIdentifier() { + return procedureIdentifier; + } + + public Procedure getProcedure() { + return procedure; + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java index a65845de34c..b50d47c4865 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java @@ -25,6 +25,7 @@ import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; 
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException; import org.apache.flink.table.delegation.PlannerTypeInferenceUtil; import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.table.functions.AggregateFunctionDefinition; @@ -40,6 +41,7 @@ import org.apache.flink.table.functions.TableFunctionDefinition; import org.apache.flink.table.functions.UserDefinedFunction; import org.apache.flink.table.functions.UserDefinedFunctionHelper; import org.apache.flink.table.module.ModuleManager; +import org.apache.flink.table.procedures.Procedure; import org.apache.flink.table.resource.ResourceManager; import org.apache.flink.table.resource.ResourceUri; import org.apache.flink.util.Preconditions; @@ -395,6 +397,23 @@ public final class FunctionCatalog { }; } + public Optional<ContextResolvedProcedure> lookupProcedure(UnresolvedIdentifier identifier) { + ObjectIdentifier procedureIdentifier = catalogManager.qualifyIdentifier(identifier); + Optional<Catalog> catalog = catalogManager.getCatalog(procedureIdentifier.getCatalogName()); + if (catalog.isPresent()) { + Procedure procedure; + try { + procedure = catalog.get().getProcedure(procedureIdentifier.toObjectPath()); + } catch (ProcedureNotExistException e) { + return Optional.empty(); + } + return Optional.of( + new ContextResolvedProcedure( + FunctionIdentifier.of(procedureIdentifier), procedure)); + } + return Optional.empty(); + } + public Optional<ContextResolvedFunction> lookupFunction(UnresolvedIdentifier identifier) { // precise function reference if (identifier.getDatabaseName().isPresent()) { diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CallProcedureOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CallProcedureOperation.java new file mode 100644 index 00000000000..b432bf73021 --- /dev/null +++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CallProcedureOperation.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; + +/** A {@link Operation} that describes the call procedure statement. */ +@Internal +public interface CallProcedureOperation extends Operation {} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/SqlToRexConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/SqlToRexConverter.java index f5a1253d2d6..8547382a03b 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/SqlToRexConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/SqlToRexConverter.java @@ -76,6 +76,11 @@ public class SqlToRexConverter { return planner.rex(parser.parseExpression(expr), inputRowType, outputType); } + /** Converts a {@link SqlNode} to a {@link RexNode} expression. 
*/ + public RexNode convertToRexNode(SqlNode sqlNode) { + return planner.rex(sqlNode, inputRowType, outputType); + } + /** * Converts an array of SQL expressions to an array of {@link RexNode} expressions. * diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java index fa0875004cc..4e5bb766cbb 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java @@ -21,6 +21,7 @@ package org.apache.flink.table.planner.catalog; import org.apache.flink.annotation.Internal; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.ContextResolvedFunction; +import org.apache.flink.table.catalog.ContextResolvedProcedure; import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.catalog.FunctionCatalog; import org.apache.flink.table.catalog.UnresolvedIdentifier; @@ -35,6 +36,7 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.calcite.RexFactory; import org.apache.flink.table.planner.functions.bridging.BridgingSqlAggFunction; import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction; +import org.apache.flink.table.planner.functions.bridging.BridgingSqlProcedure; import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils; import org.apache.flink.table.types.inference.TypeInference; import org.apache.flink.table.types.inference.TypeStrategies; @@ -87,10 +89,22 @@ public class FunctionCatalogOperatorTable implements SqlOperatorTable { final UnresolvedIdentifier identifier = UnresolvedIdentifier.of(opName.names); - functionCatalog - 
.lookupFunction(identifier) - .flatMap(resolvedFunction -> convertToSqlFunction(category, resolvedFunction)) - .ifPresent(operatorList::add); + if (category == SqlFunctionCategory.USER_DEFINED_PROCEDURE) { + functionCatalog + .lookupProcedure(identifier) + .flatMap(this::convertToSqlProcedure) + .ifPresent(operatorList::add); + } else { + functionCatalog + .lookupFunction(identifier) + .flatMap(resolvedFunction -> convertToSqlFunction(category, resolvedFunction)) + .ifPresent(operatorList::add); + } + } + + private Optional<SqlFunction> convertToSqlProcedure( + ContextResolvedProcedure resolvedProcedure) { + return Optional.of(BridgingSqlProcedure.of(dataTypeFactory, resolvedProcedure)); } private Optional<SqlFunction> convertToSqlFunction( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlProcedure.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlProcedure.java new file mode 100644 index 00000000000..13789c2da1f --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlProcedure.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.bridging; + +import org.apache.flink.table.catalog.ContextResolvedProcedure; +import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.procedures.Procedure; +import org.apache.flink.table.procedures.ProcedureDefinition; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.extraction.TypeInferenceExtractor; +import org.apache.flink.table.types.inference.TypeInference; + +import org.apache.calcite.sql.SqlFunction; +import org.apache.calcite.sql.SqlFunctionCategory; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.type.SqlOperandTypeChecker; +import org.apache.calcite.sql.type.SqlOperandTypeInference; +import org.apache.calcite.sql.type.SqlReturnTypeInference; +import org.checkerframework.checker.nullness.qual.Nullable; + +import static org.apache.flink.table.planner.functions.bridging.BridgingUtils.createName; +import static org.apache.flink.table.planner.functions.bridging.BridgingUtils.createSqlIdentifier; +import static org.apache.flink.table.planner.functions.bridging.BridgingUtils.createSqlOperandTypeChecker; +import static org.apache.flink.table.planner.functions.bridging.BridgingUtils.createSqlOperandTypeInference; +import static org.apache.flink.table.planner.functions.bridging.BridgingUtils.createSqlReturnTypeInference; + +/** Bridges {@link Procedure} to Calcite's representation of a function. 
*/ +public class BridgingSqlProcedure extends SqlFunction { + + private final ContextResolvedProcedure contextResolvedProcedure; + + private BridgingSqlProcedure( + String name, + SqlIdentifier sqlIdentifier, + @Nullable SqlReturnTypeInference returnTypeInference, + @Nullable SqlOperandTypeInference operandTypeInference, + @Nullable SqlOperandTypeChecker operandTypeChecker, + SqlFunctionCategory category, + ContextResolvedProcedure contextResolvedProcedure) { + super( + name, + sqlIdentifier, + SqlKind.OTHER_FUNCTION, + returnTypeInference, + operandTypeInference, + operandTypeChecker, + category); + this.contextResolvedProcedure = contextResolvedProcedure; + } + + public ContextResolvedProcedure getContextResolveProcedure() { + return contextResolvedProcedure; + } + + /** + * Creates an instance of a procedure. + * + * @param dataTypeFactory used for creating {@link DataType} + * @param resolvedProcedure {@link Procedure} with context + */ + public static BridgingSqlProcedure of( + DataTypeFactory dataTypeFactory, ContextResolvedProcedure resolvedProcedure) { + final Procedure procedure = resolvedProcedure.getProcedure(); + final ProcedureDefinition procedureDefinition = new ProcedureDefinition(procedure); + final TypeInference typeInference = + TypeInferenceExtractor.forProcedure(dataTypeFactory, procedure.getClass()); + return new BridgingSqlProcedure( + createName(resolvedProcedure), + createSqlIdentifier(resolvedProcedure), + createSqlReturnTypeInference(dataTypeFactory, procedureDefinition, typeInference), + createSqlOperandTypeInference(dataTypeFactory, procedureDefinition, typeInference), + createSqlOperandTypeChecker(dataTypeFactory, procedureDefinition, typeInference), + SqlFunctionCategory.USER_DEFINED_PROCEDURE, + resolvedProcedure); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingUtils.java 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingUtils.java index 344083d8219..582ed0871c0 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingUtils.java @@ -20,6 +20,7 @@ package org.apache.flink.table.planner.functions.bridging; import org.apache.flink.table.api.TableException; import org.apache.flink.table.catalog.ContextResolvedFunction; +import org.apache.flink.table.catalog.ContextResolvedProcedure; import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.functions.AggregateFunctionDefinition; @@ -59,6 +60,10 @@ final class BridgingUtils { .orElseGet(() -> createInlineFunctionName(resolvedFunction.getDefinition())); } + static String createName(ContextResolvedProcedure resolveProcedure) { + return extractName(resolveProcedure.getIdentifier()); + } + private static String extractName(FunctionIdentifier identifier) { if (identifier.getSimpleName().isPresent()) { return identifier.getSimpleName().get(); @@ -125,6 +130,17 @@ final class BridgingUtils { .orElse(null); } + static SqlIdentifier createSqlIdentifier(ContextResolvedProcedure resolvedProcedure) { + final FunctionIdentifier fi = resolvedProcedure.getIdentifier(); + return fi.getIdentifier() + .map(oi -> new SqlIdentifier(oi.toList(), SqlParserPos.ZERO)) + .orElseGet( + () -> + new SqlIdentifier( + fi.getSimpleName().orElseThrow(IllegalStateException::new), + SqlParserPos.ZERO)); + } + static SqlReturnTypeInference createSqlReturnTypeInference( DataTypeFactory dataTypeFactory, FunctionDefinition definition, diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java new file mode 100644 index 00000000000..4a315e8ccad --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations; + +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.operations.CallProcedureOperation; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.OperationUtils; +import org.apache.flink.table.procedures.Procedure; +import org.apache.flink.table.types.DataType; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; + +/** Wrapper for valid call procedure operation generated by Planner. */ +public class PlannerCallProcedureOperation implements CallProcedureOperation { + + private final ObjectIdentifier procedureIdentifier; + private final Procedure procedure; + + /** The internal represent for input arguments. 
*/ + private final Object[] internalInputArguments; + + private final DataType[] inputTypes; + private final DataType outputType; + + public PlannerCallProcedureOperation( + ObjectIdentifier procedureIdentifier, + Procedure procedure, + Object[] internalInputArguments, + DataType[] inputTypes, + DataType outputType) { + this.procedureIdentifier = procedureIdentifier; + this.procedure = procedure; + this.internalInputArguments = internalInputArguments; + this.inputTypes = inputTypes; + this.outputType = outputType; + } + + @Override + public String asSummaryString() { + Map<String, Object> params = new LinkedHashMap<>(); + params.put("procedureIdentifier", procedureIdentifier); + params.put("inputTypes", inputTypes); + params.put("outputTypes", outputType); + params.put("arguments", internalInputArguments); + return OperationUtils.formatWithChildren( + "CALL PROCEDURE", params, Collections.emptyList(), Operation::asSummaryString); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java index d2b0b93c029..50d894bd825 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java @@ -20,16 +20,29 @@ package org.apache.flink.table.planner.operations; import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.planner.calcite.FlinkPlannerImpl; +import org.apache.flink.table.planner.calcite.SqlToRexConverter; import org.apache.flink.table.planner.operations.converters.SqlNodeConverter; +import org.apache.flink.table.planner.typeutils.LogicalRelDataTypeConverter; import org.apache.flink.table.planner.utils.Expander; +import org.apache.flink.table.types.DataType; +import 
org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.rel.RelRoot; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.SqlDialect; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.dialect.AnsiSqlDialect; import org.apache.calcite.sql.parser.SqlParser; import org.apache.calcite.sql.validate.SqlValidator; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + /** An implementation of {@link SqlNodeConverter.ConvertContext}. */ public class SqlNodeConvertContext implements SqlNodeConverter.ConvertContext { @@ -56,19 +69,45 @@ public class SqlNodeConvertContext implements SqlNodeConverter.ConvertContext { return flinkPlanner.rel(sqlNode); } + @Override + public RexNode toRexNode( + SqlNode sqlNode, RelDataType inputRowType, @Nullable DataType outputType) { + RelDataTypeFactory relDataTypeFactory = getSqlValidator().getTypeFactory(); + SqlDialect sqlDialect = getSqlDialect(); + RelDataType outputRelType = + outputType == null + ? 
null + : LogicalRelDataTypeConverter.toRelDataType( + outputType.getLogicalType(), relDataTypeFactory); + return new SqlToRexConverter(flinkPlanner, sqlDialect, inputRowType, outputRelType) + .convertToRexNode(sqlNode); + } + + @Override + public List<RexNode> reduceRexNodes(List<RexNode> rexNodes) { + List<RexNode> reducedNodes = new ArrayList<>(); + RelOptCluster relOptCluster = flinkPlanner.cluster(); + Objects.requireNonNull(relOptCluster.getPlanner().getExecutor()) + .reduce(relOptCluster.getRexBuilder(), rexNodes, reducedNodes); + return reducedNodes; + } + @Override public String toQuotedSqlString(SqlNode sqlNode) { + return sqlNode.toSqlString(getSqlDialect()).getSql(); + } + + private SqlDialect getSqlDialect() { SqlParser.Config parserConfig = flinkPlanner.config().getParserConfig(); - SqlDialect dialect = - // The default implementation of SqlDialect suppresses all table hints since - // CALCITE-4640, so we should use AnsiSqlDialect instead to reserve table hints. - new AnsiSqlDialect( - SqlDialect.EMPTY_CONTEXT - .withQuotedCasing(parserConfig.unquotedCasing()) - .withConformance(parserConfig.conformance()) - .withUnquotedCasing(parserConfig.unquotedCasing()) - .withIdentifierQuoteString(parserConfig.quoting().string)); - return sqlNode.toSqlString(dialect).getSql(); + return + // The default implementation of SqlDialect suppresses all table hints since + // CALCITE-4640, so we should use AnsiSqlDialect instead to reserve table hints. 
+ new AnsiSqlDialect( + SqlDialect.EMPTY_CONTEXT + .withQuotedCasing(parserConfig.unquotedCasing()) + .withConformance(parserConfig.conformance()) + .withUnquotedCasing(parserConfig.unquotedCasing()) + .withIdentifierQuoteString(parserConfig.quoting().string)); } @Override diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java index c411ad40457..a2b48359810 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java @@ -21,13 +21,19 @@ package org.apache.flink.table.planner.operations.converters; import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.planner.utils.Expander; +import org.apache.flink.table.types.DataType; import org.apache.calcite.rel.RelRoot; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.validate.SqlValidator; +import javax.annotation.Nullable; + import java.util.EnumSet; +import java.util.List; import java.util.Optional; /** @@ -75,6 +81,12 @@ public interface SqlNodeConverter<S extends SqlNode> { /** Converts the given validated {@link SqlNode} into a {@link RelRoot}. */ RelRoot toRelRoot(SqlNode sqlNode); + /** Converts the given validated {@link SqlNode} into a {@link RexNode}. */ + RexNode toRexNode(SqlNode sqlNode, RelDataType inputRowType, @Nullable DataType outputType); + + /** Reduce the given {@link RexNode}s. 
*/ + List<RexNode> reduceRexNodes(List<RexNode> rexNodes); + /** Convert the given {@param sqlNode} into a quoted SQL string. */ String toQuotedSqlString(SqlNode sqlNode); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java index 3972b554523..2a3639a53a8 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java @@ -51,6 +51,7 @@ public class SqlNodeConverters { register(new SqlShowFunctionsConverter()); register(new SqlShowProcedureConverter()); register(new SqlReplaceTableAsConverter()); + register(new SqlProcedureCallConverter()); } /** diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlProcedureCallConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlProcedureCallConverter.java new file mode 100644 index 00000000000..8648937eb98 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlProcedureCallConverter.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations.converters; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.planner.functions.bridging.BridgingSqlProcedure; +import org.apache.flink.table.planner.functions.inference.OperatorBindingCallContext; +import org.apache.flink.table.planner.operations.PlannerCallProcedureOperation; +import org.apache.flink.table.planner.plan.utils.RexLiteralUtil; +import org.apache.flink.table.planner.typeutils.LogicalRelDataTypeConverter; +import org.apache.flink.table.procedures.ProcedureDefinition; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.TypeInferenceUtil; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.ExplicitOperatorBinding; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperatorBinding; +import org.apache.calcite.sql.validate.SqlValidator; + +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * A converter for call procedure node. The call procedure statement will be parsed to a SqlCall + * wrapping SqlProcedureCallOperator as the operator by calcite. 
So, this converter will try to + * recognize whether it's a call procedure. If so, convert it to the corresponding + * operation. Otherwise, return null directly. + */ +public class SqlProcedureCallConverter implements SqlNodeConverter<SqlNode> { + + @Override + public Optional<EnumSet<SqlKind>> supportedSqlKinds() { + return Optional.of(EnumSet.of(SqlKind.PROCEDURE_CALL)); + } + + @Override + public Operation convertSqlNode(SqlNode sqlNode, ConvertContext context) { + SqlCall callProcedure = (SqlCall) ((SqlCall) sqlNode).getOperandList().get(0); + BridgingSqlProcedure sqlProcedure = (BridgingSqlProcedure) callProcedure.getOperator(); + SqlValidator sqlValidator = context.getSqlValidator(); + ProcedureDefinition procedureDefinition = + new ProcedureDefinition(sqlProcedure.getContextResolveProcedure().getProcedure()); + + SqlOperatorBinding sqlOperatorBinding = + new ExplicitOperatorBinding( + context.getSqlValidator().getTypeFactory(), + sqlProcedure, + callProcedure.getOperandList().stream() + .map(sqlValidator::getValidatedNodeType) + .collect(Collectors.toList())); + + OperatorBindingCallContext bindingCallContext = + new OperatorBindingCallContext( + context.getCatalogManager().getDataTypeFactory(), + procedureDefinition, + sqlOperatorBinding, + sqlValidator.getValidatedNodeType(callProcedure)); + + // run type inference to infer the type including types of input args + // and output + TypeInferenceUtil.Result typeInferResult = + TypeInferenceUtil.runTypeInference( + procedureDefinition.getTypeInference( + context.getCatalogManager().getDataTypeFactory()), + bindingCallContext, + null); + + List<RexNode> reducedOperands = reduceOperands(callProcedure, context); + List<DataType> argumentTypes = typeInferResult.getExpectedArgumentTypes(); + int argumentCount = argumentTypes.size(); + DataType[] inputTypes = new DataType[argumentCount]; + Object[] params = new Object[argumentCount]; + for (int i = 0; i < argumentCount; i++) { + inputTypes[i] = 
argumentTypes.get(i); + RexNode reducedOperand = reducedOperands.get(i); + if (!(reducedOperand instanceof RexLiteral)) { + throw new ValidationException( + String.format( + "The argument at position %s %s for calling procedure can't be converted to " + + "literal.", + i, context.toQuotedSqlString(callProcedure.operand(i)))); + } + + // convert the literal to Flink internal representation + RexLiteral literalOperand = (RexLiteral) reducedOperand; + Object internalValue = + RexLiteralUtil.toFlinkInternalValue( + literalOperand.getValueAs(Comparable.class), + inputTypes[i].getLogicalType()); + params[i] = internalValue; + } + return new PlannerCallProcedureOperation( + sqlProcedure.getContextResolveProcedure().getIdentifier().getIdentifier().get(), + sqlProcedure.getContextResolveProcedure().getProcedure(), + params, + inputTypes, + typeInferResult.getOutputDataType()); + } + + private List<RexNode> reduceOperands(SqlCall sqlCall, ConvertContext context) { + // we don't really care about the input row type while converting to RexNode + // since call procedure shouldn't refer to any inputs. + // so, construct an empty row for it. 
+ RelDataType inputRowType = + LogicalRelDataTypeConverter.toRelDataType( + DataTypes.ROW().getLogicalType(), + context.getSqlValidator().getTypeFactory()); + List<RexNode> rexNodes = new ArrayList<>(); + for (int i = 0; i < sqlCall.operandCount(); i++) { + RexNode rexNode = context.toRexNode(sqlCall.operand(i), inputRowType, null); + rexNodes.add(rexNode); + } + return context.reduceRexNodes(rexNodes); + } +} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala index ca1fffeaa1d..9250a9688aa 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala @@ -35,7 +35,7 @@ import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.{RelFieldCollation, RelRoot} import org.apache.calcite.rel.hint.RelHint import org.apache.calcite.rex.{RexInputRef, RexNode} -import org.apache.calcite.sql.{SqlCall, SqlHint, SqlKind, SqlNode, SqlNodeList, SqlOperatorTable, SqlSelect, SqlTableRef} +import org.apache.calcite.sql.{SqlBasicCall, SqlCall, SqlHint, SqlKind, SqlNode, SqlNodeList, SqlOperatorTable, SqlProcedureCallOperator, SqlSelect, SqlTableRef} import org.apache.calcite.sql.advise.SqlAdvisorValidator import org.apache.calcite.sql.util.SqlShuttle import org.apache.calcite.sql.validate.SqlValidator @@ -60,7 +60,7 @@ class FlinkPlannerImpl( val config: FrameworkConfig, catalogReaderSupplier: JFunction[JBoolean, CalciteCatalogReader], typeFactory: FlinkTypeFactory, - cluster: RelOptCluster) { + val cluster: RelOptCluster) { val operatorTable: SqlOperatorTable = config.getOperatorTable val parser: CalciteParser = new CalciteParser(config.getParserConfig) @@ -182,6 +182,13 @@ class FlinkPlannerImpl( case 
compileAndExecute: SqlCompileAndExecutePlan => compileAndExecute.setOperand(0, validate(compileAndExecute.getOperandList.get(0))) compileAndExecute + // for call procedure statement + case sqlCallNode if sqlCallNode.getKind == SqlKind.PROCEDURE_CALL => + val callNode = sqlCallNode.asInstanceOf[SqlBasicCall] + callNode.getOperandList.asScala.zipWithIndex.foreach { + case (operand, idx) => callNode.setOperand(idx, validate(operand)) + } + callNode case _ => validator.validate(sqlNode) } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToCallOperationTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToCallOperationTest.java new file mode 100644 index 00000000000..0d6a41379e8 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToCallOperationTest.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.operations; + +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; +import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.procedure.ProcedureContext; +import org.apache.flink.table.procedures.Procedure; +import org.apache.flink.types.Row; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test cases for the call statements for {@link SqlNodeToOperationConversion}. 
*/ +public class SqlNodeToCallOperationTest extends SqlNodeToOperationConversionTestBase { + + @BeforeEach + public void before() { + CatalogWithBuiltInProcedure procedureCatalog = + new CatalogWithBuiltInProcedure("procedure_catalog"); + catalogManager.registerCatalog("p1", procedureCatalog); + catalogManager.setCurrentCatalog("p1"); + } + + @Test + void testCallStatement() { + // test call the procedure which accepts primitive types as arguments + String sql = "call `system`.primitive_arg(1, 2)"; + verifyCallOperation( + sql, + "CALL PROCEDURE:" + + " (procedureIdentifier: [`p1`.`system`.`primitive_arg`]," + + " inputTypes: [INT NOT NULL, BIGINT NOT NULL], outputTypes: [INT NOT NULL], arguments: [1, 2])"); + + // test call the procedure which has different type mapping for single method + // call with int + sql = "call `system`.different_type_mapping(1)"; + verifyCallOperation( + sql, + "CALL PROCEDURE:" + + " (procedureIdentifier: [`p1`.`system`.`different_type_mapping`]," + + " inputTypes: [INT], outputTypes: [INT], arguments: [1])"); + // call with bigint + sql = "call `system`.different_type_mapping(cast(1 as bigint))"; + verifyCallOperation( + sql, + "CALL PROCEDURE:" + + " (procedureIdentifier: [`p1`.`system`.`different_type_mapping`]," + + " inputTypes: [BIGINT], outputTypes: [BIGINT], arguments: [1])"); + + // test call the procedure which has var arguments + sql = "call `system`.var_arg(1)"; + verifyCallOperation( + sql, + "CALL PROCEDURE:" + + " (procedureIdentifier: [`p1`.`system`.`var_arg`]," + + " inputTypes: [INT NOT NULL], outputTypes: [STRING], arguments: [1])"); + sql = "call `system`.var_arg(1, 2)"; + verifyCallOperation( + sql, + "CALL PROCEDURE:" + + " (procedureIdentifier: [`p1`.`system`.`var_arg`]," + + " inputTypes: [INT NOT NULL, INT NOT NULL], outputTypes: [STRING], arguments: [1, 2])"); + sql = "call `system`.var_arg(1, 2, 1 + 2)"; + verifyCallOperation( + sql, + "CALL PROCEDURE:" + + " (procedureIdentifier: [`p1`.`system`.`var_arg`]," + 
+ " inputTypes: [INT NOT NULL, INT NOT NULL, INT NOT NULL], outputTypes: [STRING], arguments: [1, 2, 3])"); + + // test call the procedure with row as result and decimal as argument as well as + // explicit/implicit cast + sql = "call `system`.row_result(cast(1.2 as decimal))"; + verifyCallOperation( + sql, + "CALL PROCEDURE:" + + " (procedureIdentifier: [`p1`.`system`.`row_result`]," + + " inputTypes: [DECIMAL(10, 2)], outputTypes: [ROW<`i` INT>], arguments: [1.20])"); + sql = "call `system`.row_result(1.2)"; + verifyCallOperation( + sql, + "CALL PROCEDURE:" + + " (procedureIdentifier: [`p1`.`system`.`row_result`]," + + " inputTypes: [DECIMAL(10, 2)], outputTypes: [ROW<`i` INT>], arguments: [1.20])"); + + // test call the procedure with pojo as result + sql = "call p1.`system`.pojo_result('name', 1)"; + verifyCallOperation( + sql, + "CALL PROCEDURE:" + + " (procedureIdentifier: [`p1`.`system`.`pojo_result`]," + + " inputTypes: [STRING, BIGINT NOT NULL]," + + " outputTypes: [*org.apache.flink.table.planner.operations.SqlNodeToCallOperationTest$MyPojo<`name` STRING, `id` BIGINT NOT NULL>*]," + + " arguments: [name, 1])"); + + // test call the procedure with timestamp as arguments + sql = + "call p1.`system`.timestamp_arg(timestamp '2023-04-22 00:00:00.300', " + + "timestamp '2023-04-22 00:00:00.300' + INTERVAL '1' day ) "; + verifyCallOperation( + sql, + String.format( + "CALL PROCEDURE:" + + " (procedureIdentifier: [`p1`.`system`.`timestamp_arg`]," + + " inputTypes: [TIMESTAMP(3), TIMESTAMP(3)], outputTypes: [TIMESTAMP(3)]," + + " arguments: [%s, %s])", + LocalDateTime.parse("2023-04-22T00:00:00.300"), + LocalDateTime.parse("2023-04-23T00:00:00.300"))); + // should throw exception when the signature doesn't match + assertThatThrownBy(() -> parse("call `system`.primitive_arg(1)")) + .hasMessageContaining( + "No match found for function signature primitive_arg(<NUMERIC>)"); + + // should throw exception when the expression argument can't be reduced + // to literal + 
assertThatThrownBy(() -> parse("call `system`.row_result(cast((1.2 + 2.4) as decimal))")) + .hasMessageContaining( + "The argument at position 0 CAST(CAST(1.2 + 2.4 AS DECIMAL) AS DECIMAL(10, 2)) for calling procedure can't be converted to literal."); + } + + private void verifyCallOperation(String sql, String expectSummary) { + Operation operation = parse(sql); + assertThat(operation).isInstanceOf(PlannerCallProcedureOperation.class); + assertThat(operation.asSummaryString()).isEqualTo(expectSummary); + } + + /** A catalog with some built-in procedures for testing purpose. */ + private static class CatalogWithBuiltInProcedure extends GenericInMemoryCatalog { + + private static final Map<ObjectPath, Procedure> PROCEDURE_MAP = new HashMap<>(); + + static { + PROCEDURE_MAP.put( + ObjectPath.fromString("system.primitive_arg"), new ProcedureWithPrimitiveArg()); + PROCEDURE_MAP.put( + ObjectPath.fromString("system.different_type_mapping"), + new DifferentTypeMappingProcedure()); + PROCEDURE_MAP.put(ObjectPath.fromString("system.var_arg"), new VarArgProcedure()); + PROCEDURE_MAP.put(ObjectPath.fromString("system.row_result"), new RowResultProcedure()); + PROCEDURE_MAP.put( + ObjectPath.fromString("system.pojo_result"), new PojoResultProcedure()); + PROCEDURE_MAP.put( + ObjectPath.fromString("system.timestamp_arg"), new TimeStampArgProcedure()); + } + + public CatalogWithBuiltInProcedure(String name) { + super(name); + } + + @Override + public Procedure getProcedure(ObjectPath procedurePath) + throws ProcedureNotExistException, CatalogException { + if (PROCEDURE_MAP.containsKey(procedurePath)) { + return PROCEDURE_MAP.get(procedurePath); + } else { + throw new ProcedureNotExistException(getName(), procedurePath); + } + } + } + + private static class ProcedureWithPrimitiveArg implements Procedure { + public int[] call(ProcedureContext context, int arg1, long arg2) { + return null; + } + } + + @ProcedureHint(input = @DataTypeHint("INT"), output = @DataTypeHint("INT")) + 
@ProcedureHint(input = @DataTypeHint("BIGINT"), output = @DataTypeHint("BIGINT")) + private static class DifferentTypeMappingProcedure implements Procedure { + public Number[] call(ProcedureContext procedureContext, Number n) { + return null; + } + } + + private static class VarArgProcedure implements Procedure { + public String[] call(ProcedureContext procedureContext, int i, int... more) { + return null; + } + } + + private static class RowResultProcedure implements Procedure { + public @DataTypeHint("ROW<i INT>") Row[] call( + ProcedureContext procedureContext, + @DataTypeHint("DECIMAL(10, 2)") BigDecimal decimal) { + return null; + } + } + + private static class PojoResultProcedure implements Procedure { + public MyPojo[] call(ProcedureContext procedureContext, String name, long id) { + return new MyPojo[0]; + } + } + + private static class TimeStampArgProcedure implements Procedure { + public @DataTypeHint("TIMESTAMP(3)") LocalDateTime[] call( + ProcedureContext procedureContext, + @DataTypeHint("TIMESTAMP(3)") LocalDateTime localDateTime, + @DataTypeHint("TIMESTAMP(3)") TimestampData timestampData) { + return null; + } + } + + /** A simple pojo class for testing purpose. */ + public static class MyPojo { + private final String name; + private final long id; + + public MyPojo(String name, long id) { + this.name = name; + this.id = id; + } + + public String getName() { + return name; + } + + public long getId() { + return id; + } + } +}
