This is an automated email from the ASF dual-hosted git repository.

jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f510187a [AMORO-3087]: Optimizer Support Flink 1.20 (#3087) (#3090)
4f510187a is described below

commit 4f510187a950313ac70b3a6130f0bcbc9c98ca19
Author: ConradJam <[email protected]>
AuthorDate: Wed Aug 14 20:15:26 2024 +0800

    [AMORO-3087]: Optimizer Support Flink 1.20 (#3087) (#3090)
    
    * [AMORO-3087]: Optimizer Support Flink 1.20 (#3087)
    
    * fix docs
    
    * fix
    
    * docker images change flink version to 1.20
    
    ---------
    
    Co-authored-by: ConradJam <[email protected]>
---
 .github/workflows/docker-images.yml                |  6 +-
 README.md                                          |  2 +-
 .../server/manager/FlinkOptimizerContainer.java    | 95 ++++++++++++++--------
 .../manager/TestFlinkOptimizerContainer.java       | 49 ++++++++---
 .../src/test/resources/config.yaml                 | 38 +++++----
 .../src/test/resources/flink-conf.yaml             | 31 ++++---
 docker/build.sh                                    |  4 +-
 docker/optimizer-flink/Dockerfile                  |  2 +-
 pom.xml                                            |  2 +-
 9 files changed, 145 insertions(+), 84 deletions(-)

diff --git a/.github/workflows/docker-images.yml 
b/.github/workflows/docker-images.yml
index 9866df029..f2d1c16c1 100644
--- a/.github/workflows/docker-images.yml
+++ b/.github/workflows/docker-images.yml
@@ -103,7 +103,7 @@ jobs:
     if: ${{ startsWith(github.repository, 'apache/') }}
     strategy:
       matrix:
-        flink: [ "1.14.6", "1.18.1" ]
+        flink: [ "1.14.6", "1.20.0" ]
     steps:
       - uses: actions/checkout@v3
       - name: Set up JDK 8
@@ -129,11 +129,11 @@ jobs:
           tags: |
             type=ref,event=branch,enable=${{ matrix.flink == '1.14.6' 
}},suffix=-snapshot
             type=ref,event=branch,enable=${{ matrix.flink == '1.14.6' 
}},suffix=-snapshot-flink1.14
-            type=ref,event=branch,enable=${{ matrix.flink == '1.18.1' 
}},suffix=-snapshot-flink1.18
+            type=ref,event=branch,enable=${{ matrix.flink == '1.20.0' 
}},suffix=-snapshot-flink1.20
             type=raw,enable=${{ matrix.hadoop == '1.14.6' && 
startsWith(github.ref, 'refs/tags/v') }},value=latest
             type=semver,enable=${{ matrix.flink == '1.14.6' 
}},pattern={{version}}
             type=semver,enable=${{ matrix.flink == '1.14.6' 
}},pattern={{version}}, suffix=-flink1.14
-            type=semver,enable=${{ matrix.flink == '1.18.1' 
}},pattern={{version}}, suffix=-flink1.18
+            type=semver,enable=${{ matrix.flink == '1.20.0' 
}},pattern={{version}}, suffix=-flink1.20
 
       - name: Print tags
         run: echo '${{ steps.meta.outputs.tags }}'
diff --git a/README.md b/README.md
index cf13ff4a5..9f1f8276b 100644
--- a/README.md
+++ b/README.md
@@ -117,7 +117,7 @@ Amoro is built using Maven with JDK 8 and JDK 17(only for 
`amoro-mixed-format/am
 * Build and skip dashboard: `mvn clean package -Pskip-dashboard-build`
 * Build and disable disk storage, RocksDB will NOT be introduced to avoid 
memory overflow: `mvn clean package -DskipTests -Pno-extented-disk-storage`
 * Build with hadoop 2.x(the default is 3.x) dependencies: `mvn clean package 
-DskipTests -Phadoop2`
-* Specify Flink version for Flink optimizer(the default is 1.18.1): `mvn clean 
package -DskipTests -Dflink-optimizer.flink-version=1.15.4`
+* Specify Flink version for Flink optimizer(the default is 1.20.0): `mvn clean 
package -DskipTests -Dflink-optimizer.flink-version=1.20.0`
   * If the version of Flink is below 1.15.0, you also need to add the 
`-Pflink-optimizer-pre-1.15` parameter: `mvn clean package -DskipTests 
-Pflink-optimizer-pre-1.15 -Dflink-optimizer.flink-version=1.14.6`
 * Specify Spark version for Spark optimizer(the default is 3.3.3): `mvn clean 
package -DskipTests -Dspark-optimizer.spark-version=3.3.3`
 * Build `amoro-mixed-format-trino` module under JDK 17: `mvn clean package 
-DskipTests -Pformat-mixed-format-trino,build-mixed-format-trino -pl 
'amoro-mixed-format/amoro-mixed-format-trino' -am`.
diff --git 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java
 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java
index a0bb79b6a..593f3ae92 100644
--- 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java
+++ 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java
@@ -33,7 +33,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.RestoreMode;
+import org.apache.flink.configuration.YamlParserUtils;
+import org.apache.flink.core.execution.RestoreMode;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rest.FileUpload;
 import org.apache.flink.runtime.rest.RestClient;
@@ -60,10 +61,13 @@ import java.io.InputStreamReader;
 import java.io.UncheckedIOException;
 import java.net.URL;
 import java.nio.file.Files;
+import java.nio.file.LinkOption;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ExecutorService;
@@ -79,7 +83,9 @@ public class FlinkOptimizerContainer extends 
AbstractResourceContainer {
 
   public static final String FLINK_HOME_PROPERTY = "flink-home";
   public static final String FLINK_CONFIG_PATH = "/conf";
-  public static final String FLINK_CONFIG_YAML = "/flink-conf.yaml";
+  public static final String LEGACY_FLINK_CONFIG_YAML = "/flink-conf.yaml";
+  // flink version >= 1.20 use it first
+  public static final String FLINK_CONFIG_YAML = "/config.yaml";
   public static final String ENV_FLINK_CONF_DIR = "FLINK_CONF_DIR";
   public static final String FLINK_CLIENT_TIMEOUT_SECOND = 
"flink-client-timeout-second";
 
@@ -87,18 +93,6 @@ public class FlinkOptimizerContainer extends 
AbstractResourceContainer {
   private static final String FLINK_JOB_MAIN_CLASS =
       "org.apache.amoro.optimizer.flink.FlinkOptimizer";
 
-  /**
-   * This will be removed in 0.7.0, using flink properties
-   * `flink-conf.taskmanager.memory.process.size`.
-   */
-  @Deprecated public static final String TASK_MANAGER_MEMORY_PROPERTY = 
"taskmanager.memory";
-
-  /**
-   * This will be removed in 0.7.0, using flink properties
-   * `flink-conf.jobmanager.memory.process.size`.
-   */
-  @Deprecated public static final String JOB_MANAGER_MEMORY_PROPERTY = 
"jobmanager.memory";
-
   public static final String FLINK_RUN_TARGET = "target";
   public static final String FLINK_JOB_URI = "job-uri";
 
@@ -228,16 +222,10 @@ public class FlinkOptimizerContainer extends 
AbstractResourceContainer {
 
     long jobManagerMemory =
         getMemorySizeValue(
-            properties,
-            resourceFlinkConf,
-            JOB_MANAGER_MEMORY_PROPERTY,
-            FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY);
+            properties, resourceFlinkConf, 
FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY);
     long taskManagerMemory =
         getMemorySizeValue(
-            properties,
-            resourceFlinkConf,
-            TASK_MANAGER_MEMORY_PROPERTY,
-            FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY);
+            properties, resourceFlinkConf, 
FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY);
 
     resourceFlinkConf.putToOptions(
         FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY, jobManagerMemory + 
"m");
@@ -266,15 +254,62 @@ public class FlinkOptimizerContainer extends 
AbstractResourceContainer {
         jobArgs);
   }
 
+  @VisibleForTesting
+  protected Map<String, String> loadFlinkConfigForYAML(URL path) {
+    this.flinkConfDir = Paths.get(path.getPath()).getParent().toString();
+    return loadFlinkConfig();
+  }
+
+  /**
+   * get flink config with config.yaml or flink-conf.yaml see <a
+   * 
href="https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/config/#flink-configuration-file";></a>
+   *
+   * @return flink config map
+   */
   private Map<String, String> loadFlinkConfig() {
     try {
-      return new Yaml().load(Files.newInputStream(Paths.get(flinkConfDir + 
FLINK_CONFIG_YAML)));
-    } catch (IOException e) {
+      Path flinkConfPath = Paths.get(flinkConfDir + FLINK_CONFIG_YAML);
+      if (!Files.exists(flinkConfPath, LinkOption.NOFOLLOW_LINKS)) {
+        flinkConfPath = Paths.get(flinkConfDir + LEGACY_FLINK_CONFIG_YAML);
+        return new Yaml().load(Files.newInputStream(flinkConfPath));
+      }
+      Map<String, Object> configDocument =
+          YamlParserUtils.loadYamlFile(new File(flinkConfPath.toUri()));
+      return Maps.transformValues(
+          flatten(configDocument, ""), value -> value == null ? null : 
value.toString());
+    } catch (Exception e) {
       LOG.error("load flink conf yaml failed: {}", e.getMessage());
       return Collections.emptyMap();
     }
   }
 
+  /**
+   * Copy from flink 1.20 GlobalConfiguration.flatten Utils
+   *
+   * @param config
+   * @param keyPrefix
+   * @return
+   */
+  private static Map<String, Object> flatten(Map<String, Object> config, 
String keyPrefix) {
+    final Map<String, Object> flattenedMap = new HashMap<>();
+    config.forEach(
+        (key, value) -> {
+          String flattenedKey = keyPrefix + key;
+          if (value instanceof Map) {
+            Map<String, Object> e = (Map<String, Object>) value;
+            flattenedMap.putAll(flatten(e, flattenedKey + "."));
+          } else {
+            if (value instanceof List) {
+              flattenedMap.put(flattenedKey, 
YamlParserUtils.toYAMLString(value));
+            } else {
+              flattenedMap.put(flattenedKey, value);
+            }
+          }
+        });
+
+    return flattenedMap;
+  }
+
   private void addKubernetesProperties(Resource resource, FlinkConf flinkConf) 
{
     String clusterId = kubernetesClusterId(resource);
     flinkConf.putToOptions(FlinkConfKeys.KUBERNETES_CLUSTER_ID, clusterId);
@@ -297,16 +332,12 @@ public class FlinkOptimizerContainer extends 
AbstractResourceContainer {
 
   /**
    * get jobManager and taskManager memory. An example of using Jobmanager 
memory parameters is as
-   * follows: jobmanager.memory: 1024 
flink-conf.jobmanager.memory.process.size: 1024M
-   * flink-conf.yaml Prioritize from high to low.
+   * flink-conf.jobmanager.memory.process.size: 1024M flink-conf.yaml 
Prioritize from high to low.
    */
   @VisibleForTesting
   protected long getMemorySizeValue(
-      Map<String, String> resourceProperties,
-      FlinkConf conf,
-      String resourcePropertyKey,
-      String flinkConfKey) {
-    String value = resourceProperties.get(resourcePropertyKey);
+      Map<String, String> resourceProperties, FlinkConf conf, String 
flinkConfKey) {
+    String value = resourceProperties.get(flinkConfKey);
     if (value == null) {
       value = conf.configValue(flinkConfKey);
     }
@@ -532,7 +563,7 @@ public class FlinkOptimizerContainer extends 
AbstractResourceContainer {
     JobID jobID = JobID.generate();
     JarRunRequestBody runRequestBody =
         new JarRunRequestBody(
-            FLINK_JOB_MAIN_CLASS, args, null, null, jobID, true, null, 
RestoreMode.DEFAULT, null);
+            FLINK_JOB_MAIN_CLASS, args, null, null, jobID, true, null, 
RestoreMode.NO_CLAIM, null);
     LOG.info("Submitting job: {} to session cluster, args: {}", jobID, args);
     try (RestClusterClient<String> restClusterClient =
         FlinkClientUtil.getRestClusterClient(configuration)) {
diff --git 
a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/manager/TestFlinkOptimizerContainer.java
 
b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/manager/TestFlinkOptimizerContainer.java
index 28517cf58..7cc235eae 100644
--- 
a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/manager/TestFlinkOptimizerContainer.java
+++ 
b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/manager/TestFlinkOptimizerContainer.java
@@ -23,8 +23,11 @@ import 
org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.net.URL;
+import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 
 public class TestFlinkOptimizerContainer {
   FlinkOptimizerContainer container = new FlinkOptimizerContainer();
@@ -52,6 +55,28 @@ public class TestFlinkOptimizerContainer {
     Assert.assertEquals(0, container.parseMemorySize("100kb"));
   }
 
+  @Test
+  public void testReadFlinkConfigFile() {
+    ClassLoader classLoader = getClass().getClassLoader();
+    URL flinkConfResourceUrl = classLoader.getResource("flink-conf.yaml");
+    Assert.assertEquals(
+        
Paths.get(Objects.requireNonNull(flinkConfResourceUrl).getPath()).getFileName().toString(),
+        "flink-conf.yaml");
+    URL newFlinkConfResourceUrl = classLoader.getResource("config.yaml");
+    Assert.assertEquals(
+        Paths.get(Objects.requireNonNull(newFlinkConfResourceUrl).getPath())
+            .getFileName()
+            .toString(),
+        "config.yaml");
+    Map<String, String> flinkConfig = 
container.loadFlinkConfigForYAML(newFlinkConfResourceUrl);
+    Assert.assertEquals(
+        
flinkConfig.get(FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY),
+        "1728m");
+    Assert.assertEquals(
+        
flinkConfig.get(FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY),
+        "1600m");
+  }
+
   @Test
   public void testBuildFlinkOptions() {
     Map<String, String> containerProperties = 
Maps.newHashMap(this.containerProperties);
@@ -78,8 +103,8 @@ public class TestFlinkOptimizerContainer {
   @Test
   public void testGetMemorySizeValue() {
     HashMap<String, String> prop = new HashMap<>();
-    prop.put("taskmanager.memory", "100");
-    prop.put("jobmanager.memory", "100");
+    
prop.put(FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY,
 "100");
+    
prop.put(FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY,
 "100");
 
     FlinkOptimizerContainer.FlinkConf conf =
         FlinkOptimizerContainer.FlinkConf.buildFor(prop, 
Maps.newHashMap()).build();
@@ -87,11 +112,11 @@ public class TestFlinkOptimizerContainer {
     Assert.assertEquals(
         100L,
         container.getMemorySizeValue(
-            prop, conf, "taskmanager.memory", 
"taskmanager.memory.process.size"));
+            prop, conf, 
FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY));
     Assert.assertEquals(
         100L,
         container.getMemorySizeValue(
-            prop, conf, "jobmanager.memory", 
"jobmanager.memory.process.size"));
+            prop, conf, 
FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY));
 
     Map<String, String> containerProperties = Maps.newHashMap();
     containerProperties.put("flink-conf.jobmanager.memory.process.size", "200 
M");
@@ -101,36 +126,36 @@ public class TestFlinkOptimizerContainer {
     Assert.assertEquals(
         200L,
         container.getMemorySizeValue(
-            prop, conf, "taskmanager.memory", 
"taskmanager.memory.process.size"));
+            prop, conf, 
FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY));
     Assert.assertEquals(
         200L,
         container.getMemorySizeValue(
-            prop, conf, "jobmanager.memory", 
"jobmanager.memory.process.size"));
+            prop, conf, 
FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY));
 
     prop.clear();
     containerProperties = Maps.newHashMap();
     conf = FlinkOptimizerContainer.FlinkConf.buildFor(prop, 
containerProperties).build();
 
-    prop.put("taskmanager.memory", "300 M");
-    prop.put("jobmanager.memory", "300");
+    
prop.put(FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY,
 "300 M");
+    
prop.put(FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY,
 "300");
     Assert.assertEquals(
         300L,
         container.getMemorySizeValue(
-            prop, conf, "taskmanager.memory", 
"taskmanager.memory.process.size"));
+            prop, conf, 
FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY));
     Assert.assertEquals(
         300L,
         container.getMemorySizeValue(
-            prop, conf, "jobmanager.memory", 
"jobmanager.memory.process.size"));
+            prop, conf, 
FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY));
 
     conf = FlinkOptimizerContainer.FlinkConf.buildFor(Maps.newHashMap(), 
Maps.newHashMap()).build();
     prop.clear();
     Assert.assertEquals(
         0L,
         container.getMemorySizeValue(
-            prop, conf, "taskmanager.memory", 
"taskmanager.memory.process.size"));
+            prop, conf, 
FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY));
     Assert.assertEquals(
         0L,
         container.getMemorySizeValue(
-            prop, conf, "jobmanager.memory", 
"jobmanager.memory.process.size"));
+            prop, conf, 
FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY));
   }
 }
diff --git a/docker/optimizer-flink/Dockerfile 
b/amoro-ams/amoro-ams-server/src/test/resources/config.yaml
similarity index 52%
copy from docker/optimizer-flink/Dockerfile
copy to amoro-ams/amoro-ams-server/src/test/resources/config.yaml
index 46d50e1b4..168046fda 100644
--- a/docker/optimizer-flink/Dockerfile
+++ b/amoro-ams/amoro-ams-server/src/test/resources/config.yaml
@@ -1,3 +1,4 @@
+################################################################################
 #  Licensed to the Apache Software Foundation (ASF) under one
 #  or more contributor license agreements.  See the NOTICE file
 #  distributed with this work for additional information
@@ -12,22 +13,27 @@
 #  distributed under the License is distributed on an "AS IS" BASIS,
 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 #  See the License for the specific language governing permissions and
-# limitations under the License.
+#  limitations under the License.
+################################################################################
 
-ARG FLINK_VERSION=1.18.1-java8
+jobmanager:
+  bind-host: localhost
+  rpc:
+    address: localhost
+    port: 6123
+  memory:
+    process:
+      size: 1600m
+  execution:
+    failover-strategy: region
 
-FROM flink:${FLINK_VERSION}
-
-ARG MAVEN_MIRROR=https://repo.maven.apache.org/maven2
-ARG OPTIMIZER_JOB=optimizer-job.jar
-ARG AWS_VERSION=2.24.12
-
-RUN cd $FLINK_HOME/lib \
- && wget 
${MAVEN_MIRROR}/software/amazon/awssdk/bundle/${AWS_VERSION}/bundle-${AWS_VERSION}.jar
 \
- && wget 
${MAVEN_MIRROR}/software/amazon/awssdk/url-connection-client/${AWS_VERSION}/url-connection-client-${AWS_VERSION}.jar
 \
- && wget 
${MAVEN_MIRROR}/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
 \
- && chown flink:flink *.jar \
- && mkdir -p $FLINK_HOME/usrlib
-
-COPY $OPTIMIZER_JOB $FLINK_HOME/usrlib/optimizer-job.jar
+taskmanager:
+  bind-host: localhost
+  host: localhost
+  numberOfTaskSlots: 1
+  memory:
+    process:
+      size: 1728m
 
+parallelism:
+  default: 1
\ No newline at end of file
diff --git a/docker/optimizer-flink/Dockerfile 
b/amoro-ams/amoro-ams-server/src/test/resources/flink-conf.yaml
similarity index 52%
copy from docker/optimizer-flink/Dockerfile
copy to amoro-ams/amoro-ams-server/src/test/resources/flink-conf.yaml
index 46d50e1b4..7b668d57f 100644
--- a/docker/optimizer-flink/Dockerfile
+++ b/amoro-ams/amoro-ams-server/src/test/resources/flink-conf.yaml
@@ -1,3 +1,4 @@
+################################################################################
 #  Licensed to the Apache Software Foundation (ASF) under one
 #  or more contributor license agreements.  See the NOTICE file
 #  distributed with this work for additional information
@@ -12,22 +13,20 @@
 #  distributed under the License is distributed on an "AS IS" BASIS,
 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 #  See the License for the specific language governing permissions and
-# limitations under the License.
+#  limitations under the License.
+################################################################################
 
-ARG FLINK_VERSION=1.18.1-java8
+jobmanager.rpc.address: localhost
+jobmanager.rpc.port: 6123
+jobmanager.bind-host: localhost
+jobmanager.memory.process.size: 1600m
 
-FROM flink:${FLINK_VERSION}
-
-ARG MAVEN_MIRROR=https://repo.maven.apache.org/maven2
-ARG OPTIMIZER_JOB=optimizer-job.jar
-ARG AWS_VERSION=2.24.12
-
-RUN cd $FLINK_HOME/lib \
- && wget 
${MAVEN_MIRROR}/software/amazon/awssdk/bundle/${AWS_VERSION}/bundle-${AWS_VERSION}.jar
 \
- && wget 
${MAVEN_MIRROR}/software/amazon/awssdk/url-connection-client/${AWS_VERSION}/url-connection-client-${AWS_VERSION}.jar
 \
- && wget 
${MAVEN_MIRROR}/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
 \
- && chown flink:flink *.jar \
- && mkdir -p $FLINK_HOME/usrlib
-
-COPY $OPTIMIZER_JOB $FLINK_HOME/usrlib/optimizer-job.jar
+taskmanager.bind-host: localhost
+taskmanager.host: localhost
+taskmanager.memory.process.size: 1728m
+taskmanager.numberOfTaskSlots: 1
+parallelism.default: 2
 
+jobmanager.execution.failover-strategy: region
+rest.address: localhost
+rest.bind-address: localhost
\ No newline at end of file
diff --git a/docker/build.sh b/docker/build.sh
index b3e1f5f20..4f4871894 100755
--- a/docker/build.sh
+++ b/docker/build.sh
@@ -24,7 +24,7 @@ export PROJECT_HOME
 cd $CURRENT_DIR
 
 AMORO_VERSION=`cat $PROJECT_HOME/pom.xml | grep 'amoro-parent' -C 3 | grep -Eo 
'<version>.*</version>' | awk -F'[><]' '{print $3}'`
-FLINK_VERSION=1.15.3
+FLINK_VERSION=1.20.0
 SPARK_VERSION=3.3.3
 DEBIAN_MIRROR=http://deb.debian.org
 APACHE_ARCHIVE=https://archive.apache.org/dist
@@ -47,7 +47,7 @@ Images:
     amoro                   Build official Amoro image used for production 
environments.
 
 Options:
-    --flink-version         Flink binary release version, default is 1.15.3, 
format must be x.y.z
+    --flink-version         Flink binary release version, default is 1.20.0, 
format must be x.y.z
     --spark-version         Spark binary release version, default is 3.3.3, 
format must be x.y.z
     --apache-archive        Apache Archive url, default is 
https://archive.apache.org/dist
     --debian-mirror         Mirror url of debian, default is 
http://deb.debian.org
diff --git a/docker/optimizer-flink/Dockerfile 
b/docker/optimizer-flink/Dockerfile
index 46d50e1b4..c5131f08a 100644
--- a/docker/optimizer-flink/Dockerfile
+++ b/docker/optimizer-flink/Dockerfile
@@ -14,7 +14,7 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
-ARG FLINK_VERSION=1.18.1-java8
+ARG FLINK_VERSION=1.20.0-java8
 
 FROM flink:${FLINK_VERSION}
 
diff --git a/pom.xml b/pom.xml
index d1117f877..09dd2c077 100644
--- a/pom.xml
+++ b/pom.xml
@@ -131,7 +131,7 @@
         <lucene.version>8.11.2</lucene.version>
         <bitmap.version>1.0.1</bitmap.version>
         <prometheus.version>0.16.0</prometheus.version>
-        <flink.version>1.18.1</flink.version>
+        <flink.version>1.20.0</flink.version>
         
<fabric8-kubernetes-client.version.version>6.12.1</fabric8-kubernetes-client.version.version>
         <amoro-shade.version>0.7.0-incubating</amoro-shade.version>
         <amoro-shade-guava.version>32.1.1-jre</amoro-shade-guava.version>

Reply via email to