This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 24b6f7cbf94 [FLINK-34517][table]fix environment configs ignored when calling procedure operation (#24397)
24b6f7cbf94 is described below

commit 24b6f7cbf94fca154fc8680e8b3393abd68b8e77
Author: JustinLee <[email protected]>
AuthorDate: Wed Mar 6 09:44:49 2024 +0800

    [FLINK-34517][table]fix environment configs ignored when calling procedure operation (#24397)
---
 .../operations/PlannerCallProcedureOperation.java  | 14 ++++++--
 .../factories/TestProcedureCatalogFactory.java     | 20 ++++++++++++
 .../runtime/stream/sql/ProcedureITCase.java        | 38 ++++++++++++++++++++--
 3 files changed, 66 insertions(+), 6 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
index 8638efae8b0..66d6a6074e3 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.operations;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.DataTypes;
@@ -126,9 +127,7 @@ public class PlannerCallProcedureOperation implements 
CallProcedureOperation {
             TableConfig tableConfig, ClassLoader userClassLoader) {
         // should be [ProcedureContext, arg1, arg2, ..]
         Object[] argumentVal = new Object[1 + internalInputArguments.length];
-        StreamExecutionEnvironment env =
-                
StreamExecutionEnvironment.getExecutionEnvironment(tableConfig.getConfiguration());
-        argumentVal[0] = new DefaultProcedureContext(env);
+        argumentVal[0] = getProcedureContext(tableConfig);
         for (int i = 0; i < internalInputArguments.length; i++) {
             argumentVal[i + 1] =
                     (internalInputArguments[i] != null)
@@ -138,6 +137,15 @@ public class PlannerCallProcedureOperation implements 
CallProcedureOperation {
         return argumentVal;
     }
 
+    private ProcedureContext getProcedureContext(TableConfig tableConfig) {
+        Configuration configuration =
+                new Configuration((Configuration) 
tableConfig.getRootConfiguration());
+        configuration.addAll(tableConfig.getConfiguration());
+        StreamExecutionEnvironment env =
+                
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+        return new DefaultProcedureContext(env);
+    }
+
     /** Convert the value with internal representation to the value with 
external representation. */
     private Object toExternal(Object internalValue, DataType inputType, 
ClassLoader classLoader) {
         if (!(DataTypeUtils.isInternal(inputType))) {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestProcedureCatalogFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestProcedureCatalogFactory.java
index 126c9643ea5..cf8e4f46b0d 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestProcedureCatalogFactory.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestProcedureCatalogFactory.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.factories;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.annotation.ArgumentHint;
 import org.apache.flink.table.annotation.DataTypeHint;
@@ -39,6 +40,7 @@ import org.apache.flink.util.CloseableIterator;
 
 import java.math.BigDecimal;
 import java.time.LocalDateTime;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -94,6 +96,8 @@ public class TestProcedureCatalogFactory implements 
CatalogFactory {
             PROCEDURE_MAP.put(
                     ObjectPath.fromString("system.named_args_optional"),
                     new NamedArgumentsProcedureWithOptionalArguments());
+            PROCEDURE_MAP.put(
+                    ObjectPath.fromString("system.get_env_conf"), new 
EnvironmentConfProcedure());
         }
 
         public CatalogWithBuiltInProcedure(String name) {
@@ -230,6 +234,22 @@ public class TestProcedureCatalogFactory implements 
CatalogFactory {
         }
     }
 
+    /** A procedure to get environment configs for testing purpose. */
+    @ProcedureHint(output = @DataTypeHint("ROW<k STRING, v STRING>"))
+    public static class EnvironmentConfProcedure implements Procedure {
+        public Row[] call(ProcedureContext procedureContext) throws Exception {
+            StreamExecutionEnvironment env = 
procedureContext.getExecutionEnvironment();
+            Configuration config = (Configuration) env.getConfiguration();
+            List<Row> rows = new ArrayList<>();
+            config.toMap()
+                    .forEach(
+                            (k, v) -> {
+                                rows.add(Row.of(k, v));
+                            });
+            return rows.toArray(new Row[0]);
+        }
+    }
+
     /** A simple pojo class for testing purpose. */
     public static class UserPojo {
         private final String name;
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
index dc5e2319b4d..c1a54158721 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
@@ -19,14 +19,18 @@
 package org.apache.flink.table.planner.runtime.stream.sql;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.catalog.CatalogDatabaseImpl;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.planner.factories.TestProcedureCatalogFactory;
 import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
 import org.apache.flink.types.Row;
@@ -88,7 +92,7 @@ class ProcedureITCase extends StreamingTestBase {
                         tEnv().executeSql("show procedures in 
`system`").collect());
         assertThat(rows.toString())
                 .isEqualTo(
-                        "[+I[generate_n], +I[generate_user], +I[get_year], 
+I[named_args], +I[named_args_optional], +I[named_args_overload], +I[sum_n]]");
+                        "[+I[generate_n], +I[generate_user], +I[get_env_conf], 
+I[get_year], +I[named_args], +I[named_args_optional], +I[named_args_overload], 
+I[sum_n]]");
 
         // show procedure with like
         rows =
@@ -116,7 +120,7 @@ class ProcedureITCase extends StreamingTestBase {
                                 .collect());
         assertThat(rows.toString())
                 .isEqualTo(
-                        "[+I[get_year], +I[named_args], 
+I[named_args_optional], +I[named_args_overload], +I[sum_n]]");
+                        "[+I[get_env_conf], +I[get_year], +I[named_args], 
+I[named_args_optional], +I[named_args_overload], +I[sum_n]]");
 
         // show procedure with not ilike
         rows =
@@ -125,7 +129,7 @@ class ProcedureITCase extends StreamingTestBase {
                                 .collect());
         assertThat(rows.toString())
                 .isEqualTo(
-                        "[+I[get_year], +I[named_args], 
+I[named_args_optional], +I[named_args_overload], +I[sum_n]]");
+                        "[+I[get_env_conf], +I[get_year], +I[named_args], 
+I[named_args_optional], +I[named_args_overload], +I[sum_n]]");
     }
 
     @Test
@@ -210,6 +214,34 @@ class ProcedureITCase extends StreamingTestBase {
                 ResolvedSchema.of(Column.physical("result", 
DataTypes.STRING())));
     }
 
+    @Test
+    void testEnvironmentConf() throws DatabaseAlreadyExistException {
+        // root conf should work
+        Configuration configuration = new Configuration();
+        configuration.setString("key1", "value1");
+        StreamExecutionEnvironment env =
+                
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+        tableEnv.getConfig().set("key2", "value2");
+
+        TestProcedureCatalogFactory.CatalogWithBuiltInProcedure 
procedureCatalog =
+                new 
TestProcedureCatalogFactory.CatalogWithBuiltInProcedure("procedure_catalog");
+        procedureCatalog.createDatabase(
+                "system", new CatalogDatabaseImpl(Collections.emptyMap(), 
null), true);
+        tableEnv.registerCatalog("test_p", procedureCatalog);
+        tableEnv.useCatalog("test_p");
+        TableResult tableResult = tableEnv.executeSql("call 
`system`.get_env_conf()");
+        List<Row> environmentConf = 
CollectionUtil.iteratorToList(tableResult.collect());
+        assertThat(environmentConf.contains(Row.of("key1", 
"value1"))).isTrue();
+        assertThat(environmentConf.contains(Row.of("key2", 
"value2"))).isTrue();
+
+        // table conf should overwrite root conf
+        tableEnv.getConfig().set("key1", "value11");
+        tableResult = tableEnv.executeSql("call `system`.get_env_conf()");
+        environmentConf = CollectionUtil.iteratorToList(tableResult.collect());
+        assertThat(environmentConf.contains(Row.of("key1", 
"value11"))).isTrue();
+    }
+
     private void verifyTableResult(
             TableResult tableResult, List<Row> expectedResult, ResolvedSchema 
expectedSchema) {
         
assertThat(CollectionUtil.iteratorToList(tableResult.collect()).toString())

Reply via email to