This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.18 by this push:
new 620c5a7aeba [BP-1.18][FLINK-34517][table]fix environment configs
ignored when calling procedure operation (#24419)
620c5a7aeba is described below
commit 620c5a7aeba448d247107ad44a6ba6f1e759052e
Author: JustinLee <[email protected]>
AuthorDate: Mon Mar 4 16:16:43 2024 +0800
[BP-1.18][FLINK-34517][table]fix environment configs ignored when calling
procedure operation (#24419)
---
.../operations/PlannerCallProcedureOperation.java | 14 ++++++--
.../factories/TestProcedureCatalogFactory.java | 21 ++++++++++++
.../runtime/stream/sql/ProcedureITCase.java | 39 ++++++++++++++++++++--
3 files changed, 68 insertions(+), 6 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
index a96fbf65f56..71fa78e545e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.operations;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
@@ -125,9 +126,7 @@ public class PlannerCallProcedureOperation implements
CallProcedureOperation {
TableConfig tableConfig, ClassLoader userClassLoader) {
// should be [ProcedureContext, arg1, arg2, ..]
Object[] argumentVal = new Object[1 + internalInputArguments.length];
- StreamExecutionEnvironment env =
- StreamExecutionEnvironment.getExecutionEnvironment(tableConfig.getConfiguration());
- argumentVal[0] = new DefaultProcedureContext(env);
+ argumentVal[0] = getProcedureContext(tableConfig);
for (int i = 0; i < internalInputArguments.length; i++) {
argumentVal[i + 1] =
toExternal(internalInputArguments[i], inputTypes[i],
userClassLoader);
@@ -135,6 +134,15 @@ public class PlannerCallProcedureOperation implements
CallProcedureOperation {
return argumentVal;
}
+ private ProcedureContext getProcedureContext(TableConfig tableConfig) {
+ Configuration configuration =
+ new Configuration((Configuration)
tableConfig.getRootConfiguration());
+ configuration.addAll(tableConfig.getConfiguration());
+ StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+ return new DefaultProcedureContext(env);
+ }
+
/** Convert the value with internal representation to the value with
external representation. */
private Object toExternal(Object internalValue, DataType inputType,
ClassLoader classLoader) {
if (!(DataTypeUtils.isInternal(inputType))) {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestProcedureCatalogFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestProcedureCatalogFactory.java
index 11317d5c717..528f93c7742 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestProcedureCatalogFactory.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestProcedureCatalogFactory.java
@@ -20,8 +20,10 @@ package org.apache.flink.table.planner.factories;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectPath;
@@ -37,6 +39,7 @@ import org.apache.flink.util.CloseableIterator;
import java.math.BigDecimal;
import java.time.LocalDateTime;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -84,6 +87,8 @@ public class TestProcedureCatalogFactory implements
CatalogFactory {
PROCEDURE_MAP.put(ObjectPath.fromString("system.get_year"), new
GetYearProcedure());
PROCEDURE_MAP.put(
ObjectPath.fromString("system.generate_user"), new
GenerateUserProcedure());
+ PROCEDURE_MAP.put(
+ ObjectPath.fromString("system.get_env_conf"), new
EnvironmentConfProcedure());
}
public CatalogWithBuiltInProcedure(String name) {
@@ -174,6 +179,22 @@ public class TestProcedureCatalogFactory implements
CatalogFactory {
}
}
+ /** A procedure to get environment configs for testing purpose. */
+ @ProcedureHint(output = @DataTypeHint("ROW<k STRING, v STRING>"))
+ public static class EnvironmentConfProcedure implements Procedure {
+ public Row[] call(ProcedureContext procedureContext) throws Exception {
+ StreamExecutionEnvironment env =
procedureContext.getExecutionEnvironment();
+ Configuration config = (Configuration) env.getConfiguration();
+ List<Row> rows = new ArrayList<>();
+ config.toMap()
+ .forEach(
+ (k, v) -> {
+ rows.add(Row.of(k, v));
+ });
+ return rows.toArray(new Row[0]);
+ }
+ }
+
/** A simple pojo class for testing purpose. */
public static class UserPojo {
private final String name;
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
index da3262b6f12..ac21c3d697e 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
@@ -19,13 +19,17 @@
package org.apache.flink.table.planner.runtime.stream.sql;
import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.planner.factories.TestProcedureCatalogFactory;
import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
import org.apache.flink.types.Row;
@@ -85,7 +89,8 @@ public class ProcedureITCase extends StreamingTestBase {
CollectionUtil.iteratorToList(
tEnv().executeSql("show procedures in
`system`").collect());
assertThat(rows.toString())
- .isEqualTo("[+I[generate_n], +I[generate_user], +I[get_year], +I[sum_n]]");
+ .isEqualTo(
+ "[+I[generate_n], +I[generate_user], +I[get_env_conf], +I[get_year], +I[sum_n]]");
// show procedure with like
rows =
@@ -111,14 +116,14 @@ public class ProcedureITCase extends StreamingTestBase {
CollectionUtil.iteratorToList(
tEnv().executeSql("show procedures in `system` not
like 'generate%'")
.collect());
- assertThat(rows.toString()).isEqualTo("[+I[get_year], +I[sum_n]]");
+ assertThat(rows.toString()).isEqualTo("[+I[get_env_conf], +I[get_year], +I[sum_n]]");
// show procedure with not ilike
rows =
CollectionUtil.iteratorToList(
tEnv().executeSql("show procedures in `system` not
ilike 'generaTe%'")
.collect());
- assertThat(rows.toString()).isEqualTo("[+I[get_year], +I[sum_n]]");
+ assertThat(rows.toString()).isEqualTo("[+I[get_env_conf], +I[get_year], +I[sum_n]]");
}
@Test
@@ -172,6 +177,34 @@ public class ProcedureITCase extends StreamingTestBase {
Column.physical("age",
DataTypes.INT().notNull().bridgedTo(int.class))));
}
+ @Test
+ void testEnvironmentConf() throws DatabaseAlreadyExistException {
+ // root conf should work
+ Configuration configuration = new Configuration();
+ configuration.setString("key1", "value1");
+ StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+ tableEnv.getConfig().set("key2", "value2");
+
+ TestProcedureCatalogFactory.CatalogWithBuiltInProcedure
procedureCatalog =
+ new
TestProcedureCatalogFactory.CatalogWithBuiltInProcedure("procedure_catalog");
+ procedureCatalog.createDatabase(
+ "system", new CatalogDatabaseImpl(Collections.emptyMap(),
null), true);
+ tableEnv.registerCatalog("test_p", procedureCatalog);
+ tableEnv.useCatalog("test_p");
+ TableResult tableResult = tableEnv.executeSql("call
`system`.get_env_conf()");
+ List<Row> environmentConf =
CollectionUtil.iteratorToList(tableResult.collect());
+ assertThat(environmentConf.contains(Row.of("key1",
"value1"))).isTrue();
+ assertThat(environmentConf.contains(Row.of("key2",
"value2"))).isTrue();
+
+ // table conf should overwrite root conf
+ tableEnv.getConfig().set("key1", "value11");
+ tableResult = tableEnv.executeSql("call `system`.get_env_conf()");
+ environmentConf = CollectionUtil.iteratorToList(tableResult.collect());
+ assertThat(environmentConf.contains(Row.of("key1",
"value11"))).isTrue();
+ }
+
private void verifyTableResult(
TableResult tableResult, List<Row> expectedResult, ResolvedSchema
expectedSchema) {
assertThat(CollectionUtil.iteratorToList(tableResult.collect()).toString())