This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 78aa02f106cfe261200d4efce3c09d2a2882ebec
Author: luoyuxia <[email protected]>
AuthorDate: Mon Jun 19 16:59:57 2023 +0800

    [FLINK-32354][table] Planner supports converting call procedure statement 
to CallOperation
---
 .../table/catalog/ContextResolvedProcedure.java    |  49 ++++
 .../flink/table/catalog/FunctionCatalog.java       |  19 ++
 .../table/operations/CallProcedureOperation.java   |  25 ++
 .../table/planner/calcite/SqlToRexConverter.java   |   5 +
 .../catalog/FunctionCatalogOperatorTable.java      |  22 +-
 .../functions/bridging/BridgingSqlProcedure.java   |  93 ++++++++
 .../planner/functions/bridging/BridgingUtils.java  |  16 ++
 .../operations/PlannerCallProcedureOperation.java  |  67 ++++++
 .../planner/operations/SqlNodeConvertContext.java  |  59 ++++-
 .../operations/converters/SqlNodeConverter.java    |  12 +
 .../operations/converters/SqlNodeConverters.java   |   1 +
 .../converters/SqlProcedureCallConverter.java      | 141 ++++++++++++
 .../table/planner/calcite/FlinkPlannerImpl.scala   |  11 +-
 .../operations/SqlNodeToCallOperationTest.java     | 252 +++++++++++++++++++++
 14 files changed, 756 insertions(+), 16 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedProcedure.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedProcedure.java
new file mode 100644
index 00000000000..4eb36fe2717
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedProcedure.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.FunctionIdentifier;
+import org.apache.flink.table.procedures.Procedure;
+
+/**
+ * This class contains information about a procedure and its relationship with 
a {@link Catalog}.
+ */
+@Internal
+public class ContextResolvedProcedure {
+
+    // we reuse FunctionIdentifier as the identifier for procedure,
+    // since procedure and function are almost same
+    private final FunctionIdentifier procedureIdentifier;
+
+    private final Procedure procedure;
+
+    public ContextResolvedProcedure(FunctionIdentifier procedureIdentifier, 
Procedure procedure) {
+        this.procedureIdentifier = procedureIdentifier;
+        this.procedure = procedure;
+    }
+
+    public FunctionIdentifier getIdentifier() {
+        return procedureIdentifier;
+    }
+
+    public Procedure getProcedure() {
+        return procedure;
+    }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index a65845de34c..b50d47c4865 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException;
 import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.AggregateFunctionDefinition;
@@ -40,6 +41,7 @@ import 
org.apache.flink.table.functions.TableFunctionDefinition;
 import org.apache.flink.table.functions.UserDefinedFunction;
 import org.apache.flink.table.functions.UserDefinedFunctionHelper;
 import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.table.procedures.Procedure;
 import org.apache.flink.table.resource.ResourceManager;
 import org.apache.flink.table.resource.ResourceUri;
 import org.apache.flink.util.Preconditions;
@@ -395,6 +397,23 @@ public final class FunctionCatalog {
         };
     }
 
+    public Optional<ContextResolvedProcedure> 
lookupProcedure(UnresolvedIdentifier identifier) {
+        ObjectIdentifier procedureIdentifier = 
catalogManager.qualifyIdentifier(identifier);
+        Optional<Catalog> catalog = 
catalogManager.getCatalog(procedureIdentifier.getCatalogName());
+        if (catalog.isPresent()) {
+            Procedure procedure;
+            try {
+                procedure = 
catalog.get().getProcedure(procedureIdentifier.toObjectPath());
+            } catch (ProcedureNotExistException e) {
+                return Optional.empty();
+            }
+            return Optional.of(
+                    new ContextResolvedProcedure(
+                            FunctionIdentifier.of(procedureIdentifier), 
procedure));
+        }
+        return Optional.empty();
+    }
+
     public Optional<ContextResolvedFunction> 
