This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 24b6f7cbf94 [FLINK-34517][table]fix environment configs ignored when calling procedure operation (#24397)
24b6f7cbf94 is described below
commit 24b6f7cbf94fca154fc8680e8b3393abd68b8e77
Author: JustinLee <[email protected]>
AuthorDate: Wed Mar 6 09:44:49 2024 +0800
[FLINK-34517][table]fix environment configs ignored when calling procedure operation (#24397)
---
.../operations/PlannerCallProcedureOperation.java | 14 ++++++--
.../factories/TestProcedureCatalogFactory.java | 20 ++++++++++++
.../runtime/stream/sql/ProcedureITCase.java | 38 ++++++++++++++++++++--
3 files changed, 66 insertions(+), 6 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
index 8638efae8b0..66d6a6074e3 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.operations;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
@@ -126,9 +127,7 @@ public class PlannerCallProcedureOperation implements
CallProcedureOperation {
TableConfig tableConfig, ClassLoader userClassLoader) {
// should be [ProcedureContext, arg1, arg2, ..]
Object[] argumentVal = new Object[1 + internalInputArguments.length];
- StreamExecutionEnvironment env =
-
StreamExecutionEnvironment.getExecutionEnvironment(tableConfig.getConfiguration());
- argumentVal[0] = new DefaultProcedureContext(env);
+ argumentVal[0] = getProcedureContext(tableConfig);
for (int i = 0; i < internalInputArguments.length; i++) {
argumentVal[i + 1] =
(internalInputArguments[i] != null)
@@ -138,6 +137,15 @@ public class PlannerCallProcedureOperation implements
CallProcedureOperation {
return argumentVal;
}
+ private ProcedureContext getProcedureContext(TableConfig tableConfig) {
+ Configuration configuration =
+ new Configuration((Configuration)
tableConfig.getRootConfiguration());
+ configuration.addAll(tableConfig.getConfiguration());
+ StreamExecutionEnvironment env =
+
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+ return new DefaultProcedureContext(env);
+ }
+
/** Convert the value with internal representation to the value with
external representation. */
private Object toExternal(Object internalValue, DataType inputType,
ClassLoader classLoader) {
if (!(DataTypeUtils.isInternal(inputType))) {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestProcedureCatalogFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestProcedureCatalogFactory.java
index 126c9643ea5..cf8e4f46b0d 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestProcedureCatalogFactory.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestProcedureCatalogFactory.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.factories;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
@@ -39,6 +40,7 @@ import org.apache.flink.util.CloseableIterator;
import java.math.BigDecimal;
import java.time.LocalDateTime;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -94,6 +96,8 @@ public class TestProcedureCatalogFactory implements
CatalogFactory {
PROCEDURE_MAP.put(
ObjectPath.fromString("system.named_args_optional"),
new NamedArgumentsProcedureWithOptionalArguments());
+ PROCEDURE_MAP.put(
+ ObjectPath.fromString("system.get_env_conf"), new
EnvironmentConfProcedure());
}
public CatalogWithBuiltInProcedure(String name) {
@@ -230,6 +234,22 @@ public class TestProcedureCatalogFactory implements
CatalogFactory {
}
}
+ /** A procedure to get environment configs for testing purpose. */
+ @ProcedureHint(output = @DataTypeHint("ROW<k STRING, v STRING>"))
+ public static class EnvironmentConfProcedure implements Procedure {
+ public Row[] call(ProcedureContext procedureContext) throws Exception {
+ StreamExecutionEnvironment env =
procedureContext.getExecutionEnvironment();
+ Configuration config = (Configuration) env.getConfiguration();
+ List<Row> rows = new ArrayList<>();
+ config.toMap()
+ .forEach(
+ (k, v) -> {
+ rows.add(Row.of(k, v));
+ });
+ return rows.toArray(new Row[0]);
+ }
+ }
+
/** A simple pojo class for testing purpose. */
public static class UserPojo {
private final String name;
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
index dc5e2319b4d..c1a54158721 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
@@ -19,14 +19,18 @@
package org.apache.flink.table.planner.runtime.stream.sql;
import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.planner.factories.TestProcedureCatalogFactory;
import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
import org.apache.flink.types.Row;
@@ -88,7 +92,7 @@ class ProcedureITCase extends StreamingTestBase {
tEnv().executeSql("show procedures in
`system`").collect());
assertThat(rows.toString())
.isEqualTo(
- "[+I[generate_n], +I[generate_user], +I[get_year],
+I[named_args], +I[named_args_optional], +I[named_args_overload], +I[sum_n]]");
+ "[+I[generate_n], +I[generate_user], +I[get_env_conf],
+I[get_year], +I[named_args], +I[named_args_optional], +I[named_args_overload],
+I[sum_n]]");
// show procedure with like
rows =
@@ -116,7 +120,7 @@ class ProcedureITCase extends StreamingTestBase {
.collect());
assertThat(rows.toString())
.isEqualTo(
- "[+I[get_year], +I[named_args],
+I[named_args_optional], +I[named_args_overload], +I[sum_n]]");
+ "[+I[get_env_conf], +I[get_year], +I[named_args],
+I[named_args_optional], +I[named_args_overload], +I[sum_n]]");
// show procedure with not ilike
rows =
@@ -125,7 +129,7 @@ class ProcedureITCase extends StreamingTestBase {
.collect());
assertThat(rows.toString())
.isEqualTo(
- "[+I[get_year], +I[named_args],
+I[named_args_optional], +I[named_args_overload], +I[sum_n]]");
+ "[+I[get_env_conf], +I[get_year], +I[named_args],
+I[named_args_optional], +I[named_args_overload], +I[sum_n]]");
}
@Test
@@ -210,6 +214,34 @@ class ProcedureITCase extends StreamingTestBase {
ResolvedSchema.of(Column.physical("result",
DataTypes.STRING())));
}
+ @Test
+ void testEnvironmentConf() throws DatabaseAlreadyExistException {
+ // root conf should work
+ Configuration configuration = new Configuration();
+ configuration.setString("key1", "value1");
+ StreamExecutionEnvironment env =
+
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+ tableEnv.getConfig().set("key2", "value2");
+
+ TestProcedureCatalogFactory.CatalogWithBuiltInProcedure
procedureCatalog =
+ new
TestProcedureCatalogFactory.CatalogWithBuiltInProcedure("procedure_catalog");
+ procedureCatalog.createDatabase(
+ "system", new CatalogDatabaseImpl(Collections.emptyMap(),
null), true);
+ tableEnv.registerCatalog("test_p", procedureCatalog);
+ tableEnv.useCatalog("test_p");
+ TableResult tableResult = tableEnv.executeSql("call
`system`.get_env_conf()");
+ List<Row> environmentConf =
CollectionUtil.iteratorToList(tableResult.collect());
+ assertThat(environmentConf.contains(Row.of("key1",
"value1"))).isTrue();
+ assertThat(environmentConf.contains(Row.of("key2",
"value2"))).isTrue();
+
+ // table conf should overwrite root conf
+ tableEnv.getConfig().set("key1", "value11");
+ tableResult = tableEnv.executeSql("call `system`.get_env_conf()");
+ environmentConf = CollectionUtil.iteratorToList(tableResult.collect());
+ assertThat(environmentConf.contains(Row.of("key1",
"value11"))).isTrue();
+ }
+
private void verifyTableResult(
TableResult tableResult, List<Row> expectedResult, ResolvedSchema
expectedSchema) {
assertThat(CollectionUtil.iteratorToList(tableResult.collect()).toString())