This is an automated email from the ASF dual-hosted git repository.
czy006 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 2b3027d9c [AMORO-4211] Fix legacy Flink config loading (#4249)
2b3027d9c is described below
commit 2b3027d9c95e37fc329eeec7fb6d04f2963284b3
Author: Nikhil Ujjwal <[email protected]>
AuthorDate: Wed Jun 24 21:33:27 2026 +0530
[AMORO-4211] Fix legacy Flink config loading (#4249)
Fix legacy Flink config loading
Co-authored-by: nujjwal <[email protected]>
---
.../server/manager/FlinkOptimizerContainer.java | 6 +++---
.../manager/TestFlinkOptimizerContainer.java | 23 ++++++++++++++++++++++
2 files changed, 26 insertions(+), 3 deletions(-)
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java
index 39c34a720..b93050f03 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java
@@ -34,6 +34,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.YamlParserUtils;
import org.apache.flink.core.execution.RestoreMode;
import org.apache.flink.runtime.messages.Acknowledge;
@@ -53,7 +54,6 @@ import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders;
import org.apache.flink.runtime.webmonitor.handlers.JarUploadResponseBody;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.yaml.snakeyaml.Yaml;
import java.io.BufferedReader;
import java.io.File;
@@ -278,8 +278,8 @@ public class FlinkOptimizerContainer extends AbstractOptimizerContainer {
Map<String, Object> configDocument;
Path flinkConfPath = Paths.get(flinkConfDir + FLINK_CONFIG_YAML);
if (!Files.exists(flinkConfPath, LinkOption.NOFOLLOW_LINKS)) {
- flinkConfPath = Paths.get(flinkConfDir + LEGACY_FLINK_CONFIG_YAML);
- configDocument = new Yaml().load(Files.newInputStream(flinkConfPath));
+ configDocument = new HashMap<>();
+ configDocument.putAll(GlobalConfiguration.loadConfiguration(flinkConfDir).toMap());
} else {
configDocument = YamlParserUtils.loadYamlFile(new File(flinkConfPath.toUri()));
}
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestFlinkOptimizerContainer.java b/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestFlinkOptimizerContainer.java
index d4e4c590a..619a00a8a 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestFlinkOptimizerContainer.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestFlinkOptimizerContainer.java
@@ -28,9 +28,14 @@ import org.apache.amoro.resource.ResourceType;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.utils.MemorySize;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import java.io.File;
import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
@@ -39,6 +44,8 @@ import java.util.Objects;
public class TestFlinkOptimizerContainer {
FlinkOptimizerContainer container = new FlinkOptimizerContainer();
+ @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
Map<String, String> containerProperties = Maps.newHashMap();
public TestFlinkOptimizerContainer() {
@@ -99,6 +106,22 @@ public class TestFlinkOptimizerContainer {
Assert.assertEquals(flinkConfig.get(JOB_MANAGER_TOTAL_PROCESS_MEMORY), "1600m");
}
+ @Test
+ public void testReadLegacyFlinkConfigWithMetricFilterIncludes() throws Exception {
+ File flinkConf = tempFolder.newFile("flink-conf.yaml");
+ Files.write(
+ flinkConf.toPath(),
+ String.join(
+ "\n",
+ "jobmanager.rpc.address: localhost",
+ "xxx.metrics.filter.includes: *RSS:*:*;Heap:Used,Max:*")
+ .getBytes(StandardCharsets.UTF_8));
+
+ Map<String, String> flinkConfig = container.loadFlinkConfigForYAML(flinkConf.toURI().toURL());
+ Assert.assertEquals("localhost", flinkConfig.get("jobmanager.rpc.address"));
+ Assert.assertEquals("*RSS:*:*;Heap:Used,Max:*", flinkConfig.get("xxx.metrics.filter.includes"));
+ }
+
@Test
public void testBuildFlinkOptions() {
Map<String, String> containerProperties = Maps.newHashMap(this.containerProperties);