This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7e2bc6e8d21411d5aa13200791f26edc77cda4ba
Author: luoyuxia <[email protected]>
AuthorDate: Fri Jul 14 11:25:53 2023 +0800

    [FLINK-32354][table] Supports executing call procedure statement
    This closes #22994
---
 .../src/test/resources/sql/call_procedure.q        |  69 +++++
 .../SqlGatewayStreamExecutionEnvironment.java}     |  23 +-
 .../service/operation/OperationExecutor.java       |  20 +-
 .../table/procedure/DefaultProcedureContext.java}  |  19 +-
 .../table/operations/CallProcedureOperation.java   |   2 +-
 .../operations/PlannerCallProcedureOperation.java  | 312 +++++++++++++++++++++
 .../factories/TestProcedureCatalogFactory.java     | 217 ++++++++++++++
 .../runtime/stream/sql/ProcedureITCase.java        | 105 ++++---
 .../org.apache.flink.table.factories.Factory       |   1 +
 9 files changed, 720 insertions(+), 48 deletions(-)

diff --git 
a/flink-table/flink-sql-client/src/test/resources/sql/call_procedure.q 
b/flink-table/flink-sql-client/src/test/resources/sql/call_procedure.q
new file mode 100644
index 00000000000..9e0d53eb476
--- /dev/null
+++ b/flink-table/flink-sql-client/src/test/resources/sql/call_procedure.q
@@ -0,0 +1,69 @@
+# call_procedure.q - CALL
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to you under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+create catalog test_procedure_catalog with ('type'='test_procedure_catalog');
+[INFO] Execute statement succeed.
+!info
+
+call `test_procedure_catalog`.`system`.generate_n(2);
++--------+
+| result |
++--------+
+|      0 |
+|      1 |
++--------+
+2 rows in set
+!ok
+
+call `test_procedure_catalog`.`system`.generate_n(1);
++--------+
+| result |
++--------+
+|      0 |
++--------+
+1 row in set
+!ok
+
+# switch current catalog to test_procedure_catalog
+use catalog test_procedure_catalog;
+[INFO] Execute statement succeed.
+!info
+
+# create a database `system` to avoid DatabaseNotExistException in the 
following `show procedure` statement
+create database `system`;
+[INFO] Execute statement succeed.
+!info
+
+show procedures in `system` ilike 'gEnerate%';
++----------------+
+| procedure name |
++----------------+
+|     generate_n |
+|  generate_user |
++----------------+
+2 rows in set
+!ok
+
+# test call procedure with pojo as return type
+call `system`.generate_user('yuxia', 18);
++-------+-----+
+|  name | age |
++-------+-----+
+| yuxia |  18 |
++-------+-----+
+1 row in set
+!ok
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CallProcedureOperation.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/environment/SqlGatewayStreamExecutionEnvironment.java
similarity index 51%
copy from 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CallProcedureOperation.java
copy to 
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/environment/SqlGatewayStreamExecutionEnvironment.java
index b432bf73021..15b1d015916 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CallProcedureOperation.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/environment/SqlGatewayStreamExecutionEnvironment.java
@@ -16,10 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.operations;
+package org.apache.flink.table.gateway.environment;
 
-import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
 
-/** A {@link Operation} that describes the call procedure statement. */
-@Internal
-public interface CallProcedureOperation extends Operation {}
+/**
+ * The SqlGatewayStreamExecutionEnvironment is a {@link 
StreamExecutionEnvironment} that runs the
+ * program with SQL gateway.
+ */
+public class SqlGatewayStreamExecutionEnvironment extends 
StreamExecutionEnvironment {
+    public static void setAsContext(ClassLoader classLoader) {
+        final StreamExecutionEnvironmentFactory factory =
+                conf -> new StreamExecutionEnvironment(conf, classLoader);
+        initializeContextEnvironment(factory);
+    }
+
+    public static void unsetAsContext() {
+        resetContextEnvironment();
+    }
+}
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
index 1be89881aec..2285460a4d2 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -62,11 +62,13 @@ import org.apache.flink.table.functions.FunctionIdentifier;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.results.FunctionInfo;
 import org.apache.flink.table.gateway.api.results.TableInfo;
