This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
     new 620c5a7aeba [BP-1.18][FLINK-34517][table] fix environment configs 
ignored when calling procedure operation (#24419)
620c5a7aeba is described below

commit 620c5a7aeba448d247107ad44a6ba6f1e759052e
Author: JustinLee <[email protected]>
AuthorDate: Mon Mar 4 16:16:43 2024 +0800

    [BP-1.18][FLINK-34517][table] fix environment configs ignored when calling 
procedure operation (#24419)
---
 .../operations/PlannerCallProcedureOperation.java  | 14 ++++++--
 .../factories/TestProcedureCatalogFactory.java     | 21 ++++++++++++
 .../runtime/stream/sql/ProcedureITCase.java        | 39 ++++++++++++++++++++--
 3 files changed, 68 insertions(+), 6 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
index a96fbf65f56..71fa78e545e 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.operations;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.DataTypes;
@@ -125,9 +126,7 @@ public class PlannerCallProcedureOperation implements 
CallProcedureOperation {
             TableConfig tableConfig, ClassLoader userClassLoader) {
         // should be [ProcedureContext, arg1, arg2, ..]
         Object[] argumentVal = new Object[1 + internalInputArguments.length];
-        StreamExecutionEnvironment env =
-                
StreamExecutionEnvironment.getExecutionEnvironment(tableConfig.getConfiguration());
-        argumentVal[0] = new DefaultProcedureContext(env);
+        argumentVal[0] = getProcedureContext(tableConfig);
         for (int i = 0; i < internalInputArguments.length; i++) {
             argumentVal[i + 1] =
                     toExternal(internalInputArguments[i], inputTypes[i], 
userClassLoader);
@@ -135,6 +134,15 @@ public class PlannerCallProcedureOperation implements 
CallProcedureOperation {
         return argumentVal;
     }
 
+    private ProcedureContext getProcedureContext(TableConfig tableConfig) {
+        Configuration configuration =
+                new Configuration((Configuration) 
tableConfig.getRootConfiguration());
+        configuration.addAll(tableConfig.getConfiguration());
+        StreamExecutionEnvironment env =
+                
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+        return new DefaultProcedureContext(env);
+    }
+
     /** Convert the value with internal representation to the value with 
external representation. */
     private Object toExternal(Object internalValue, DataType inputType, 
ClassLoader classLoader) {
         if (!(DataTypeUtils.isInternal(inputType))) {
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestProcedureCatalogFactory.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestProcedureCatalogFactory.java
index 11317d5c717..528f93c7742 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestProcedureCatalogFactory.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestProcedureCatalogFactory.java
@@ -20,8 +20,10 @@ package org.apache.flink.table.planner.factories;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.catalog.ObjectPath;
@@ -37,6 +39,7 @@ import org.apache.flink.util.CloseableIterator;
 
 import java.math.BigDecimal;
 import java.time.LocalDateTime;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -84,6 +87,8 @@ public class TestProcedureCatalogFactory implements 
CatalogFactory {
             PROCEDURE_MAP.put(ObjectPath.fromString("system.get_year"), new 
GetYearProcedure());
             PROCEDURE_MAP.put(
                     ObjectPath.fromString("system.generate_user"), new 
GenerateUserProcedure());
+            PROCEDURE_MAP.put(
+                    ObjectPath.fromString("system.get_env_conf"), new 
EnvironmentConfProcedure());
         }
 
         public CatalogWithBuiltInProcedure(String name) {
@@ -174,6 +179,22 @@ public class TestProcedureCatalogFactory implements 
CatalogFactory {
         }
     }
 
+    /** A procedure to get environment configs for testing purpose. */
+    @ProcedureHint(output = @DataTypeHint("ROW<k STRING, v STRING>"))
+    public static class EnvironmentConfProcedure implements Procedure {
+        public Row[] call(ProcedureContext procedureContext) throws Exception {
+            StreamExecutionEnvironment env = 
procedureContext.getExecutionEnvironment();
+            Configuration config = (Configuration) env.getConfiguration();
+            List<Row> rows = new ArrayList<>();
+            config.toMap()
+                    .forEach(
+                            (k, v) -> {
+                                rows.add(Row.of(k, v));
+                            });
+            return rows.toArray(new Row[0]);
+        }
+    }
+
     /** A simple pojo class for testing purpose. */
     public static class UserPojo {
         private final String name;
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
index da3262b6f12..ac21c3d697e 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
@@ -19,13 +19,17 @@
 package org.apache.flink.table.planner.runtime.stream.sql;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.catalog.CatalogDatabaseImpl;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.planner.factories.TestProcedureCatalogFactory;
 import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
 import org.apache.flink.types.Row;
@@ -85,7 +89,8 @@ public class ProcedureITCase extends StreamingTestBase {
                 CollectionUtil.iteratorToList(
                         tEnv().executeSql("show procedures in 
`system`").collect());
         assertThat(rows.toString())
-                .isEqualTo("[+I[generate_n], +I[generate_user], +I[get_year], 
+I[sum_n]]");
+                .isEqualTo(
+                        "[+I[generate_n], +I[generate_user], +I[get_env_conf], 
+I[get_year], +I[sum_n]]");
 
         // show procedure with like
         rows =
@@ -111,14 +116,14 @@ public class ProcedureITCase extends StreamingTestBase {
                 CollectionUtil.iteratorToList(
                         tEnv().executeSql("show procedures in `system` not 
like 'generate%'")
                                 .collect());
-        assertThat(rows.toString()).isEqualTo("[+I[get_year], +I[sum_n]]");
+        assertThat(rows.toString()).isEqualTo("[+I[get_env_conf], 
+I[get_year], +I[sum_n]]");
 
         // show procedure with not ilike
         rows =
                 CollectionUtil.iteratorToList(
                         tEnv().executeSql("show procedures in `system` not 
ilike 'generaTe%'")
                                 .collect());
-        assertThat(rows.toString()).isEqualTo("[+I[get_year], +I[sum_n]]");
+        assertThat(rows.toString()).isEqualTo("[+I[get_env_conf], 
+I[get_year], +I[sum_n]]");
     }
 
     @Test
@@ -172,6 +177,34 @@ public class ProcedureITCase extends StreamingTestBase {
                         Column.physical("age", 
DataTypes.INT().notNull().bridgedTo(int.class))));
     }
 
+    @Test
+    void testEnvironmentConf() throws DatabaseAlreadyExistException {
+        // root conf should work
+        Configuration configuration = new Configuration();
+        configuration.setString("key1", "value1");
+        StreamExecutionEnvironment env =
+                
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+        tableEnv.getConfig().set("key2", "value2");
+
+        TestProcedureCatalogFactory.CatalogWithBuiltInProcedure 
procedureCatalog =
+                new 
TestProcedureCatalogFactory.CatalogWithBuiltInProcedure("procedure_catalog");
+        procedureCatalog.createDatabase(
+                "system", new CatalogDatabaseImpl(Collections.emptyMap(), 
null), true);
+        tableEnv.registerCatalog("test_p", procedureCatalog);
+        tableEnv.useCatalog("test_p");
+        TableResult tableResult = tableEnv.executeSql("call 
`system`.get_env_conf()");
+        List<Row> environmentConf = 
CollectionUtil.iteratorToList(tableResult.collect());
+        assertThat(environmentConf.contains(Row.of("key1", 
"value1"))).isTrue();
+        assertThat(environmentConf.contains(Row.of("key2", 
"value2"))).isTrue();
+
+        // table conf should overwrite root conf
+        tableEnv.getConfig().set("key1", "value11");
+        tableResult = tableEnv.executeSql("call `system`.get_env_conf()");
+        environmentConf = CollectionUtil.iteratorToList(tableResult.collect());
+        assertThat(environmentConf.contains(Row.of("key1", 
"value11"))).isTrue();
+    }
+
     private void verifyTableResult(
             TableResult tableResult, List<Row> expectedResult, ResolvedSchema 
expectedSchema) {
         
assertThat(CollectionUtil.iteratorToList(tableResult.collect()).toString())

Reply via email to the mailing list.