This is an automated email from the ASF dual-hosted git repository. yuxia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7e2bc6e8d21411d5aa13200791f26edc77cda4ba Author: luoyuxia <[email protected]> AuthorDate: Fri Jul 14 11:25:53 2023 +0800 [FLINK-32354][table] Supports executing call procedure statement This closes #22994 --- .../src/test/resources/sql/call_procedure.q | 69 +++++ .../SqlGatewayStreamExecutionEnvironment.java} | 23 +- .../service/operation/OperationExecutor.java | 20 +- .../table/procedure/DefaultProcedureContext.java} | 19 +- .../table/operations/CallProcedureOperation.java | 2 +- .../operations/PlannerCallProcedureOperation.java | 312 +++++++++++++++++++++ .../factories/TestProcedureCatalogFactory.java | 217 ++++++++++++++ .../runtime/stream/sql/ProcedureITCase.java | 105 ++++--- .../org.apache.flink.table.factories.Factory | 1 + 9 files changed, 720 insertions(+), 48 deletions(-) diff --git a/flink-table/flink-sql-client/src/test/resources/sql/call_procedure.q b/flink-table/flink-sql-client/src/test/resources/sql/call_procedure.q new file mode 100644 index 00000000000..9e0d53eb476 --- /dev/null +++ b/flink-table/flink-sql-client/src/test/resources/sql/call_procedure.q @@ -0,0 +1,69 @@ +# call-procedure.q - CALL +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to you under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +create catalog test_procedure_catalog with ('type'='test_procedure_catalog'); +[INFO] Execute statement succeed. +!info + +call `test_procedure_catalog`.`system`.generate_n(2); ++--------+ +| result | ++--------+ +| 0 | +| 1 | ++--------+ +2 rows in set +!ok + +call `test_procedure_catalog`.`system`.generate_n(1); ++--------+ +| result | ++--------+ +| 0 | ++--------+ +1 row in set +!ok + +# switch current catalog to test_procedure_catalog +use catalog test_procedure_catalog; +[INFO] Execute statement succeed. +!info + +# create a database `system` to avoid DatabaseNotExistException in the following `show procedure` statement +create database `system`; +[INFO] Execute statement succeed. +!info + +show procedures in `system` ilike 'gEnerate%'; ++----------------+ +| procedure name | ++----------------+ +| generate_n | +| generate_user | ++----------------+ +2 rows in set +!ok + +# test call procedure with pojo as return type +call `system`.generate_user('yuxia', 18); ++-------+-----+ +| name | age | ++-------+-----+ +| yuxia | 18 | ++-------+-----+ +1 row in set +!ok diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CallProcedureOperation.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/environment/SqlGatewayStreamExecutionEnvironment.java similarity index 51% copy from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CallProcedureOperation.java copy to flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/environment/SqlGatewayStreamExecutionEnvironment.java index b432bf73021..15b1d015916 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CallProcedureOperation.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/environment/SqlGatewayStreamExecutionEnvironment.java @@ -16,10 +16,23 @@ * limitations under the License. 
*/ -package org.apache.flink.table.operations; +package org.apache.flink.table.gateway.environment; -import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory; -/** A {@link Operation} that describes the call procedure statement. */ -@Internal -public interface CallProcedureOperation extends Operation {} +/** + * The SqlGatewayStreamExecutionEnvironment is a {@link StreamExecutionEnvironment} that runs the + * program with SQL gateway. + */ +public class SqlGatewayStreamExecutionEnvironment extends StreamExecutionEnvironment { + public static void setAsContext(ClassLoader classLoader) { + final StreamExecutionEnvironmentFactory factory = + conf -> new StreamExecutionEnvironment(conf, classLoader); + initializeContextEnvironment(factory); + } + + public static void unsetAsContext() { + resetContextEnvironment(); + } +} diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java index 1be89881aec..2285460a4d2 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java @@ -62,11 +62,13 @@ import org.apache.flink.table.functions.FunctionIdentifier; import org.apache.flink.table.gateway.api.operation.OperationHandle; import org.apache.flink.table.gateway.api.results.FunctionInfo; import org.apache.flink.table.gateway.api.results.TableInfo; +import org.apache.flink.table.gateway.environment.SqlGatewayStreamExecutionEnvironment; import org.apache.flink.table.gateway.service.context.SessionContext; import 
org.apache.flink.table.gateway.service.result.ResultFetcher; import org.apache.flink.table.gateway.service.utils.SqlExecutionException; import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.operations.BeginStatementSetOperation; +import org.apache.flink.table.operations.CallProcedureOperation; import org.apache.flink.table.operations.CompileAndExecutePlanOperation; import org.apache.flink.table.operations.DeleteFromFilterOperation; import org.apache.flink.table.operations.EndStatementSetOperation; @@ -192,9 +194,21 @@ public class OperationExecutor { + "multiple 'INSERT INTO' statements wrapped in a 'STATEMENT SET' block."); } Operation op = parsedOperations.get(0); - return sessionContext.isStatementSetState() - ? executeOperationInStatementSetState(tableEnv, handle, op) - : executeOperation(tableEnv, handle, op); + if (op instanceof CallProcedureOperation) { + // if the operation is CallProcedureOperation, we need to set the stream environment + // context to it since the procedure will use the stream environment + try { + SqlGatewayStreamExecutionEnvironment.setAsContext( + sessionContext.getUserClassloader()); + return executeOperation(tableEnv, handle, op); + } finally { + SqlGatewayStreamExecutionEnvironment.unsetAsContext(); + } + } else { + return sessionContext.isStatementSetState() + ? 
executeOperationInStatementSetState(tableEnv, handle, op) + : executeOperation(tableEnv, handle, op); + } } public String getCurrentCatalog() { diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CallProcedureOperation.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/procedure/DefaultProcedureContext.java similarity index 59% copy from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CallProcedureOperation.java copy to flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/procedure/DefaultProcedureContext.java index b432bf73021..02df78733ef 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CallProcedureOperation.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/procedure/DefaultProcedureContext.java @@ -16,10 +16,23 @@ * limitations under the License. */ -package org.apache.flink.table.operations; +package org.apache.flink.table.procedure; import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -/** A {@link Operation} that describes the call procedure statement. */ +/** The default implementation for {@link ProcedureContext}. 
*/ @Internal -public interface CallProcedureOperation extends Operation {} +public class DefaultProcedureContext implements ProcedureContext { + + private final StreamExecutionEnvironment executionEnvironment; + + public DefaultProcedureContext(StreamExecutionEnvironment executionEnvironment) { + this.executionEnvironment = executionEnvironment; + } + + @Override + public StreamExecutionEnvironment getExecutionEnvironment() { + return executionEnvironment; + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CallProcedureOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CallProcedureOperation.java index b432bf73021..fae7047f41a 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CallProcedureOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CallProcedureOperation.java @@ -22,4 +22,4 @@ import org.apache.flink.annotation.Internal; /** A {@link Operation} that describes the call procedure statement. 
*/ @Internal -public interface CallProcedureOperation extends Operation {} +public interface CallProcedureOperation extends ExecutableOperation {} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java index 4a315e8ccad..134d2fd2737 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java @@ -18,20 +18,62 @@ package org.apache.flink.table.planner.operations; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.ResultKind; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.api.internal.ResultProvider; +import org.apache.flink.table.api.internal.TableResultImpl; +import org.apache.flink.table.api.internal.TableResultInternal; import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.conversion.DataStructureConverter; +import org.apache.flink.table.data.conversion.DataStructureConverters; import org.apache.flink.table.operations.CallProcedureOperation; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.OperationUtils; +import 
org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl; +import org.apache.flink.table.procedure.DefaultProcedureContext; +import org.apache.flink.table.procedure.ProcedureContext; import org.apache.flink.table.procedures.Procedure; +import org.apache.flink.table.procedures.ProcedureDefinition; import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.extraction.ExtractionUtils; +import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; +import org.apache.flink.table.types.utils.DataTypeUtils; +import org.apache.flink.table.utils.print.RowDataToStringConverter; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Array; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.time.ZoneId; +import java.util.Arrays; import java.util.Collections; +import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.table.types.extraction.ExtractionUtils.isAssignable; /** Wrapper for valid call procedure operation generated by Planner. 
*/ public class PlannerCallProcedureOperation implements CallProcedureOperation { + private static final Logger LOGGER = + LoggerFactory.getLogger(PlannerCallProcedureOperation.class); + private final ObjectIdentifier procedureIdentifier; private final Procedure procedure; @@ -54,6 +96,171 @@ public class PlannerCallProcedureOperation implements CallProcedureOperation { this.outputType = outputType; } + @Override + public TableResultInternal execute(Context ctx) { + TableConfig tableConfig = ctx.getTableConfig(); + ClassLoader userClassLoader = ctx.getResourceManager().getUserClassLoader(); + + // get the class for the args + Class<?>[] argumentClz = new Class[1 + inputTypes.length]; + argumentClz[0] = ProcedureContext.class; + for (int i = 0; i < inputTypes.length; i++) { + argumentClz[i + 1] = inputTypes[i].getConversionClass(); + } + + // get the value for the args + Object[] argumentVal = getConvertedArgumentValues(tableConfig, userClassLoader); + + // call the procedure, get result + Object procedureResult = callProcedure(procedure, argumentClz, argumentVal); + + return procedureResultToTableResult(procedureResult, tableConfig, userClassLoader); + } + + private Object[] getConvertedArgumentValues( + TableConfig tableConfig, ClassLoader userClassLoader) { + // should be [ProcedureContext, arg1, arg2, ..] + Object[] argumentVal = new Object[1 + internalInputArguments.length]; + StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(tableConfig.getConfiguration()); + argumentVal[0] = new DefaultProcedureContext(env); + for (int i = 0; i < internalInputArguments.length; i++) { + argumentVal[i + 1] = + toExternal(internalInputArguments[i], inputTypes[i], userClassLoader); + } + return argumentVal; + } + + /** Convert the value with internal representation to the value with external representation. 
*/ + private Object toExternal(Object internalValue, DataType inputType, ClassLoader classLoader) { + if (!(DataTypeUtils.isInternal(inputType))) { + // if the expected input type of the procedure is not internal type, + // which means the converted Flink internal value doesn't + // match the expected input type, then we need to convert the Flink + // internal value to external value + DataStructureConverter<Object, Object> converter = + DataStructureConverters.getConverter(inputType); + converter.open(classLoader); + return converter.toExternal(internalValue); + } else { + return internalValue; + } + } + + /** + * Collects the methods named {@link ProcedureDefinition#PROCEDURE_CALL} on the procedure class, + * keeps those that are invokable with {@code inputClz} and return an array whose component type + * passes the {@code isAssignable} check against the output conversion class, and invokes the + * first remaining candidate with {@code inputArgs}. + * + * @throws ValidationException if no candidate method is left after filtering + */ + private Object callProcedure(Procedure procedure, Class<?>[] inputClz, Object[] inputArgs) { + String callMethodName = ProcedureDefinition.PROCEDURE_CALL; + // get the possible methods to invoke + final List<Method> methods = + ExtractionUtils.collectMethods(procedure.getClass(), callMethodName); + List<Method> callMethods = + methods.stream() + .filter( + method -> + ExtractionUtils.isInvokable(method, inputClz) + && method.getReturnType().isArray() + && isAssignable( + outputType.getConversionClass(), + method.getReturnType().getComponentType(), + true)) + .collect(Collectors.toList()); + if (callMethods.isEmpty()) { + throw new ValidationException( + String.format( + "Could not find an implementation method '%s' in class '%s' for procedure '%s' that " + + "matches the following signature:\n%s", + callMethodName, + procedure.getClass().getName(), + procedureIdentifier, + ExtractionUtils.createMethodSignatureString( + callMethodName, inputClz, outputType.getConversionClass()))); + } + if (callMethods.size() > 1) { + // report the filtered candidates: the method invoked below is callMethods.get(0), + // which is not necessarily the first entry of the unfiltered list + LOGGER.warn( + "There are multiple methods matching the procedure calling: {}. 
" + " Only invoke the first method: {}.", + callMethods, + callMethods.get(0)); + } + return invokeCallMethod(procedure, callMethods.get(0), inputArgs); + } + + /** + * Reflectively invokes {@code calMethod} on the procedure. For a var-args method the trailing + * arguments are first packed into an array of the var-args component type. + * + * @throws TableException if access to the method is denied or the invoked method throws + */ + private Object invokeCallMethod(Procedure procedure, Method calMethod, Object[] inputArgs) { + try { + if (calMethod.isVarArgs()) { + // if the method is var args, we need to adjust the inputArgs to make + // it match the signature, the logic of which is as follows: + // assuming the method is varMethod(arg1, arg2, ...) + // the args to invoke this method is arg1, arg2, arg3, arg4 + // first, we get the index of the vararg, which is 2 in this case + // then, we compose the args after the index to an array, [arg3, arg4], + // finally, we get the right argument to call this var-args method, + // that's arg1, arg2, [arg3, arg4] + + final int paramCount = calMethod.getParameterCount(); + final int varargsIndex = paramCount - 1; + Object[] newInputArgs = new Object[paramCount]; + System.arraycopy(inputArgs, 0, newInputArgs, 0, varargsIndex); + // handle the remaining values in the input args + // get the class type for the varargs + Class<?> varargsElementType = + calMethod.getParameterTypes()[varargsIndex].getComponentType(); + int varargsLength = inputArgs.length - varargsIndex; + Object varargs = Array.newInstance(varargsElementType, varargsLength); + System.arraycopy(inputArgs, varargsIndex, varargs, 0, varargsLength); + newInputArgs[varargsIndex] = varargs; + inputArgs = newInputArgs; + } + LOGGER.info("Invoke method {} with arguments: {}.", calMethod, inputArgs); + return calMethod.invoke(procedure, inputArgs); + } catch (IllegalAccessException e) { + throw new TableException( + String.format( + "Access to the method %s was denied: %s.", + ProcedureDefinition.PROCEDURE_CALL, e.getMessage()), + e); + } catch (InvocationTargetException e) { + throw new TableException( + String.format( + "Can't involve the method %s.", ProcedureDefinition.PROCEDURE_CALL), + e); + } + } + + /** Convert the result of procedure to 
table result . */ + private TableResultInternal procedureResultToTableResult( + Object procedureResult, TableConfig tableConfig, ClassLoader userClassLoader) { + // get result converter + ZoneId zoneId = tableConfig.getLocalTimeZone(); + DataType tableResultType = outputType; + // if is not composite type, wrap it to composited type + if (!LogicalTypeChecks.isCompositeType(outputType.getLogicalType())) { + tableResultType = DataTypes.ROW(DataTypes.FIELD("result", tableResultType)); + } + + // expand the result type to schema + ResolvedSchema resultSchema = DataTypeUtils.expandCompositeTypeToSchema(tableResultType); + RowDataToStringConverter rowDataToStringConverter = + new RowDataToStringConverterImpl( + tableResultType, + zoneId, + userClassLoader, + tableConfig + .get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR) + .isEnabled()); + // create DataStructure converters + DataStructureConverter<Object, Object> converter = + DataStructureConverters.getConverter(outputType); + converter.open(userClassLoader); + + return TableResultImpl.builder() + .resultProvider( + new CallProcedureResultProvider( + converter, rowDataToStringConverter, procedureResult)) + .schema(resultSchema) + .resultKind(ResultKind.SUCCESS_WITH_CONTENT) + .build(); + } + @Override public String asSummaryString() { Map<String, Object> params = new LinkedHashMap<>(); @@ -64,4 +271,109 @@ public class PlannerCallProcedureOperation implements CallProcedureOperation { return OperationUtils.formatWithChildren( "CALL PROCEDURE", params, Collections.emptyList(), Operation::asSummaryString); } + + /** A result provider for the result of calling procedure. 
*/ + static final class CallProcedureResultProvider implements ResultProvider { + + private final DataStructureConverter<Object, Object> converter; + private final RowDataToStringConverter toStringConverter; + private final Object[] result; + + public CallProcedureResultProvider( + DataStructureConverter<Object, Object> converter, + RowDataToStringConverter toStringConverter, + Object result) { + this.converter = converter; + this.toStringConverter = toStringConverter; + this.result = toResultArray(result); + } + + @Override + public ResultProvider setJobClient(JobClient jobClient) { + return this; + } + + @Override + public CloseableIterator<RowData> toInternalIterator() { + Iterator<Object> objectIterator = Arrays.stream(result).iterator(); + + return new CloseableIterator<RowData>() { + @Override + public boolean hasNext() { + return objectIterator.hasNext(); + } + + @Override + public RowData next() { + Object element = converter.toInternalOrNull(objectIterator.next()); + if (!(element instanceof RowData)) { + return GenericRowData.of(element); + } + return (RowData) element; + } + + @Override + public void close() {} + }; + } + + @Override + public CloseableIterator<Row> toExternalIterator() { + Iterator<Object> objectIterator = Arrays.stream(result).iterator(); + + return new CloseableIterator<Row>() { + @Override + public boolean hasNext() { + return objectIterator.hasNext(); + } + + @Override + public Row next() { + Object element = objectIterator.next(); + if (!(element instanceof Row)) { + return Row.of(element); + } + return (Row) element; + } + + @Override + public void close() {} + }; + } + + @Override + public RowDataToStringConverter getRowDataStringConverter() { + return toStringConverter; + } + + @Override + public boolean isFirstRowReady() { + // always return true + return true; + } + + private Object[] toResultArray(Object result) { + // the result may be primitive array, + // convert it to primitive wrapper array + if (isPrimitiveArray(result)) 
{ + return toPrimitiveWrapperArray(result); + } + return (Object[]) result; + } + + private boolean isPrimitiveArray(Object result) { + return result.getClass().isArray() + && result.getClass().getComponentType().isPrimitive(); + } + + private Object[] toPrimitiveWrapperArray(Object primitiveArray) { + int length = Array.getLength(primitiveArray); + Object[] objArray = new Object[length]; + + for (int i = 0; i < length; i++) { + objArray[i] = Array.get(primitiveArray, i); + } + return objArray; + } + } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestProcedureCatalogFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestProcedureCatalogFactory.java new file mode 100644 index 00000000000..11317d5c717 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestProcedureCatalogFactory.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.factories; + +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException; +import org.apache.flink.table.factories.CatalogFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.procedure.ProcedureContext; +import org.apache.flink.table.procedures.Procedure; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; + +import java.math.BigDecimal; +import java.time.LocalDateTime; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +/** A factory to create a catalog with some built-in procedures for testing purpose. 
*/ +public class TestProcedureCatalogFactory implements CatalogFactory { + private static final String IDENTIFIER = "test_procedure_catalog"; + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + return Collections.emptySet(); + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + return Collections.emptySet(); + } + + @Override + public Catalog createCatalog(Context context) { + final FactoryUtil.CatalogFactoryHelper helper = + FactoryUtil.createCatalogFactoryHelper(this, context); + helper.validate(); + return new CatalogWithBuiltInProcedure(context.getName()); + } + + /** A catalog with some built-in procedures for testing purpose. */ + public static class CatalogWithBuiltInProcedure extends GenericInMemoryCatalog { + + private static final Map<ObjectPath, Procedure> PROCEDURE_MAP = new HashMap<>(); + + static { + PROCEDURE_MAP.put( + ObjectPath.fromString("system.generate_n"), new GenerateSequenceProcedure()); + PROCEDURE_MAP.put(ObjectPath.fromString("system.sum_n"), new SumProcedure()); + PROCEDURE_MAP.put(ObjectPath.fromString("system.get_year"), new GetYearProcedure()); + PROCEDURE_MAP.put( + ObjectPath.fromString("system.generate_user"), new GenerateUserProcedure()); + } + + public CatalogWithBuiltInProcedure(String name) { + super(name); + } + + @Override + public List<String> listProcedures(String dbName) + throws DatabaseNotExistException, CatalogException { + if (!databaseExists(dbName)) { + throw new DatabaseNotExistException(getName(), dbName); + } + return PROCEDURE_MAP.keySet().stream() + .filter(procedurePath -> procedurePath.getDatabaseName().equals(dbName)) + .map(ObjectPath::getObjectName) + .collect(Collectors.toList()); + } + + @Override + public Procedure getProcedure(ObjectPath procedurePath) + throws ProcedureNotExistException, CatalogException { + if (PROCEDURE_MAP.containsKey(procedurePath)) { + return PROCEDURE_MAP.get(procedurePath); 
+ } else { + throw new ProcedureNotExistException(getName(), procedurePath); + } + } + } + + /** A procedure to a sequence from 0 to n for testing purpose. */ + public static class GenerateSequenceProcedure implements Procedure { + public long[] call(ProcedureContext procedureContext, int n) throws Exception { + return generate(procedureContext.getExecutionEnvironment(), n); + } + + public long[] call(ProcedureContext procedureContext, int n, String runTimeMode) + throws Exception { + StreamExecutionEnvironment env = procedureContext.getExecutionEnvironment(); + env.setRuntimeMode(RuntimeExecutionMode.valueOf(runTimeMode)); + return generate(env, n); + } + + private long[] generate(StreamExecutionEnvironment env, int n) throws Exception { + env.setParallelism(1); + long[] sequenceN = new long[n]; + int i = 0; + try (CloseableIterator<Long> result = env.fromSequence(0, n - 1).executeAndCollect()) { + while (result.hasNext()) { + sequenceN[i++] = result.next(); + } + } + return sequenceN; + } + } + + /** A procedure to sum decimal values for testing purpose. */ + public static class SumProcedure implements Procedure { + public @DataTypeHint("ROW< sum_value decimal(10, 2), count INT >") Row[] call( + ProcedureContext procedureContext, + @DataTypeHint("DECIMAL(10, 2)") BigDecimal... inputs) { + if (inputs.length == 0) { + return new Row[] {Row.of(null, 0)}; + } + int counts = inputs.length; + BigDecimal result = inputs[0]; + for (int i = 1; i < inputs.length; i++) { + result = result.add(inputs[i]); + } + return new Row[] {Row.of(result, counts)}; + } + } + + /** A procedure to get year from the passed timestamp parameter for testing purpose. */ + public static class GetYearProcedure implements Procedure { + public String[] call(ProcedureContext procedureContext, LocalDateTime... 
timestamps) {
+            String[] results = new String[timestamps.length];
+            for (int i = 0; i < results.length; i++) {
+                results[i] = String.valueOf(timestamps[i].getYear());
+            }
+            return results;
+        }
+    }
+
+    /** A procedure to generate a user according to the passed parameters for testing purpose. */
+    public static class GenerateUserProcedure implements Procedure {
+        public UserPojo[] call(ProcedureContext procedureContext, String name, Integer age) {
+            return new UserPojo[] {new UserPojo(name, age)};
+        }
+    }
+
+    /** A simple pojo class for testing purpose. */
+    public static class UserPojo {
+        private final String name;
+        private final int age;
+
+        public UserPojo(String name, int age) {
+            this.name = name;
+            this.age = age;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public int getAge() {
+            return age;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            UserPojo userPojo = (UserPojo) o;
+            return age == userPojo.age && Objects.equals(name, userPojo.name);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(name, age);
+        }
+
+        @Override
+        public String toString() {
+            return "UserPojo{" + "name='" + name + '\'' + ", age=" + age + '}';
+        }
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
index 79c33631744..efb6121065b 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
@@ -18,11 +18,15 @@
 
 package org.apache.flink.table.planner.runtime.stream.sql;
 
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.catalog.CatalogDatabaseImpl;
-import org.apache.flink.table.catalog.GenericInMemoryCatalog;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.planner.factories.TestProcedureCatalogFactory;
 import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CollectionUtil;
@@ -40,14 +44,12 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 /** IT Case for statements related to procedure. */
 public class ProcedureITCase extends StreamingTestBase {
 
-    private static final String SYSTEM_DATABASE_NAME = "system";
-
     @BeforeEach
     @Override
     public void before() throws Exception {
         super.before();
-        CatalogWithBuildInProcedure procedureCatalog =
-                new CatalogWithBuildInProcedure("procedure_catalog");
+        TestProcedureCatalogFactory.CatalogWithBuiltInProcedure procedureCatalog =
+                new TestProcedureCatalogFactory.CatalogWithBuiltInProcedure("procedure_catalog");
         procedureCatalog.createDatabase(
                 "system", new CatalogDatabaseImpl(Collections.emptyMap(), null), true);
         tEnv().registerCatalog("test_p", procedureCatalog);
@@ -83,8 +85,7 @@ public class ProcedureITCase extends StreamingTestBase {
                 CollectionUtil.iteratorToList(
                         tEnv().executeSql("show procedures in `system`").collect());
         assertThat(rows.toString())
-                .isEqualTo(
-                        "[+I[generate_n], +I[generate_user], +I[get_year], +I[miss_procedure_context], +I[sum_n]]");
+                .isEqualTo("[+I[generate_n], +I[generate_user], +I[get_year], +I[sum_n]]");
 
         // show procedure with like
         rows =
@@ -110,40 +111,72 @@ public class ProcedureITCase extends StreamingTestBase {
                 CollectionUtil.iteratorToList(
                         tEnv().executeSql("show procedures in `system` not like 'generate%'")
                                 .collect());
-        assertThat(rows.toString())
-                .isEqualTo("[+I[get_year], +I[miss_procedure_context], +I[sum_n]]");
+        assertThat(rows.toString()).isEqualTo("[+I[get_year], +I[sum_n]]");
 
         // show procedure with not ilike
         rows =
                 CollectionUtil.iteratorToList(
                         tEnv().executeSql("show procedures in `system` not ilike 'generaTe%'")
                                 .collect());
-        assertThat(rows.toString())
-                .isEqualTo("[+I[get_year], +I[miss_procedure_context], +I[sum_n]]");
+        assertThat(rows.toString()).isEqualTo("[+I[get_year], +I[sum_n]]");
+    }
+
+    @Test
+    void testCallProcedure() {
+        // test call procedure can run a flink job
+        TableResult tableResult = tEnv().executeSql("call `system`.generate_n(4)");
+        verifyTableResult(
+                tableResult,
+                Arrays.asList(Row.of(0), Row.of(1), Row.of(2), Row.of(3)),
+                ResolvedSchema.of(
+                        Column.physical(
+                                "result", DataTypes.BIGINT().notNull().bridgedTo(long.class))));
+
+        // call a procedure which will run in batch mode
+        tableResult = tEnv().executeSql("call `system`.generate_n(4, 'BATCH')");
+        verifyTableResult(
+                tableResult,
+                Arrays.asList(Row.of(0), Row.of(1), Row.of(2), Row.of(3)),
+                ResolvedSchema.of(
+                        Column.physical(
+                                "result", DataTypes.BIGINT().notNull().bridgedTo(long.class))));
+        // check the runtime mode in current env is still streaming
+        assertThat(tEnv().getConfig().get(ExecutionOptions.RUNTIME_MODE))
+                .isEqualTo(RuntimeExecutionMode.STREAMING);
+
+        // test call procedure with var-args as well as output data type hint
+        tableResult = tEnv().executeSql("call `system`.sum_n(5.5, 1.2, 3.3)");
+        verifyTableResult(
+                tableResult,
+                Collections.singletonList(Row.of("10.00", 3)),
+                ResolvedSchema.of(
+                        Column.physical("sum_value", DataTypes.DECIMAL(10, 2)),
+                        Column.physical("count", DataTypes.INT())));
+
+        // test call procedure with timestamp as input
+        tableResult =
+                tEnv().executeSql(
+                                "call `system`.get_year(timestamp '2023-04-22 00:00:00', timestamp '2024-04-22 00:00:00.300')");
+        verifyTableResult(
+                tableResult,
+                Arrays.asList(Row.of(2023), Row.of(2024)),
+                ResolvedSchema.of(Column.physical("result", DataTypes.STRING())));
+
+        // test call procedure with pojo as return type
+        tableResult = tEnv().executeSql("call `system`.generate_user('yuxia', 18)");
+        verifyTableResult(
+                tableResult,
+                Collections.singletonList(
+                        Row.of(new TestProcedureCatalogFactory.UserPojo("yuxia", 18))),
+                ResolvedSchema.of(
+                        Column.physical("name", DataTypes.STRING()),
+                        Column.physical("age", DataTypes.INT().notNull().bridgedTo(int.class))));
     }
 
-    /** A catalog with some built-in procedures for test purpose. */
-    private static class CatalogWithBuildInProcedure extends GenericInMemoryCatalog {
-        public CatalogWithBuildInProcedure(String name) {
-            super(name);
-        }
-
-        @Override
-        public List<String> listProcedures(String dbName)
-                throws DatabaseNotExistException, CatalogException {
-            if (!databaseExists(dbName)) {
-                throw new DatabaseNotExistException(getName(), dbName);
-            }
-            if (dbName.equals(SYSTEM_DATABASE_NAME)) {
-                return Arrays.asList(
-                        "generate_n",
-                        "sum_n",
-                        "get_year",
-                        "generate_user",
-                        "miss_procedure_context");
-            } else {
-                return Collections.emptyList();
-            }
-        }
+    private void verifyTableResult(
+            TableResult tableResult, List<Row> expectedResult, ResolvedSchema expectedSchema) {
+        assertThat(CollectionUtil.iteratorToList(tableResult.collect()).toString())
+                .isEqualTo(expectedResult.toString());
+        assertThat(tableResult.getResolvedSchema()).isEqualTo(expectedSchema);
     }
 }
diff --git a/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 4d77f4057ac..2444832afb0 100644
--- a/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -20,3 +20,4 @@ org.apache.flink.table.planner.factories.TableFactoryHarness$Factory
 org.apache.flink.table.planner.plan.stream.sql.TestTableFactory
 org.apache.flink.table.planner.factories.TestUpdateDeleteTableFactory
 org.apache.flink.table.planner.factories.TestSupportsStagingTableFactory
+org.apache.flink.table.planner.factories.TestProcedureCatalogFactory
