huyuanfeng2018 commented on code in PR #3090:
URL: https://github.com/apache/amoro/pull/3090#discussion_r1714584486


##########
README.md:
##########
@@ -117,9 +117,9 @@ Amoro is built using Maven with JDK 8 and JDK 17(only for `amoro-mixed-format/am
 * Build and skip dashboard: `mvn clean package -Pskip-dashboard-build`
 * Build and disable disk storage, RocksDB will NOT be introduced to avoid memory overflow: `mvn clean package -DskipTests -Pno-extented-disk-storage`
 * Build with hadoop 2.x(the default is 3.x) dependencies: `mvn clean package -DskipTests -Phadoop2`
-* Specify Flink version for Flink optimizer(the default is 1.18.1): `mvn clean package -DskipTests -Dflink-optimizer.flink-version=1.15.4`
+* Specify Flink version for Flink optimizer(the default is 1.20.0): `mvn clean package -DskipTests -Dflink-optimizer.flink-version=1.20.0`
   * If the version of Flink is below 1.15.0, you also need to add the `-Pflink-optimizer-pre-1.15` parameter: `mvn clean package -DskipTests -Pflink-optimizer-pre-1.15 -Dflink-optimizer.flink-version=1.14.6`
-* Specify Spark version for Spark optimizer(the default is 3.3.3): `mvn clean package -DskipTests -Dspark-optimizer.spark-version=3.2.2`
+* Specify Spark version for Spark optimizer(the default is 3.3.3): `mvn clean package -DskipTests -Dspark-optimizer.spark-version=3.3.3`

Review Comment:
   We should not change the Spark-related documentation in this PR.



##########
amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java:
##########
@@ -532,7 +563,7 @@ protected JobID runJar(String jarId, Configuration configuration, Resource resou
     JobID jobID = JobID.generate();
     JarRunRequestBody runRequestBody =
         new JarRunRequestBody(
-            FLINK_JOB_MAIN_CLASS, args, null, null, jobID, true, null, RestoreMode.DEFAULT, null);
+            FLINK_JOB_MAIN_CLASS, args, null, null, jobID, true, null, RestoreMode.CLAIM, null);

Review Comment:
   Do we need to use `RestoreMode.CLAIM` here? The optimizer does not take
   checkpoints, and the default mode is `NO_CLAIM`.
   
   



##########
amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java:
##########
@@ -266,15 +254,62 @@ protected String buildOptimizerStartupArgsString(Resource resource) {
         jobArgs);
   }
 
+  @VisibleForTesting
+  protected Map<String, String> loadFlinkConfigForYAML(URL path) {
+    this.flinkConfDir = Paths.get(path.getPath()).getParent().toString();
+    return loadFlinkConfig();
+  }
+
+  /**
+   * get flink config with config.yaml or flink-conf.yaml see <a
+   * href="https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/config/#flink-configuration-file";></a>
+   *
+   * @return flink config map
+   */
   private Map<String, String> loadFlinkConfig() {
     try {
-      return new Yaml().load(Files.newInputStream(Paths.get(flinkConfDir + FLINK_CONFIG_YAML)));
-    } catch (IOException e) {
+      Path flinkConfPath = Paths.get(flinkConfDir + NEW_FLINK_CONFIG_YAML);
+      if (!Files.exists(flinkConfPath, LinkOption.NOFOLLOW_LINKS)) {
+        flinkConfPath = Paths.get(flinkConfDir + FLINK_CONFIG_YAML);
+        return new Yaml().load(Files.newInputStream(flinkConfPath));
+      }
+      Map<String, Object> configDocument =
+          YamlParserUtils.loadYamlFile(new File(flinkConfPath.toUri()));
+      return Maps.transformValues(
+          flatten(configDocument, ""), value -> value == null ? null : value.toString());
+    } catch (Exception e) {
       LOG.error("load flink conf yaml failed: {}", e.getMessage());
       return Collections.emptyMap();
     }
   }
 
+  /**
+   * Copy from flink 1.20 GlobalConfiguration.flatten Utils
+   *
+   * @param config
+   * @param keyPrefix
+   * @return
+   */
+  private static Map<String, Object> flatten(Map<String, Object> config, String keyPrefix) {

Review Comment:
   How about using `GlobalConfiguration.loadConfiguration()` directly?
   
   That could be done separately, as a follow-up to this change.
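
   For context, the flattening that the copied helper performs is roughly the
   following. This is only a sketch under stated assumptions:
   `YamlFlattenSketch` is a made-up name, it uses plain `java.util` types, and
   it makes no attempt to match how Flink handles non-map values such as lists.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: a hypothetical stand-in, not Flink's GlobalConfiguration. */
final class YamlFlattenSketch {

  /** Flattens nested maps into dotted keys, e.g. {a: {b: 1}} becomes {"a.b": 1}. */
  static Map<String, Object> flatten(Map<String, Object> config, String keyPrefix) {
    Map<String, Object> flat = new LinkedHashMap<>();
    for (Map.Entry<String, Object> entry : config.entrySet()) {
      String key = keyPrefix + entry.getKey();
      Object value = entry.getValue();
      if (value instanceof Map) {
        // Nested section: recurse with the current key as the new prefix.
        @SuppressWarnings("unchecked")
        Map<String, Object> nested = (Map<String, Object>) value;
        flat.putAll(flatten(nested, key + "."));
      } else {
        // Leaf value: store it under the fully qualified dotted key.
        flat.put(key, value);
      }
    }
    return flat;
  }
}
```

   Calling `YamlFlattenSketch.flatten(document, "")` on a parsed YAML document
   yields one entry per leaf, which is the shape the existing
   `Map<String, String>` callers expect once the values are converted to strings.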
   
   



##########
amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java:
##########
@@ -80,25 +84,15 @@ public class FlinkOptimizerContainer extends AbstractResourceContainer {
   public static final String FLINK_HOME_PROPERTY = "flink-home";
   public static final String FLINK_CONFIG_PATH = "/conf";
   public static final String FLINK_CONFIG_YAML = "/flink-conf.yaml";
+  // flink version >= 1.20 use it first
+  public static final String NEW_FLINK_CONFIG_YAML = "/config.yaml";

Review Comment:
   ```
   FLINK_CONFIG_YAML="/config.yaml"
   OLD_FLINK_CONFIG_YAML="/flink-conf.yaml"
   ```
   WDYT?
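
   With that naming, the lookup order could read as in the sketch below. It is
   a minimal illustration, not the code in this PR: `FlinkConfLocatorSketch`
   and `resolve` are hypothetical names, and the leading slash is dropped from
   the constants because `Path.resolve` is used instead of string concatenation.

```java
import java.nio.file.Files;
import java.nio.file.Path;

/** Illustrative only: hypothetical helper, not part of FlinkOptimizerContainer. */
final class FlinkConfLocatorSketch {
  // Constant names follow the suggestion above; values are assumptions for this sketch.
  static final String FLINK_CONFIG_YAML = "config.yaml";
  static final String OLD_FLINK_CONFIG_YAML = "flink-conf.yaml";

  /** Prefers config.yaml and falls back to flink-conf.yaml when it is absent. */
  static Path resolve(Path confDir) {
    Path preferred = confDir.resolve(FLINK_CONFIG_YAML);
    return Files.exists(preferred) ? preferred : confDir.resolve(OLD_FLINK_CONFIG_YAML);
  }
}
```

   The unqualified name then always refers to the file that current Flink
   releases read first, and the `OLD_` prefix marks the legacy fallback.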