lookupFunction(UnresolvedIdentifier identifier) {
         // precise function reference
         if (identifier.getDatabaseName().isPresent()) {
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CallProcedureOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CallProcedureOperation.java
new file mode 100644
index 00000000000..b432bf73021
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CallProcedureOperation.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+
+/** A {@link Operation} that describes the call procedure statement. */
+@Internal
+public interface CallProcedureOperation extends Operation {}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/SqlToRexConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/SqlToRexConverter.java
index f5a1253d2d6..8547382a03b 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/SqlToRexConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/SqlToRexConverter.java
@@ -76,6 +76,11 @@ public class SqlToRexConverter {
         return planner.rex(parser.parseExpression(expr), inputRowType, 
outputType);
     }
 
+    /** Converts a {@link SqlNode} to a {@link RexNode} expression. */
+    public RexNode convertToRexNode(SqlNode sqlNode) {
+        return planner.rex(sqlNode, inputRowType, outputType);
+    }
+
     /**
      * Converts an array of SQL expressions to an array of {@link RexNode} 
expressions.
      *
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
index fa0875004cc..4e5bb766cbb 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner.catalog;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.ContextResolvedFunction;
+import org.apache.flink.table.catalog.ContextResolvedProcedure;
 import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
@@ -35,6 +36,7 @@ import 
org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.calcite.RexFactory;
 import 
org.apache.flink.table.planner.functions.bridging.BridgingSqlAggFunction;
 import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlProcedure;
 import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils;
 import org.apache.flink.table.types.inference.TypeInference;
 import org.apache.flink.table.types.inference.TypeStrategies;
@@ -87,10 +89,22 @@ public class FunctionCatalogOperatorTable implements 
SqlOperatorTable {
 
         final UnresolvedIdentifier identifier = 
UnresolvedIdentifier.of(opName.names);
 
-        functionCatalog
-                .lookupFunction(identifier)
-                .flatMap(resolvedFunction -> convertToSqlFunction(category, 
resolvedFunction))
-                .ifPresent(operatorList::add);
+        if (category == SqlFunctionCategory.USER_DEFINED_PROCEDURE) {
+            functionCatalog
+                    .lookupProcedure(identifier)
+                    .flatMap(this::convertToSqlProcedure)
+                    .ifPresent(operatorList::add);
+        } else {
+            functionCatalog
+                    .lookupFunction(identifier)
+                    .flatMap(resolvedFunction -> 
convertToSqlFunction(category, resolvedFunction))
+                    .ifPresent(operatorList::add);
+        }
+    }
+
+    private Optional<SqlFunction> convertToSqlProcedure(
+            ContextResolvedProcedure resolvedProcedure) {
+        return Optional.of(BridgingSqlProcedure.of(dataTypeFactory, 
resolvedProcedure));
     }
 
     private Optional<SqlFunction> convertToSqlFunction(
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlProcedure.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlProcedure.java
new file mode 100644
index 00000000000..13789c2da1f
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlProcedure.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.bridging;
+
+import org.apache.flink.table.catalog.ContextResolvedProcedure;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.procedures.Procedure;
+import org.apache.flink.table.procedures.ProcedureDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.extraction.TypeInferenceExtractor;
+import org.apache.flink.table.types.inference.TypeInference;
+
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.SqlOperandTypeChecker;
+import org.apache.calcite.sql.type.SqlOperandTypeInference;
+import org.apache.calcite.sql.type.SqlReturnTypeInference;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import static 
org.apache.flink.table.planner.functions.bridging.BridgingUtils.createName;
+import static 
org.apache.flink.table.planner.functions.bridging.BridgingUtils.createSqlIdentifier;
+import static 
org.apache.flink.table.planner.functions.bridging.BridgingUtils.createSqlOperandTypeChecker;
+import static 
org.apache.flink.table.planner.functions.bridging.BridgingUtils.createSqlOperandTypeInference;
+import static 
org.apache.flink.table.planner.functions.bridging.BridgingUtils.createSqlReturnTypeInference;
+
+/** Bridges {@link Procedure} to Calcite's representation of a function. */
+public class BridgingSqlProcedure extends SqlFunction {
+
+    private final ContextResolvedProcedure contextResolvedProcedure;
+
+    private BridgingSqlProcedure(
+            String name,
+            SqlIdentifier sqlIdentifier,
+            @Nullable SqlReturnTypeInference returnTypeInference,
+            @Nullable SqlOperandTypeInference operandTypeInference,
+            @Nullable SqlOperandTypeChecker operandTypeChecker,
+            SqlFunctionCategory category,
+            ContextResolvedProcedure contextResolvedProcedure) {
+        super(
+                name,
+                sqlIdentifier,
+                SqlKind.OTHER_FUNCTION,
+                returnTypeInference,
+                operandTypeInference,
+                operandTypeChecker,
+                category);
+        this.contextResolvedProcedure = contextResolvedProcedure;
+    }
+
+    public ContextResolvedProcedure getContextResolveProcedure() {
+        return contextResolvedProcedure;
+    }
+
+    /**
+     * Creates an instance of a procedure.
+     *
+     * @param dataTypeFactory used for creating {@link DataType}
+     * @param resolvedProcedure {@link Procedure} with context
+     */
+    public static BridgingSqlProcedure of(
+            DataTypeFactory dataTypeFactory, ContextResolvedProcedure 
resolvedProcedure) {
+        final Procedure procedure = resolvedProcedure.getProcedure();
+        final ProcedureDefinition procedureDefinition = new 
ProcedureDefinition(procedure);
+        final TypeInference typeInference =
+                TypeInferenceExtractor.forProcedure(dataTypeFactory, 
procedure.getClass());
+        return new BridgingSqlProcedure(
+                createName(resolvedProcedure),
+                createSqlIdentifier(resolvedProcedure),
+                createSqlReturnTypeInference(dataTypeFactory, 
procedureDefinition, typeInference),
+                createSqlOperandTypeInference(dataTypeFactory, 
procedureDefinition, typeInference),
+                createSqlOperandTypeChecker(dataTypeFactory, 
procedureDefinition, typeInference),
+                SqlFunctionCategory.USER_DEFINED_PROCEDURE,
+                resolvedProcedure);
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingUtils.java
index 344083d8219..582ed0871c0 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingUtils.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.functions.bridging;
 
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.ContextResolvedFunction;
+import org.apache.flink.table.catalog.ContextResolvedProcedure;
 import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.functions.AggregateFunctionDefinition;
@@ -59,6 +60,10 @@ final class BridgingUtils {
                 .orElseGet(() -> 
createInlineFunctionName(resolvedFunction.getDefinition()));
     }
 
+    static String createName(ContextResolvedProcedure resolveProcedure) {
+        return extractName(resolveProcedure.getIdentifier());
+    }
+
     private static String extractName(FunctionIdentifier identifier) {
         if (identifier.getSimpleName().isPresent()) {
             return identifier.getSimpleName().get();
@@ -125,6 +130,17 @@ final class BridgingUtils {
                 .orElse(null);
     }
 
+    static SqlIdentifier createSqlIdentifier(ContextResolvedProcedure 
resolvedProcedure) {
+        final FunctionIdentifier fi = resolvedProcedure.getIdentifier();
+        return fi.getIdentifier()
+                .map(oi -> new SqlIdentifier(oi.toList(), SqlParserPos.ZERO))
+                .orElseGet(
+                        () ->
+                                new SqlIdentifier(
+                                        
fi.getSimpleName().orElseThrow(IllegalStateException::new),
+                                        SqlParserPos.ZERO));
+    }
+
     static SqlReturnTypeInference createSqlReturnTypeInference(
             DataTypeFactory dataTypeFactory,
             FunctionDefinition definition,
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
new file mode 100644
index 00000000000..4a315e8ccad
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.operations.CallProcedureOperation;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.OperationUtils;
+import org.apache.flink.table.procedures.Procedure;
+import org.apache.flink.table.types.DataType;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/** Wrapper for valid call procedure operation generated by Planner. */
+public class PlannerCallProcedureOperation implements CallProcedureOperation {
+
+    private final ObjectIdentifier procedureIdentifier;
+    private final Procedure procedure;
+
+    /** The internal represent for input arguments. */
+    private final Object[] internalInputArguments;
+
+    private final DataType[] inputTypes;
+    private final DataType outputType;
+
+    public PlannerCallProcedureOperation(
+            ObjectIdentifier procedureIdentifier,
+            Procedure procedure,
+            Object[] internalInputArguments,
+            DataType[] inputTypes,
+            DataType outputType) {
+        this.procedureIdentifier = procedureIdentifier;
+        this.procedure = procedure;
+        this.internalInputArguments = internalInputArguments;
+        this.inputTypes = inputTypes;
+        this.outputType = outputType;
+    }
+
+    @Override
+    public String asSummaryString() {
+        Map<String, Object> params = new LinkedHashMap<>();
+        params.put("procedureIdentifier", procedureIdentifier);
+        params.put("inputTypes", inputTypes);
+        params.put("outputTypes", outputType);
+        params.put("arguments", internalInputArguments);
+        return OperationUtils.formatWithChildren(
+                "CALL PROCEDURE", params, Collections.emptyList(), 
Operation::asSummaryString);
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
index d2b0b93c029..50d894bd825 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
@@ -20,16 +20,29 @@ package org.apache.flink.table.planner.operations;
 
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.planner.calcite.SqlToRexConverter;
 import org.apache.flink.table.planner.operations.converters.SqlNodeConverter;
+import org.apache.flink.table.planner.typeutils.LogicalRelDataTypeConverter;
 import org.apache.flink.table.planner.utils.Expander;
+import org.apache.flink.table.types.DataType;
 
+import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlDialect;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.dialect.AnsiSqlDialect;
 import org.apache.calcite.sql.parser.SqlParser;
 import org.apache.calcite.sql.validate.SqlValidator;
 
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
 /** An implementation of {@link SqlNodeConverter.ConvertContext}. */
 public class SqlNodeConvertContext implements SqlNodeConverter.ConvertContext {
 
@@ -56,19 +69,45 @@ public class SqlNodeConvertContext implements 
SqlNodeConverter.ConvertContext {
         return flinkPlanner.rel(sqlNode);
     }
 
+    @Override
+    public RexNode toRexNode(
+            SqlNode sqlNode, RelDataType inputRowType, @Nullable DataType 
outputType) {
+        RelDataTypeFactory relDataTypeFactory = 
getSqlValidator().getTypeFactory();
+        SqlDialect sqlDialect = getSqlDialect();
+        RelDataType outputRelType =
+                outputType == null
+                        ? null
+                        : LogicalRelDataTypeConverter.toRelDataType(
+                                outputType.getLogicalType(), 
relDataTypeFactory);
+        return new SqlToRexConverter(flinkPlanner, sqlDialect, inputRowType, 
outputRelType)
+                .convertToRexNode(sqlNode);
+    }
+
+    @Override
+    public List<RexNode> reduceRexNodes(List<RexNode> rexNodes) {
+        List<RexNode> reducedNodes = new ArrayList<>();
+        RelOptCluster relOptCluster = flinkPlanner.cluster();
+        Objects.requireNonNull(relOptCluster.getPlanner().getExecutor())
+                .reduce(relOptCluster.getRexBuilder(), rexNodes, reducedNodes);
+        return reducedNodes;
+    }
+
     @Override
     public String toQuotedSqlString(SqlNode sqlNode) {
+        return sqlNode.toSqlString(getSqlDialect()).getSql();
+    }
+
+    private SqlDialect getSqlDialect() {
         SqlParser.Config parserConfig = 
flinkPlanner.config().getParserConfig();
-        SqlDialect dialect =
-                // The default implementation of SqlDialect suppresses all 
table hints since
-                // CALCITE-4640, so we should use AnsiSqlDialect instead to 
reserve table hints.
-                new AnsiSqlDialect(
-                        SqlDialect.EMPTY_CONTEXT
-                                
.withQuotedCasing(parserConfig.unquotedCasing())
-                                .withConformance(parserConfig.conformance())
-                                
.withUnquotedCasing(parserConfig.unquotedCasing())
-                                
.withIdentifierQuoteString(parserConfig.quoting().string));
-        return sqlNode.toSqlString(dialect).getSql();
+        return
+        // The default implementation of SqlDialect suppresses all table hints 
since
+        // CALCITE-4640, so we should use AnsiSqlDialect instead to reserve 
table hints.
+        new AnsiSqlDialect(
+                SqlDialect.EMPTY_CONTEXT
+                        .withQuotedCasing(parserConfig.unquotedCasing())
+                        .withConformance(parserConfig.conformance())
+                        .withUnquotedCasing(parserConfig.unquotedCasing())
+                        
.withIdentifierQuoteString(parserConfig.quoting().string));
     }
 
     @Override
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
index c411ad40457..a2b48359810 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
@@ -21,13 +21,19 @@ package 
org.apache.flink.table.planner.operations.converters;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.planner.utils.Expander;
+import org.apache.flink.table.types.DataType;
 
 import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.validate.SqlValidator;
 
+import javax.annotation.Nullable;
+
 import java.util.EnumSet;
+import java.util.List;
 import java.util.Optional;
 
 /**
@@ -75,6 +81,12 @@ public interface SqlNodeConverter<S extends SqlNode> {
         /** Converts the given validated {@link SqlNode} into a {@link 
RelRoot}. */
         RelRoot toRelRoot(SqlNode sqlNode);
 
+        /** Converts the given validated {@link SqlNode} into a {@link 
RexNode}. */
+        RexNode toRexNode(SqlNode sqlNode, RelDataType inputRowType, @Nullable 
DataType outputType);
+
+        /** Reduce the given {@link RexNode}s. */
+        List<RexNode> reduceRexNodes(List<RexNode> rexNodes);
+
         /** Convert the given {@param sqlNode} into a quoted SQL string. */
         String toQuotedSqlString(SqlNode sqlNode);
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
index 3972b554523..2a3639a53a8 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
@@ -51,6 +51,7 @@ public class SqlNodeConverters {
         register(new SqlShowFunctionsConverter());
         register(new SqlShowProcedureConverter());
         register(new SqlReplaceTableAsConverter());
+        register(new SqlProcedureCallConverter());
     }
 
     /**
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlProcedureCallConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlProcedureCallConverter.java
new file mode 100644
index 00000000000..8648937eb98
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlProcedureCallConverter.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlProcedure;
+import 
org.apache.flink.table.planner.functions.inference.OperatorBindingCallContext;
+import org.apache.flink.table.planner.operations.PlannerCallProcedureOperation;
+import org.apache.flink.table.planner.plan.utils.RexLiteralUtil;
+import org.apache.flink.table.planner.typeutils.LogicalRelDataTypeConverter;
+import org.apache.flink.table.procedures.ProcedureDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.TypeInferenceUtil;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.ExplicitOperatorBinding;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.validate.SqlValidator;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * A converter for call procedure node. The call procedure statement will be 
parsed to a SqlCall
+ * wrapping SqlProcedureCallOperator as the operator by calcite. So, this 
converter will try to
+ * recognize it's call procedure or not. If it's call procedure, convert it 
the corresponding
+ * operation. Otherwise, return null directly.
+ */
+public class SqlProcedureCallConverter implements SqlNodeConverter<SqlNode> {
+
+    @Override
+    public Optional<EnumSet<SqlKind>> supportedSqlKinds() {
+        return Optional.of(EnumSet.of(SqlKind.PROCEDURE_CALL));
+    }
+
+    @Override
+    public Operation convertSqlNode(SqlNode sqlNode, ConvertContext context) {
+        SqlCall callProcedure = (SqlCall) ((SqlCall) 
sqlNode).getOperandList().get(0);
+        BridgingSqlProcedure sqlProcedure = (BridgingSqlProcedure) 
callProcedure.getOperator();
+        SqlValidator sqlValidator = context.getSqlValidator();
+        ProcedureDefinition procedureDefinition =
+                new 
ProcedureDefinition(sqlProcedure.getContextResolveProcedure().getProcedure());
+
+        SqlOperatorBinding sqlOperatorBinding =
+                new ExplicitOperatorBinding(
+                        context.getSqlValidator().getTypeFactory(),
+                        sqlProcedure,
+                        callProcedure.getOperandList().stream()
+                                .map(sqlValidator::getValidatedNodeType)
+                                .collect(Collectors.toList()));
+
+        OperatorBindingCallContext bindingCallContext =
+                new OperatorBindingCallContext(
+                        context.getCatalogManager().getDataTypeFactory(),
+                        procedureDefinition,
+                        sqlOperatorBinding,
+                        sqlValidator.getValidatedNodeType(callProcedure));
+
+        // run type inference to infer the type including types of input args
+        // and output
+        TypeInferenceUtil.Result typeInferResult =
+                TypeInferenceUtil.runTypeInference(
+                        procedureDefinition.getTypeInference(
+                                
context.getCatalogManager().getDataTypeFactory()),
+                        bindingCallContext,
+                        null);
+
+        List<RexNode> reducedOperands = reduceOperands(callProcedure, context);
+        List<DataType> argumentTypes = 
typeInferResult.getExpectedArgumentTypes();
+        int argumentCount = argumentTypes.size();
+        DataType[] inputTypes = new DataType[argumentCount];
+        Object[] params = new Object[argumentCount];
+        for (int i = 0; i < argumentCount; i++) {
+            inputTypes[i] = argumentTypes.get(i);
+            RexNode reducedOperand = reducedOperands.get(i);
+            if (!(reducedOperand instanceof RexLiteral)) {
+                throw new ValidationException(
+                        String.format(
+                                "The argument at position %s %s for calling 
procedure can't be converted to "
+                                        + "literal.",
+                                i, 
context.toQuotedSqlString(callProcedure.operand(i))));
+            }
+
+            // convert the literal to Flink internal representation
+            RexLiteral literalOperand = (RexLiteral) reducedOperand;
+            Object internalValue =
+                    RexLiteralUtil.toFlinkInternalValue(
+                            literalOperand.getValueAs(Comparable.class),
+                            inputTypes[i].getLogicalType());
+            params[i] = internalValue;
+        }
+        return new PlannerCallProcedureOperation(
+                
sqlProcedure.getContextResolveProcedure().getIdentifier().getIdentifier().get(),
+                sqlProcedure.getContextResolveProcedure().getProcedure(),
+                params,
+                inputTypes,
+                typeInferResult.getOutputDataType());
+    }
+
+    private List<RexNode> reduceOperands(SqlCall sqlCall, ConvertContext 
context) {
+        // we don't really care about the input row type while converting to 
RexNode
+        // since call procedure shouldn't refer any inputs.
+        // so, construct an empty row for it.
+        RelDataType inputRowType =
+                LogicalRelDataTypeConverter.toRelDataType(
+                        DataTypes.ROW().getLogicalType(),
+                        context.getSqlValidator().getTypeFactory());
+        List<RexNode> rexNodes = new ArrayList<>();
+        for (int i = 0; i < sqlCall.operandCount(); i++) {
+            RexNode rexNode = context.toRexNode(sqlCall.operand(i), 
inputRowType, null);
+            rexNodes.add(rexNode);
+        }
+        return context.reduceRexNodes(rexNodes);
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index ca1fffeaa1d..9250a9688aa 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -35,7 +35,7 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.{RelFieldCollation, RelRoot}
 import org.apache.calcite.rel.hint.RelHint
 import org.apache.calcite.rex.{RexInputRef, RexNode}
-import org.apache.calcite.sql.{SqlCall, SqlHint, SqlKind, SqlNode, 
SqlNodeList, SqlOperatorTable, SqlSelect, SqlTableRef}
+import org.apache.calcite.sql.{SqlBasicCall, SqlCall, SqlHint, SqlKind, 
SqlNode, SqlNodeList, SqlOperatorTable, SqlProcedureCallOperator, SqlSelect, 
SqlTableRef}
 import org.apache.calcite.sql.advise.SqlAdvisorValidator
 import org.apache.calcite.sql.util.SqlShuttle
 import org.apache.calcite.sql.validate.SqlValidator
@@ -60,7 +60,7 @@ class FlinkPlannerImpl(
     val config: FrameworkConfig,
     catalogReaderSupplier: JFunction[JBoolean, CalciteCatalogReader],
     typeFactory: FlinkTypeFactory,
-    cluster: RelOptCluster) {
+    val cluster: RelOptCluster) {
 
   val operatorTable: SqlOperatorTable = config.getOperatorTable
   val parser: CalciteParser = new CalciteParser(config.getParserConfig)
@@ -182,6 +182,13 @@ class FlinkPlannerImpl(
         case compileAndExecute: SqlCompileAndExecutePlan =>
           compileAndExecute.setOperand(0, 
validate(compileAndExecute.getOperandList.get(0)))
           compileAndExecute
+        // for call procedure statement
+        case sqlCallNode if sqlCallNode.getKind == SqlKind.PROCEDURE_CALL =>
+          val callNode = sqlCallNode.asInstanceOf[SqlBasicCall]
+          callNode.getOperandList.asScala.zipWithIndex.foreach {
+            case (operand, idx) => callNode.setOperand(idx, validate(operand))
+          }
+          callNode
         case _ =>
           validator.validate(sqlNode)
       }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToCallOperationTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToCallOperationTest.java
new file mode 100644
index 00000000000..0d6a41379e8
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToCallOperationTest.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.table.procedures.Procedure;
+import org.apache.flink.types.Row;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test cases for the call statements for {@link 
SqlNodeToOperationConversion}. */
+public class SqlNodeToCallOperationTest extends 
SqlNodeToOperationConversionTestBase {
+
+    @BeforeEach
+    public void before() {
+        CatalogWithBuiltInProcedure procedureCatalog =
+                new CatalogWithBuiltInProcedure("procedure_catalog");
+        catalogManager.registerCatalog("p1", procedureCatalog);
+        catalogManager.setCurrentCatalog("p1");
+    }
+
+    @Test
+    void testCallStatement() {
+        // test call the procedure which accepts primitive types as arguments
+        String sql = "call `system`.primitive_arg(1, 2)";
+        verifyCallOperation(
+                sql,
+                "CALL PROCEDURE:"
+                        + " (procedureIdentifier: 
[`p1`.`system`.`primitive_arg`],"
+                        + " inputTypes: [INT NOT NULL, BIGINT NOT NULL], 
outputTypes: [INT NOT NULL], arguments: [1, 2])");
+
+        // test call the procedure which has different type mapping for single 
method
+        // call with int
+        sql = "call `system`.different_type_mapping(1)";
+        verifyCallOperation(
+                sql,
+                "CALL PROCEDURE:"
+                        + " (procedureIdentifier: 
[`p1`.`system`.`different_type_mapping`],"
+                        + " inputTypes: [INT], outputTypes: [INT], arguments: 
[1])");
+        // call with bigint
+        sql = "call `system`.different_type_mapping(cast(1 as bigint))";
+        verifyCallOperation(
+                sql,
+                "CALL PROCEDURE:"
+                        + " (procedureIdentifier: 
[`p1`.`system`.`different_type_mapping`],"
+                        + " inputTypes: [BIGINT], outputTypes: [BIGINT], 
arguments: [1])");
+
+        // test call the procedure which has var arguments
+        sql = "call `system`.var_arg(1)";
+        verifyCallOperation(
+                sql,
+                "CALL PROCEDURE:"
+                        + " (procedureIdentifier: [`p1`.`system`.`var_arg`],"
+                        + " inputTypes: [INT NOT NULL], outputTypes: [STRING], 
arguments: [1])");
+        sql = "call `system`.var_arg(1, 2)";
+        verifyCallOperation(
+                sql,
+                "CALL PROCEDURE:"
+                        + " (procedureIdentifier: [`p1`.`system`.`var_arg`],"
+                        + " inputTypes: [INT NOT NULL, INT NOT NULL], 
outputTypes: [STRING], arguments: [1, 2])");
+        sql = "call `system`.var_arg(1, 2, 1 + 2)";
+        verifyCallOperation(
+                sql,
+                "CALL PROCEDURE:"
+                        + " (procedureIdentifier: [`p1`.`system`.`var_arg`],"
+                        + " inputTypes: [INT NOT NULL, INT NOT NULL, INT NOT 
NULL], outputTypes: [STRING], arguments: [1, 2, 3])");
+
+        // test call the procedure with row as result and decimal as argument 
as well as
+        // explict/implicit cast
+        sql = "call `system`.row_result(cast(1.2 as decimal))";
+        verifyCallOperation(
+                sql,
+                "CALL PROCEDURE:"
+                        + " (procedureIdentifier: 
[`p1`.`system`.`row_result`],"
+                        + " inputTypes: [DECIMAL(10, 2)], outputTypes: 
[ROW<`i` INT>], arguments: [1.20])");
+        sql = "call `system`.row_result(1.2)";
+        verifyCallOperation(
+                sql,
+                "CALL PROCEDURE:"
+                        + " (procedureIdentifier: 
[`p1`.`system`.`row_result`],"
+                        + " inputTypes: [DECIMAL(10, 2)], outputTypes: 
[ROW<`i` INT>], arguments: [1.20])");
+
+        // test call the procedure with pojo as result
+        sql = "call p1.`system`.pojo_result('name', 1)";
+        verifyCallOperation(
+                sql,
+                "CALL PROCEDURE:"
+                        + " (procedureIdentifier: 
[`p1`.`system`.`pojo_result`],"
+                        + " inputTypes: [STRING, BIGINT NOT NULL],"
+                        + " outputTypes: 
[*org.apache.flink.table.planner.operations.SqlNodeToCallOperationTest$MyPojo<`name`
 STRING, `id` BIGINT NOT NULL>*],"
+                        + " arguments: [name, 1])");
+
+        // test call the procedure with timestamp as arguments
+        sql =
+                "call p1.`system`.timestamp_arg(timestamp '2023-04-22 
00:00:00.300', "
+                        + "timestamp '2023-04-22 00:00:00.300' +  INTERVAL '1' 
day ) ";
+        verifyCallOperation(
+                sql,
+                String.format(
+                        "CALL PROCEDURE:"
+                                + " (procedureIdentifier: 
[`p1`.`system`.`timestamp_arg`],"
+                                + " inputTypes: [TIMESTAMP(3), TIMESTAMP(3)], 
outputTypes: [TIMESTAMP(3)],"
+                                + " arguments: [%s, %s])",
+                        LocalDateTime.parse("2023-04-22T00:00:00.300"),
+                        LocalDateTime.parse("2023-04-23T00:00:00.300")));
+        // should throw exception when the signature doesn't match
+        assertThatThrownBy(() -> parse("call `system`.primitive_arg(1)"))
+                .hasMessageContaining(
+                        "No match found for function signature 
primitive_arg(<NUMERIC>)");
+
+        // should throw exception when the expression argument can't be reduced
+        // to literal
+        assertThatThrownBy(() -> parse("call `system`.row_result(cast((1.2 + 
2.4) as decimal))"))
+                .hasMessageContaining(
+                        "The argument at position 0 CAST(CAST(1.2 + 2.4 AS 
DECIMAL) AS DECIMAL(10, 2)) for calling procedure can't be converted to 
literal.");
+    }
+
+    private void verifyCallOperation(String sql, String expectSummary) {
+        Operation operation = parse(sql);
+        
assertThat(operation).isInstanceOf(PlannerCallProcedureOperation.class);
+        assertThat(parse(sql).asSummaryString()).isEqualTo(expectSummary);
+    }
+
+    /** A catalog with some built-in procedures for testing purpose. */
+    private static class CatalogWithBuiltInProcedure extends 
GenericInMemoryCatalog {
+
+        private static final Map<ObjectPath, Procedure> PROCEDURE_MAP = new 
HashMap<>();
+
+        static {
+            PROCEDURE_MAP.put(
+                    ObjectPath.fromString("system.primitive_arg"), new 
ProcedureWithPrimitiveArg());
+            PROCEDURE_MAP.put(
+                    ObjectPath.fromString("system.different_type_mapping"),
+                    new DifferentTypeMappingProcedure());
+            PROCEDURE_MAP.put(ObjectPath.fromString("system.var_arg"), new 
VarArgProcedure());
+            PROCEDURE_MAP.put(ObjectPath.fromString("system.row_result"), new 
RowResultProcedure());
+            PROCEDURE_MAP.put(
+                    ObjectPath.fromString("system.pojo_result"), new 
PojoResultProcedure());
+            PROCEDURE_MAP.put(
+                    ObjectPath.fromString("system.timestamp_arg"), new 
TimeStampArgProcedure());
+        }
+
+        public CatalogWithBuiltInProcedure(String name) {
+            super(name);
+        }
+
+        @Override
+        public Procedure getProcedure(ObjectPath procedurePath)
+                throws ProcedureNotExistException, CatalogException {
+            if (PROCEDURE_MAP.containsKey(procedurePath)) {
+                return PROCEDURE_MAP.get(procedurePath);
+            } else {
+                throw new ProcedureNotExistException(getName(), procedurePath);
+            }
+        }
+    }
+
+    private static class ProcedureWithPrimitiveArg implements Procedure {
+        public int[] call(ProcedureContext context, int arg1, long arg2) {
+            return null;
+        }
+    }
+
+    @ProcedureHint(input = @DataTypeHint("INT"), output = @DataTypeHint("INT"))
+    @ProcedureHint(input = @DataTypeHint("BIGINT"), output = 
@DataTypeHint("BIGINT"))
+    private static class DifferentTypeMappingProcedure implements Procedure {
+        public Number[] call(ProcedureContext procedureContext, Number n) {
+            return null;
+        }
+    }
+
+    private static class VarArgProcedure implements Procedure {
+        public String[] call(ProcedureContext procedureContext, int i, int... 
more) {
+            return null;
+        }
+    }
+
+    private static class RowResultProcedure implements Procedure {
+        public @DataTypeHint("ROW<i INT>") Row[] call(
+                ProcedureContext procedureContext,
+                @DataTypeHint("DECIMAL(10, 2)") BigDecimal decimal) {
+            return null;
+        }
+    }
+
+    private static class PojoResultProcedure implements Procedure {
+        public MyPojo[] call(ProcedureContext procedureContext, String name, 
long id) {
+            return new MyPojo[0];
+        }
+    }
+
+    private static class TimeStampArgProcedure implements Procedure {
+        public @DataTypeHint("TIMESTAMP(3)") LocalDateTime[] call(
+                ProcedureContext procedureContext,
+                @DataTypeHint("TIMESTAMP(3)") LocalDateTime localDateTime,
+                @DataTypeHint("TIMESTAMP(3)") TimestampData timestampData) {
+            return null;
+        }
+    }
+
+    /** A simple pojo class for testing purpose. */
+    public static class MyPojo {
+        private final String name;
+        private final long id;
+
+        public MyPojo(String name, long id) {
+            this.name = name;
+            this.id = id;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public long getId() {
+            return id;
+        }
+    }
+}

Reply via email to