This is an automated email from the ASF dual-hosted git repository.

czy006 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b3027d9c [AMORO-4211] Fix legacy Flink config loading (#4249)
2b3027d9c is described below

commit 2b3027d9c95e37fc329eeec7fb6d04f2963284b3
Author: Nikhil Ujjwal <[email protected]>
AuthorDate: Wed Jun 24 21:33:27 2026 +0530

    [AMORO-4211] Fix legacy Flink config loading (#4249)
    
    Fix legacy Flink config loading
    
    Co-authored-by: nujjwal <[email protected]>
---
 .../server/manager/FlinkOptimizerContainer.java    |  6 +++---
 .../manager/TestFlinkOptimizerContainer.java       | 23 ++++++++++++++++++++++
 2 files changed, 26 insertions(+), 3 deletions(-)

diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java
index 39c34a720..b93050f03 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java
@@ -34,6 +34,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.YamlParserUtils;
 import org.apache.flink.core.execution.RestoreMode;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -53,7 +54,6 @@ import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders;
 import org.apache.flink.runtime.webmonitor.handlers.JarUploadResponseBody;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.yaml.snakeyaml.Yaml;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -278,8 +278,8 @@ public class FlinkOptimizerContainer extends AbstractOptimizerContainer {
       Map<String, Object> configDocument;
       Path flinkConfPath = Paths.get(flinkConfDir + FLINK_CONFIG_YAML);
       if (!Files.exists(flinkConfPath, LinkOption.NOFOLLOW_LINKS)) {
-        flinkConfPath = Paths.get(flinkConfDir + LEGACY_FLINK_CONFIG_YAML);
-        configDocument = new Yaml().load(Files.newInputStream(flinkConfPath));
+        configDocument = new HashMap<>();
+        configDocument.putAll(GlobalConfiguration.loadConfiguration(flinkConfDir).toMap());
       } else {
         configDocument = YamlParserUtils.loadYamlFile(new File(flinkConfPath.toUri()));
       }
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestFlinkOptimizerContainer.java b/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestFlinkOptimizerContainer.java
index d4e4c590a..619a00a8a 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestFlinkOptimizerContainer.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestFlinkOptimizerContainer.java
@@ -28,9 +28,14 @@ import org.apache.amoro.resource.ResourceType;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
 import org.apache.amoro.utils.MemorySize;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
 import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.Map;
@@ -39,6 +44,8 @@ import java.util.Objects;
 public class TestFlinkOptimizerContainer {
   FlinkOptimizerContainer container = new FlinkOptimizerContainer();
 
+  @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
   Map<String, String> containerProperties = Maps.newHashMap();
 
   public TestFlinkOptimizerContainer() {
@@ -99,6 +106,22 @@ public class TestFlinkOptimizerContainer {
     Assert.assertEquals(flinkConfig.get(JOB_MANAGER_TOTAL_PROCESS_MEMORY), "1600m");
   }
 
+  @Test
+  public void testReadLegacyFlinkConfigWithMetricFilterIncludes() throws Exception {
+    File flinkConf = tempFolder.newFile("flink-conf.yaml");
+    Files.write(
+        flinkConf.toPath(),
+        String.join(
+                "\n",
+                "jobmanager.rpc.address: localhost",
+                "xxx.metrics.filter.includes: *RSS:*:*;Heap:Used,Max:*")
+            .getBytes(StandardCharsets.UTF_8));
+
+    Map<String, String> flinkConfig = container.loadFlinkConfigForYAML(flinkConf.toURI().toURL());
+    Assert.assertEquals("localhost", flinkConfig.get("jobmanager.rpc.address"));
+    Assert.assertEquals("*RSS:*:*;Heap:Used,Max:*", flinkConfig.get("xxx.metrics.filter.includes"));
+  }
+
   @Test
   public void testBuildFlinkOptions() {
     Map<String, String> containerProperties = Maps.newHashMap(this.containerProperties);

Reply via email to