+import 
org.apache.flink.table.gateway.environment.SqlGatewayStreamExecutionEnvironment;
 import org.apache.flink.table.gateway.service.context.SessionContext;
 import org.apache.flink.table.gateway.service.result.ResultFetcher;
 import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.operations.BeginStatementSetOperation;
+import org.apache.flink.table.operations.CallProcedureOperation;
 import org.apache.flink.table.operations.CompileAndExecutePlanOperation;
 import org.apache.flink.table.operations.DeleteFromFilterOperation;
 import org.apache.flink.table.operations.EndStatementSetOperation;
@@ -192,9 +194,21 @@ public class OperationExecutor {
                             + "multiple 'INSERT INTO' statements wrapped in a 
'STATEMENT SET' block.");
         }
         Operation op = parsedOperations.get(0);
-        return sessionContext.isStatementSetState()
-                ? executeOperationInStatementSetState(tableEnv, handle, op)
-                : executeOperation(tableEnv, handle, op);
+        if (op instanceof CallProcedureOperation) {
+            // if the operation is CallProcedureOperation, we need to set the 
stream environment
+            // context to it since the procedure will use the stream 
environment
+            try {
+                SqlGatewayStreamExecutionEnvironment.setAsContext(
+                        sessionContext.getUserClassloader());
+                return executeOperation(tableEnv, handle, op);
+            } finally {
+                SqlGatewayStreamExecutionEnvironment.unsetAsContext();
+            }
+        } else {
+            return sessionContext.isStatementSetState()
+                    ? executeOperationInStatementSetState(tableEnv, handle, op)
+                    : executeOperation(tableEnv, handle, op);
+        }
     }
 
     public String getCurrentCatalog() {
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CallProcedureOperation.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/procedure/DefaultProcedureContext.java
similarity index 59%
copy from 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CallProcedureOperation.java
copy to 
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/procedure/DefaultProcedureContext.java
index b432bf73021..02df78733ef 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CallProcedureOperation.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/procedure/DefaultProcedureContext.java
@@ -16,10 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.operations;
+package org.apache.flink.table.procedure;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
-/** A {@link Operation} that describes the call procedure statement. */
+/** The default implementation for {@link ProcedureContext}. */
 @Internal
-public interface CallProcedureOperation extends Operation {}
+public class DefaultProcedureContext implements ProcedureContext {
+
+    private final StreamExecutionEnvironment executionEnvironment;
+
+    public DefaultProcedureContext(StreamExecutionEnvironment 
executionEnvironment) {
+        this.executionEnvironment = executionEnvironment;
+    }
+
+    @Override
+    public StreamExecutionEnvironment getExecutionEnvironment() {
+        return executionEnvironment;
+    }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CallProcedureOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CallProcedureOperation.java
index b432bf73021..fae7047f41a 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CallProcedureOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CallProcedureOperation.java
@@ -22,4 +22,4 @@ import org.apache.flink.annotation.Internal;
 
 /** A {@link Operation} that describes the call procedure statement. */
 @Internal
-public interface CallProcedureOperation extends Operation {}
+public interface CallProcedureOperation extends ExecutableOperation {}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
index 4a315e8ccad..134d2fd2737 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
@@ -18,20 +18,62 @@
 
 package org.apache.flink.table.planner.operations;
 
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ResultKind;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.ResultProvider;
+import org.apache.flink.table.api.internal.TableResultImpl;
+import org.apache.flink.table.api.internal.TableResultInternal;
 import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
 import org.apache.flink.table.operations.CallProcedureOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.OperationUtils;
+import 
org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl;
+import org.apache.flink.table.procedure.DefaultProcedureContext;
+import org.apache.flink.table.procedure.ProcedureContext;
 import org.apache.flink.table.procedures.Procedure;
+import org.apache.flink.table.procedures.ProcedureDefinition;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.extraction.ExtractionUtils;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.table.utils.print.RowDataToStringConverter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.time.ZoneId;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.types.extraction.ExtractionUtils.isAssignable;
 
 /** Wrapper for valid call procedure operation generated by Planner. */
 public class PlannerCallProcedureOperation implements CallProcedureOperation {
 
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(PlannerCallProcedureOperation.class);
+
     private final ObjectIdentifier procedureIdentifier;
     private final Procedure procedure;
 
@@ -54,6 +96,171 @@ public class PlannerCallProcedureOperation implements 
CallProcedureOperation {
         this.outputType = outputType;
     }
 
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        TableConfig tableConfig = ctx.getTableConfig();
+        ClassLoader userClassLoader = 
ctx.getResourceManager().getUserClassLoader();
+
+        // get the class for the args
+        Class<?>[] argumentClz = new Class[1 + inputTypes.length];
+        argumentClz[0] = ProcedureContext.class;
+        for (int i = 0; i < inputTypes.length; i++) {
+            argumentClz[i + 1] = inputTypes[i].getConversionClass();
+        }
+
+        // get the value for the args
+        Object[] argumentVal = getConvertedArgumentValues(tableConfig, 
userClassLoader);
+
+        // call the procedure, get result
+        Object procedureResult = callProcedure(procedure, argumentClz, 
argumentVal);
+
+        return procedureResultToTableResult(procedureResult, tableConfig, 
userClassLoader);
+    }
+
+    private Object[] getConvertedArgumentValues(
+            TableConfig tableConfig, ClassLoader userClassLoader) {
+        // should be [ProcedureContext, arg1, arg2, ..]
+        Object[] argumentVal = new Object[1 + internalInputArguments.length];
+        StreamExecutionEnvironment env =
+                
StreamExecutionEnvironment.getExecutionEnvironment(tableConfig.getConfiguration());
+        argumentVal[0] = new DefaultProcedureContext(env);
+        for (int i = 0; i < internalInputArguments.length; i++) {
+            argumentVal[i + 1] =
+                    toExternal(internalInputArguments[i], inputTypes[i], 
userClassLoader);
+        }
+        return argumentVal;
+    }
+
+    /** Convert the value with internal representation to the value with 
external representation. */
+    private Object toExternal(Object internalValue, DataType inputType, 
ClassLoader classLoader) {
+        if (!(DataTypeUtils.isInternal(inputType))) {
+            // if the expected input type of the procedure is not internal 
type,
+            // which means the converted Flink internal value doesn't
+            // match the expected input type, then we need to convert the Flink
+            // internal value to external value
+            DataStructureConverter<Object, Object> converter =
+                    DataStructureConverters.getConverter(inputType);
+            converter.open(classLoader);
+            return converter.toExternal(internalValue);
+        } else {
+            return internalValue;
+        }
+    }
+
+    private Object callProcedure(Procedure procedure, Class<?>[] inputClz, 
Object[] inputArgs) {
+        String callMethodName = ProcedureDefinition.PROCEDURE_CALL;
+        // get the possible methods to invoke
+        final List<Method> methods =
+                ExtractionUtils.collectMethods(procedure.getClass(), 
callMethodName);
+        List<Method> callMethods =
+                methods.stream()
+                        .filter(
+                                method ->
+                                        ExtractionUtils.isInvokable(method, 
inputClz)
+                                                && 
method.getReturnType().isArray()
+                                                && isAssignable(
+                                                        
outputType.getConversionClass(),
+                                                        
method.getReturnType().getComponentType(),
+                                                        true))
+                        .collect(Collectors.toList());
+        if (callMethods.isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "Could not find an implementation method '%s' in 
class '%s' for procedure '%s' that "
+                                    + "matches the following signature:\n%s",
+                            callMethodName,
+                            procedure.getClass().getName(),
+                            procedureIdentifier,
+                            ExtractionUtils.createMethodSignatureString(
+                                    callMethodName, inputClz, 
outputType.getConversionClass())));
+        }
+        if (callMethods.size() > 1) {
+            LOGGER.warn(
+                    "There are multiple methods matching the procedure 
calling: {}. "
+                            + " Only invoke the first method: {}.",
+                    methods,
+                    methods.get(0));
+        }
+        return invokeCallMethod(procedure, callMethods.get(0), inputArgs);
+    }
+
+    private Object invokeCallMethod(Procedure procedure, Method calMethod, 
Object[] inputArgs) {
+        try {
+            if (calMethod.isVarArgs()) {
+                // if the method is var-args, we need to adjust the inputArgs 
to make
+                // it match the signature, the logic of which is as follows:
+                // assuming the method is varMethod(arg1, arg2, ...)
+                // the args to invoke this method are arg1, arg2, arg3, arg4
+                // first, we get the index of the vararg, which is 2 in this 
case
+                // then, we compose the args after the index into an array, 
[arg3, arg4],
+                // finally, we get the right arguments to call this var-args 
method,
+                // that's arg1, arg2, [arg3, arg4]
+
+                final int paramCount = calMethod.getParameterCount();
+                final int varargsIndex = paramCount - 1;
+                Object[] newInputArgs = new Object[paramCount];
+                System.arraycopy(inputArgs, 0, newInputArgs, 0, varargsIndex);
+                // handle the remaining values in the input args
+                // get the class type for the varargs
+                Class<?> varargsElementType =
+                        
calMethod.getParameterTypes()[varargsIndex].getComponentType();
+                int varargsLength = inputArgs.length - varargsIndex;
+                Object varargs = Array.newInstance(varargsElementType, 
varargsLength);
+                System.arraycopy(inputArgs, varargsIndex, varargs, 0, 
varargsLength);
+                newInputArgs[varargsIndex] = varargs;
+                inputArgs = newInputArgs;
+            }
+            LOGGER.info("Invoke method {} with arguments: {}.", calMethod, 
inputArgs);
+            return calMethod.invoke(procedure, inputArgs);
+        } catch (IllegalAccessException e) {
+            throw new TableException(
+                    String.format(
+                            "Access to the method %s was denied: %s.",
+                            ProcedureDefinition.PROCEDURE_CALL, 
e.getMessage()),
+                    e);
+        } catch (InvocationTargetException e) {
+            throw new TableException(
+                    String.format(
+                            "Can't involve the method %s.", 
ProcedureDefinition.PROCEDURE_CALL),
+                    e);
+        }
+    }
+
+    /** Converts the result of the procedure to a table result. */
+    private TableResultInternal procedureResultToTableResult(
+            Object procedureResult, TableConfig tableConfig, ClassLoader 
userClassLoader) {
+        // get result converter
+        ZoneId zoneId = tableConfig.getLocalTimeZone();
+        DataType tableResultType = outputType;
+        // if it is not a composite type, wrap it into a composite type
+        if (!LogicalTypeChecks.isCompositeType(outputType.getLogicalType())) {
+            tableResultType = DataTypes.ROW(DataTypes.FIELD("result", 
tableResultType));
+        }
+
+        // expand the result type to schema
+        ResolvedSchema resultSchema = 
DataTypeUtils.expandCompositeTypeToSchema(tableResultType);
+        RowDataToStringConverter rowDataToStringConverter =
+                new RowDataToStringConverterImpl(
+                        tableResultType,
+                        zoneId,
+                        userClassLoader,
+                        tableConfig
+                                
.get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR)
+                                .isEnabled());
+        // create DataStructure converters
+        DataStructureConverter<Object, Object> converter =
+                DataStructureConverters.getConverter(outputType);
+        converter.open(userClassLoader);
+
+        return TableResultImpl.builder()
+                .resultProvider(
+                        new CallProcedureResultProvider(
+                                converter, rowDataToStringConverter, 
procedureResult))
+                .schema(resultSchema)
+                .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+                .build();
+    }
+
     @Override
     public String asSummaryString() {
         Map<String, Object> params = new LinkedHashMap<>();
@@ -64,4 +271,109 @@ public class PlannerCallProcedureOperation implements 
CallProcedureOperation {
         return OperationUtils.formatWithChildren(
                 "CALL PROCEDURE", params, Collections.emptyList(), 
Operation::asSummaryString);
     }
+
+    /** A result provider for the result of calling procedure. */
+    static final class CallProcedureResultProvider implements ResultProvider {
+
+        private final DataStructureConverter<Object, Object> converter;
+        private final RowDataToStringConverter toStringConverter;
+        private final Object[] result;
+
+        public CallProcedureResultProvider(
+                DataStructureConverter<Object, Object> converter,
+                RowDataToStringConverter toStringConverter,
+                Object result) {
+            this.converter = converter;
+            this.toStringConverter = toStringConverter;
+            this.result = toResultArray(result);
+        }
+
+        @Override
+        public ResultProvider setJobClient(JobClient jobClient) {
+            return this;
+        }
+
+        @Override
+        public CloseableIterator<RowData> toInternalIterator() {
+            Iterator<Object> objectIterator = Arrays.stream(result).iterator();
+
+            return new CloseableIterator<RowData>() {
+                @Override
+                public boolean hasNext() {
+                    return objectIterator.hasNext();
+                }
+
+                @Override
+                public RowData next() {
+                    Object element = 
converter.toInternalOrNull(objectIterator.next());
+                    if (!(element instanceof RowData)) {
+                        return GenericRowData.of(element);
+                    }
+                    return (RowData) element;
+                }
+
+                @Override
+                public void close() {}
+            };
+        }
+
+        @Override
+        public CloseableIterator<Row> toExternalIterator() {
+            Iterator<Object> objectIterator = Arrays.stream(result).iterator();
+
+            return new CloseableIterator<Row>() {
+                @Override
+                public boolean hasNext() {
+                    return objectIterator.hasNext();
+                }
+
+                @Override
+                public Row next() {
+                    Object element = objectIterator.next();
+                    if (!(element instanceof Row)) {
+                        return Row.of(element);
+                    }
+                    return (Row) element;
+                }
+
+                @Override
+                public void close() {}
+            };
+        }
+
+        @Override
+        public RowDataToStringConverter getRowDataStringConverter() {
+            return toStringConverter;
+        }
+
+        @Override
+        public boolean isFirstRowReady() {
+            // always return true
+            return true;
+        }
+
+        private Object[] toResultArray(Object result) {
+            // the result may be primitive array,
+            // convert it to primitive wrapper array
+            if (isPrimitiveArray(result)) {
+                return toPrimitiveWrapperArray(result);
+            }
+            return (Object[]) result;
+        }
+
+        private boolean isPrimitiveArray(Object result) {
+            return result.getClass().isArray()
+                    && result.getClass().getComponentType().isPrimitive();
+        }
+
+        private Object[] toPrimitiveWrapperArray(Object primitiveArray) {
+            int length = Array.getLength(primitiveArray);
+            Object[] objArray = new Object[length];
+
+            for (int i = 0; i < length; i++) {
+                objArray[i] = Array.get(primitiveArray, i);
+            }
+            return objArray;
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestProcedureCatalogFactory.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestProcedureCatalogFactory.java
new file mode 100644
index 00000000000..11317d5c717
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestProcedureCatalogFactory.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.table.procedures.Procedure;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** A factory to create a catalog with some built-in procedures for testing 
purpose. */
+public class TestProcedureCatalogFactory implements CatalogFactory {
+    private static final String IDENTIFIER = "test_procedure_catalog";
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public Catalog createCatalog(Context context) {
+        final FactoryUtil.CatalogFactoryHelper helper =
+                FactoryUtil.createCatalogFactoryHelper(this, context);
+        helper.validate();
+        return new CatalogWithBuiltInProcedure(context.getName());
+    }
+
+    /** A catalog with some built-in procedures for testing purpose. */
+    public static class CatalogWithBuiltInProcedure extends 
GenericInMemoryCatalog {
+
+        private static final Map<ObjectPath, Procedure> PROCEDURE_MAP = new 
HashMap<>();
+
+        static {
+            PROCEDURE_MAP.put(
+                    ObjectPath.fromString("system.generate_n"), new 
GenerateSequenceProcedure());
+            PROCEDURE_MAP.put(ObjectPath.fromString("system.sum_n"), new 
SumProcedure());
+            PROCEDURE_MAP.put(ObjectPath.fromString("system.get_year"), new 
GetYearProcedure());
+            PROCEDURE_MAP.put(
+                    ObjectPath.fromString("system.generate_user"), new 
GenerateUserProcedure());
+        }
+
+        public CatalogWithBuiltInProcedure(String name) {
+            super(name);
+        }
+
+        @Override
+        public List<String> listProcedures(String dbName)
+                throws DatabaseNotExistException, CatalogException {
+            if (!databaseExists(dbName)) {
+                throw new DatabaseNotExistException(getName(), dbName);
+            }
+            return PROCEDURE_MAP.keySet().stream()
+                    .filter(procedurePath -> 
procedurePath.getDatabaseName().equals(dbName))
+                    .map(ObjectPath::getObjectName)
+                    .collect(Collectors.toList());
+        }
+
+        @Override
+        public Procedure getProcedure(ObjectPath procedurePath)
+                throws ProcedureNotExistException, CatalogException {
+            if (PROCEDURE_MAP.containsKey(procedurePath)) {
+                return PROCEDURE_MAP.get(procedurePath);
+            } else {
+                throw new ProcedureNotExistException(getName(), procedurePath);
+            }
+        }
+    }
+
+    /** A procedure to generate a sequence from 0 to n - 1 for testing purposes. */
+    public static class GenerateSequenceProcedure implements Procedure {
+        public long[] call(ProcedureContext procedureContext, int n) throws 
Exception {
+            return generate(procedureContext.getExecutionEnvironment(), n);
+        }
+
+        public long[] call(ProcedureContext procedureContext, int n, String 
runTimeMode)
+                throws Exception {
+            StreamExecutionEnvironment env = 
procedureContext.getExecutionEnvironment();
+            env.setRuntimeMode(RuntimeExecutionMode.valueOf(runTimeMode));
+            return generate(env, n);
+        }
+
+        private long[] generate(StreamExecutionEnvironment env, int n) throws 
Exception {
+            env.setParallelism(1);
+            long[] sequenceN = new long[n];
+            int i = 0;
+            try (CloseableIterator<Long> result = env.fromSequence(0, n - 
1).executeAndCollect()) {
+                while (result.hasNext()) {
+                    sequenceN[i++] = result.next();
+                }
+            }
+            return sequenceN;
+        }
+    }
+
+    /** A procedure to sum decimal values for testing purpose. */
+    public static class SumProcedure implements Procedure {
+        public @DataTypeHint("ROW< sum_value decimal(10, 2), count INT >") 
Row[] call(
+                ProcedureContext procedureContext,
+                @DataTypeHint("DECIMAL(10, 2)") BigDecimal... inputs) {
+            if (inputs.length == 0) {
+                return new Row[] {Row.of(null, 0)};
+            }
+            int counts = inputs.length;
+            BigDecimal result = inputs[0];
+            for (int i = 1; i < inputs.length; i++) {
+                result = result.add(inputs[i]);
+            }
+            return new Row[] {Row.of(result, counts)};
+        }
+    }
+
+    /** A procedure to get year from the passed timestamp parameter for 
testing purpose. */
+    public static class GetYearProcedure implements Procedure {
+        public String[] call(ProcedureContext procedureContext, 
LocalDateTime... timestamps) {
+            String[] results = new String[timestamps.length];
+            for (int i = 0; i < results.length; i++) {
+                results[i] = String.valueOf(timestamps[i].getYear());
+            }
+            return results;
+        }
+    }
+
+    /** A procedure to generate a user according to the passed parameters for 
testing purpose. */
+    public static class GenerateUserProcedure implements Procedure {
+        public UserPojo[] call(ProcedureContext procedureContext, String name, 
Integer age) {
+            return new UserPojo[] {new UserPojo(name, age)};
+        }
+    }
+
+    /** A simple pojo class for testing purpose. */
+    public static class UserPojo {
+        private final String name;
+        private final int age;
+
+        public UserPojo(String name, int age) {
+            this.name = name;
+            this.age = age;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public int getAge() {
+            return age;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            UserPojo userPojo = (UserPojo) o;
+            return age == userPojo.age && Objects.equals(name, userPojo.name);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(name, age);
+        }
+
+        @Override
+        public String toString() {
+            return "UserPojo{" + "name='" + name + '\'' + ", age=" + age + '}';
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
index 79c33631744..efb6121065b 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
@@ -18,11 +18,15 @@
 
 package org.apache.flink.table.planner.runtime.stream.sql;
 
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.catalog.CatalogDatabaseImpl;
-import org.apache.flink.table.catalog.GenericInMemoryCatalog;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.planner.factories.TestProcedureCatalogFactory;
 import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CollectionUtil;
@@ -40,14 +44,12 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 /** IT Case for statements related to procedure. */
 public class ProcedureITCase extends StreamingTestBase {
 
-    private static final String SYSTEM_DATABASE_NAME = "system";
-
     @BeforeEach
     @Override
     public void before() throws Exception {
         super.before();
-        CatalogWithBuildInProcedure procedureCatalog =
-                new CatalogWithBuildInProcedure("procedure_catalog");
+        TestProcedureCatalogFactory.CatalogWithBuiltInProcedure 
procedureCatalog =
+                new 
TestProcedureCatalogFactory.CatalogWithBuiltInProcedure("procedure_catalog");
         procedureCatalog.createDatabase(
                 "system", new CatalogDatabaseImpl(Collections.emptyMap(), 
null), true);
         tEnv().registerCatalog("test_p", procedureCatalog);
@@ -83,8 +85,7 @@ public class ProcedureITCase extends StreamingTestBase {
                 CollectionUtil.iteratorToList(
                         tEnv().executeSql("show procedures in 
`system`").collect());
         assertThat(rows.toString())
-                .isEqualTo(
-                        "[+I[generate_n], +I[generate_user], +I[get_year], 
+I[miss_procedure_context], +I[sum_n]]");
+                .isEqualTo("[+I[generate_n], +I[generate_user], +I[get_year], 
+I[sum_n]]");
 
         // show procedure with like
         rows =
@@ -110,40 +111,72 @@ public class ProcedureITCase extends StreamingTestBase {
                 CollectionUtil.iteratorToList(
                         tEnv().executeSql("show procedures in `system` not 
like 'generate%'")
                                 .collect());
-        assertThat(rows.toString())
-                .isEqualTo("[+I[get_year], +I[miss_procedure_context], 
+I[sum_n]]");
+        assertThat(rows.toString()).isEqualTo("[+I[get_year], +I[sum_n]]");
 
         // show procedure with not ilike
         rows =
                 CollectionUtil.iteratorToList(
                         tEnv().executeSql("show procedures in `system` not 
ilike 'generaTe%'")
                                 .collect());
-        assertThat(rows.toString())
-                .isEqualTo("[+I[get_year], +I[miss_procedure_context], 
+I[sum_n]]");
+        assertThat(rows.toString()).isEqualTo("[+I[get_year], +I[sum_n]]");
+    }
+
    /**
     * End-to-end checks for CALL statements: a procedure that submits a real Flink job, runtime
     * mode isolation, var-args with output data type hints, timestamp arguments, and pojo results.
     */
    @Test
    void testCallProcedure() {
        // test call procedure can run a flink job; generate_n(4) emits 0..3
        TableResult tableResult = tEnv().executeSql("call `system`.generate_n(4)");
        verifyTableResult(
                tableResult,
                Arrays.asList(Row.of(0), Row.of(1), Row.of(2), Row.of(3)),
                ResolvedSchema.of(
                        Column.physical(
                                "result", DataTypes.BIGINT().notNull().bridgedTo(long.class))));

        // call a procedure which will run in batch mode
        tableResult = tEnv().executeSql("call `system`.generate_n(4, 'BATCH')");
        verifyTableResult(
                tableResult,
                Arrays.asList(Row.of(0), Row.of(1), Row.of(2), Row.of(3)),
                ResolvedSchema.of(
                        Column.physical(
                                "result", DataTypes.BIGINT().notNull().bridgedTo(long.class))));
        // check the runtime mode in current env is still streaming, i.e. the BATCH mode set
        // inside the procedure must not leak into the session configuration
        assertThat(tEnv().getConfig().get(ExecutionOptions.RUNTIME_MODE))
                .isEqualTo(RuntimeExecutionMode.STREAMING);

        // test call procedure with var-args as well as output data type hint;
        // expected sum is given as a string since rows are compared via toString
        tableResult = tEnv().executeSql("call `system`.sum_n(5.5, 1.2, 3.3)");
        verifyTableResult(
                tableResult,
                Collections.singletonList(Row.of("10.00", 3)),
                ResolvedSchema.of(
                        Column.physical("sum_value", DataTypes.DECIMAL(10, 2)),
                        Column.physical("count", DataTypes.INT())));

        // test call procedure with timestamp as input
        tableResult =
                tEnv().executeSql(
                                "call `system`.get_year(timestamp '2023-04-22 00:00:00', timestamp '2024-04-22 00:00:00.300')");
        verifyTableResult(
                tableResult,
                Arrays.asList(Row.of(2023), Row.of(2024)),
                ResolvedSchema.of(Column.physical("result", DataTypes.STRING())));

        // test call procedure with pojo as return type; the pojo's fields are expanded
        // into the result schema (name, age)
        tableResult = tEnv().executeSql("call `system`.generate_user('yuxia', 18)");
        verifyTableResult(
                tableResult,
                Collections.singletonList(
                        Row.of(new TestProcedureCatalogFactory.UserPojo("yuxia", 18))),
                ResolvedSchema.of(
                        Column.physical("name", DataTypes.STRING()),
                        Column.physical("age", DataTypes.INT().notNull().bridgedTo(int.class))));
    }
 
-    /** A catalog with some built-in procedures for test purpose. */
-    private static class CatalogWithBuildInProcedure extends 
GenericInMemoryCatalog {
-        public CatalogWithBuildInProcedure(String name) {
-            super(name);
-        }
-
-        @Override
-        public List<String> listProcedures(String dbName)
-                throws DatabaseNotExistException, CatalogException {
-            if (!databaseExists(dbName)) {
-                throw new DatabaseNotExistException(getName(), dbName);
-            }
-            if (dbName.equals(SYSTEM_DATABASE_NAME)) {
-                return Arrays.asList(
-                        "generate_n",
-                        "sum_n",
-                        "get_year",
-                        "generate_user",
-                        "miss_procedure_context");
-            } else {
-                return Collections.emptyList();
-            }
-        }
+    private void verifyTableResult(
+            TableResult tableResult, List<Row> expectedResult, ResolvedSchema 
expectedSchema) {
+        
assertThat(CollectionUtil.iteratorToList(tableResult.collect()).toString())
+                .isEqualTo(expectedResult.toString());
+        assertThat(tableResult.getResolvedSchema()).isEqualTo(expectedSchema);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 4d77f4057ac..2444832afb0 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ 
b/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -20,3 +20,4 @@ 
org.apache.flink.table.planner.factories.TableFactoryHarness$Factory
 org.apache.flink.table.planner.plan.stream.sql.TestTableFactory
 org.apache.flink.table.planner.factories.TestUpdateDeleteTableFactory
 org.apache.flink.table.planner.factories.TestSupportsStagingTableFactory
+org.apache.flink.table.planner.factories.TestProcedureCatalogFactory


Reply via email to