This is an automated email from the ASF dual-hosted git repository.
jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 4f510187a [AMORO-3087]: Optimizer Support Flink 1.20 (#3087) (#3090)
4f510187a is described below
commit 4f510187a950313ac70b3a6130f0bcbc9c98ca19
Author: ConradJam <[email protected]>
AuthorDate: Wed Aug 14 20:15:26 2024 +0800
[AMORO-3087]: Optimizer Support Flink 1.20 (#3087) (#3090)
* [AMORO-3087]: Optimizer Support Flink 1.20 (#3087)
* fix docs
* fix
* docker images change flink version to 1.20
---------
Co-authored-by: ConradJam <[email protected]>
---
.github/workflows/docker-images.yml | 6 +-
README.md | 2 +-
.../server/manager/FlinkOptimizerContainer.java | 95 ++++++++++++++--------
.../manager/TestFlinkOptimizerContainer.java | 49 ++++++++---
.../src/test/resources/config.yaml | 38 +++++----
.../src/test/resources/flink-conf.yaml | 31 ++++---
docker/build.sh | 4 +-
docker/optimizer-flink/Dockerfile | 2 +-
pom.xml | 2 +-
9 files changed, 145 insertions(+), 84 deletions(-)
diff --git a/.github/workflows/docker-images.yml
b/.github/workflows/docker-images.yml
index 9866df029..f2d1c16c1 100644
--- a/.github/workflows/docker-images.yml
+++ b/.github/workflows/docker-images.yml
@@ -103,7 +103,7 @@ jobs:
if: ${{ startsWith(github.repository, 'apache/') }}
strategy:
matrix:
- flink: [ "1.14.6", "1.18.1" ]
+ flink: [ "1.14.6", "1.20.0" ]
steps:
- uses: actions/checkout@v3
- name: Set up JDK 8
@@ -129,11 +129,11 @@ jobs:
tags: |
type=ref,event=branch,enable=${{ matrix.flink == '1.14.6'
}},suffix=-snapshot
type=ref,event=branch,enable=${{ matrix.flink == '1.14.6'
}},suffix=-snapshot-flink1.14
- type=ref,event=branch,enable=${{ matrix.flink == '1.18.1'
}},suffix=-snapshot-flink1.18
+ type=ref,event=branch,enable=${{ matrix.flink == '1.20.0'
}},suffix=-snapshot-flink1.20
type=raw,enable=${{ matrix.hadoop == '1.14.6' &&
startsWith(github.ref, 'refs/tags/v') }},value=latest
type=semver,enable=${{ matrix.flink == '1.14.6'
}},pattern={{version}}
type=semver,enable=${{ matrix.flink == '1.14.6'
}},pattern={{version}}, suffix=-flink1.14
- type=semver,enable=${{ matrix.flink == '1.18.1'
}},pattern={{version}}, suffix=-flink1.18
+ type=semver,enable=${{ matrix.flink == '1.20.0'
}},pattern={{version}}, suffix=-flink1.20
- name: Print tags
run: echo '${{ steps.meta.outputs.tags }}'
diff --git a/README.md b/README.md
index cf13ff4a5..9f1f8276b 100644
--- a/README.md
+++ b/README.md
@@ -117,7 +117,7 @@ Amoro is built using Maven with JDK 8 and JDK 17(only for
`amoro-mixed-format/am
* Build and skip dashboard: `mvn clean package -Pskip-dashboard-build`
* Build and disable disk storage, RocksDB will NOT be introduced to avoid
memory overflow: `mvn clean package -DskipTests -Pno-extented-disk-storage`
* Build with hadoop 2.x(the default is 3.x) dependencies: `mvn clean package
-DskipTests -Phadoop2`
-* Specify Flink version for Flink optimizer(the default is 1.18.1): `mvn clean
package -DskipTests -Dflink-optimizer.flink-version=1.15.4`
+* Specify Flink version for Flink optimizer(the default is 1.20.0): `mvn clean
package -DskipTests -Dflink-optimizer.flink-version=1.20.0`
* If the version of Flink is below 1.15.0, you also need to add the
`-Pflink-optimizer-pre-1.15` parameter: `mvn clean package -DskipTests
-Pflink-optimizer-pre-1.15 -Dflink-optimizer.flink-version=1.14.6`
* Specify Spark version for Spark optimizer(the default is 3.3.3): `mvn clean
package -DskipTests -Dspark-optimizer.spark-version=3.3.3`
* Build `amoro-mixed-format-trino` module under JDK 17: `mvn clean package
-DskipTests -Pformat-mixed-format-trino,build-mixed-format-trino -pl
'amoro-mixed-format/amoro-mixed-format-trino' -am`.
diff --git
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java
index a0bb79b6a..593f3ae92 100644
---
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java
+++
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java
@@ -33,7 +33,8 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.RestoreMode;
+import org.apache.flink.configuration.YamlParserUtils;
+import org.apache.flink.core.execution.RestoreMode;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rest.FileUpload;
import org.apache.flink.runtime.rest.RestClient;
@@ -60,10 +61,13 @@ import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.URL;
import java.nio.file.Files;
+import java.nio.file.LinkOption;
+import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
@@ -79,7 +83,9 @@ public class FlinkOptimizerContainer extends
AbstractResourceContainer {
public static final String FLINK_HOME_PROPERTY = "flink-home";
public static final String FLINK_CONFIG_PATH = "/conf";
- public static final String FLINK_CONFIG_YAML = "/flink-conf.yaml";
+ public static final String LEGACY_FLINK_CONFIG_YAML = "/flink-conf.yaml";
+ // flink version >= 1.20 use it first
+ public static final String FLINK_CONFIG_YAML = "/config.yaml";
public static final String ENV_FLINK_CONF_DIR = "FLINK_CONF_DIR";
public static final String FLINK_CLIENT_TIMEOUT_SECOND =
"flink-client-timeout-second";
@@ -87,18 +93,6 @@ public class FlinkOptimizerContainer extends
AbstractResourceContainer {
private static final String FLINK_JOB_MAIN_CLASS =
"org.apache.amoro.optimizer.flink.FlinkOptimizer";
- /**
- * This will be removed in 0.7.0, using flink properties
- * `flink-conf.taskmanager.memory.process.size`.
- */
- @Deprecated public static final String TASK_MANAGER_MEMORY_PROPERTY =
"taskmanager.memory";
-
- /**
- * This will be removed in 0.7.0, using flink properties
- * `flink-conf.jobmanager.memory.process.size`.
- */
- @Deprecated public static final String JOB_MANAGER_MEMORY_PROPERTY =
"jobmanager.memory";
-
public static final String FLINK_RUN_TARGET = "target";
public static final String FLINK_JOB_URI = "job-uri";
@@ -228,16 +222,10 @@ public class FlinkOptimizerContainer extends
AbstractResourceContainer {
long jobManagerMemory =
getMemorySizeValue(
- properties,
- resourceFlinkConf,
- JOB_MANAGER_MEMORY_PROPERTY,
- FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY);
+ properties, resourceFlinkConf,
FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY);
long taskManagerMemory =
getMemorySizeValue(
- properties,
- resourceFlinkConf,
- TASK_MANAGER_MEMORY_PROPERTY,
- FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY);
+ properties, resourceFlinkConf,
FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY);
resourceFlinkConf.putToOptions(
FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY, jobManagerMemory +
"m");
@@ -266,15 +254,62 @@ public class FlinkOptimizerContainer extends
AbstractResourceContainer {
jobArgs);
}
+ @VisibleForTesting
+ protected Map<String, String> loadFlinkConfigForYAML(URL path) {
+ this.flinkConfDir = Paths.get(path.getPath()).getParent().toString();
+ return loadFlinkConfig();
+ }
+
+ /**
+ * get flink config with config.yaml or flink-conf.yaml see <a
+ *
href="https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/config/#flink-configuration-file"></a>
+ *
+ * @return flink config map
+ */
private Map<String, String> loadFlinkConfig() {
try {
- return new Yaml().load(Files.newInputStream(Paths.get(flinkConfDir +
FLINK_CONFIG_YAML)));
- } catch (IOException e) {
+ Path flinkConfPath = Paths.get(flinkConfDir + FLINK_CONFIG_YAML);
+ if (!Files.exists(flinkConfPath, LinkOption.NOFOLLOW_LINKS)) {
+ flinkConfPath = Paths.get(flinkConfDir + LEGACY_FLINK_CONFIG_YAML);
+ return new Yaml().load(Files.newInputStream(flinkConfPath));
+ }
+ Map<String, Object> configDocument =
+ YamlParserUtils.loadYamlFile(new File(flinkConfPath.toUri()));
+ return Maps.transformValues(
+ flatten(configDocument, ""), value -> value == null ? null :
value.toString());
+ } catch (Exception e) {
LOG.error("load flink conf yaml failed: {}", e.getMessage());
return Collections.emptyMap();
}
}
+ /**
+ * Copy from flink 1.20 GlobalConfiguration.flatten Utils
+ *
+ * @param config
+ * @param keyPrefix
+ * @return
+ */
+ private static Map<String, Object> flatten(Map<String, Object> config,
String keyPrefix) {
+ final Map<String, Object> flattenedMap = new HashMap<>();
+ config.forEach(
+ (key, value) -> {
+ String flattenedKey = keyPrefix + key;
+ if (value instanceof Map) {
+ Map<String, Object> e = (Map<String, Object>) value;
+ flattenedMap.putAll(flatten(e, flattenedKey + "."));
+ } else {
+ if (value instanceof List) {
+ flattenedMap.put(flattenedKey,
YamlParserUtils.toYAMLString(value));
+ } else {
+ flattenedMap.put(flattenedKey, value);
+ }
+ }
+ });
+
+ return flattenedMap;
+ }
+
private void addKubernetesProperties(Resource resource, FlinkConf flinkConf)
{
String clusterId = kubernetesClusterId(resource);
flinkConf.putToOptions(FlinkConfKeys.KUBERNETES_CLUSTER_ID, clusterId);
@@ -297,16 +332,12 @@ public class FlinkOptimizerContainer extends
AbstractResourceContainer {
/**
* get jobManager and taskManager memory. An example of using Jobmanager
memory parameters is as
- * follows: jobmanager.memory: 1024
flink-conf.jobmanager.memory.process.size: 1024M
- * flink-conf.yaml Prioritize from high to low.
+ * flink-conf.jobmanager.memory.process.size: 1024M flink-conf.yaml
Prioritize from high to low.
*/
@VisibleForTesting
protected long getMemorySizeValue(
- Map<String, String> resourceProperties,
- FlinkConf conf,
- String resourcePropertyKey,
- String flinkConfKey) {
- String value = resourceProperties.get(resourcePropertyKey);
+ Map<String, String> resourceProperties, FlinkConf conf, String
flinkConfKey) {
+ String value = resourceProperties.get(flinkConfKey);
if (value == null) {
value = conf.configValue(flinkConfKey);
}
@@ -532,7 +563,7 @@ public class FlinkOptimizerContainer extends
AbstractResourceContainer {
JobID jobID = JobID.generate();
JarRunRequestBody runRequestBody =
new JarRunRequestBody(
- FLINK_JOB_MAIN_CLASS, args, null, null, jobID, true, null,
RestoreMode.DEFAULT, null);
+ FLINK_JOB_MAIN_CLASS, args, null, null, jobID, true, null,
RestoreMode.NO_CLAIM, null);
LOG.info("Submitting job: {} to session cluster, args: {}", jobID, args);
try (RestClusterClient<String> restClusterClient =
FlinkClientUtil.getRestClusterClient(configuration)) {
diff --git
a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/manager/TestFlinkOptimizerContainer.java
b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/manager/TestFlinkOptimizerContainer.java
index 28517cf58..7cc235eae 100644
---
a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/manager/TestFlinkOptimizerContainer.java
+++
b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/manager/TestFlinkOptimizerContainer.java
@@ -23,8 +23,11 @@ import
org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.Test;
+import java.net.URL;
+import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
public class TestFlinkOptimizerContainer {
FlinkOptimizerContainer container = new FlinkOptimizerContainer();
@@ -52,6 +55,28 @@ public class TestFlinkOptimizerContainer {
Assert.assertEquals(0, container.parseMemorySize("100kb"));
}
+ @Test
+ public void testReadFlinkConfigFile() {
+ ClassLoader classLoader = getClass().getClassLoader();
+ URL flinkConfResourceUrl = classLoader.getResource("flink-conf.yaml");
+ Assert.assertEquals(
+
Paths.get(Objects.requireNonNull(flinkConfResourceUrl).getPath()).getFileName().toString(),
+ "flink-conf.yaml");
+ URL newFlinkConfResourceUrl = classLoader.getResource("config.yaml");
+ Assert.assertEquals(
+ Paths.get(Objects.requireNonNull(newFlinkConfResourceUrl).getPath())
+ .getFileName()
+ .toString(),
+ "config.yaml");
+ Map<String, String> flinkConfig =
container.loadFlinkConfigForYAML(newFlinkConfResourceUrl);
+ Assert.assertEquals(
+
flinkConfig.get(FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY),
+ "1728m");
+ Assert.assertEquals(
+
flinkConfig.get(FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY),
+ "1600m");
+ }
+
@Test
public void testBuildFlinkOptions() {
Map<String, String> containerProperties =
Maps.newHashMap(this.containerProperties);
@@ -78,8 +103,8 @@ public class TestFlinkOptimizerContainer {
@Test
public void testGetMemorySizeValue() {
HashMap<String, String> prop = new HashMap<>();
- prop.put("taskmanager.memory", "100");
- prop.put("jobmanager.memory", "100");
+
prop.put(FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY,
"100");
+
prop.put(FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY,
"100");
FlinkOptimizerContainer.FlinkConf conf =
FlinkOptimizerContainer.FlinkConf.buildFor(prop,
Maps.newHashMap()).build();
@@ -87,11 +112,11 @@ public class TestFlinkOptimizerContainer {
Assert.assertEquals(
100L,
container.getMemorySizeValue(
- prop, conf, "taskmanager.memory",
"taskmanager.memory.process.size"));
+ prop, conf,
FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY));
Assert.assertEquals(
100L,
container.getMemorySizeValue(
- prop, conf, "jobmanager.memory",
"jobmanager.memory.process.size"));
+ prop, conf,
FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY));
Map<String, String> containerProperties = Maps.newHashMap();
containerProperties.put("flink-conf.jobmanager.memory.process.size", "200
M");
@@ -101,36 +126,36 @@ public class TestFlinkOptimizerContainer {
Assert.assertEquals(
200L,
container.getMemorySizeValue(
- prop, conf, "taskmanager.memory",
"taskmanager.memory.process.size"));
+ prop, conf,
FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY));
Assert.assertEquals(
200L,
container.getMemorySizeValue(
- prop, conf, "jobmanager.memory",
"jobmanager.memory.process.size"));
+ prop, conf,
FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY));
prop.clear();
containerProperties = Maps.newHashMap();
conf = FlinkOptimizerContainer.FlinkConf.buildFor(prop,
containerProperties).build();
- prop.put("taskmanager.memory", "300 M");
- prop.put("jobmanager.memory", "300");
+
prop.put(FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY,
"300 M");
+
prop.put(FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY,
"300");
Assert.assertEquals(
300L,
container.getMemorySizeValue(
- prop, conf, "taskmanager.memory",
"taskmanager.memory.process.size"));
+ prop, conf,
FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY));
Assert.assertEquals(
300L,
container.getMemorySizeValue(
- prop, conf, "jobmanager.memory",
"jobmanager.memory.process.size"));
+ prop, conf,
FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY));
conf = FlinkOptimizerContainer.FlinkConf.buildFor(Maps.newHashMap(),
Maps.newHashMap()).build();
prop.clear();
Assert.assertEquals(
0L,
container.getMemorySizeValue(
- prop, conf, "taskmanager.memory",
"taskmanager.memory.process.size"));
+ prop, conf,
FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY));
Assert.assertEquals(
0L,
container.getMemorySizeValue(
- prop, conf, "jobmanager.memory",
"jobmanager.memory.process.size"));
+ prop, conf,
FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY));
}
}
diff --git a/docker/optimizer-flink/Dockerfile
b/amoro-ams/amoro-ams-server/src/test/resources/config.yaml
similarity index 52%
copy from docker/optimizer-flink/Dockerfile
copy to amoro-ams/amoro-ams-server/src/test/resources/config.yaml
index 46d50e1b4..168046fda 100644
--- a/docker/optimizer-flink/Dockerfile
+++ b/amoro-ams/amoro-ams-server/src/test/resources/config.yaml
@@ -1,3 +1,4 @@
+################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -12,22 +13,27 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
-# limitations under the License.
+# limitations under the License.
+################################################################################
-ARG FLINK_VERSION=1.18.1-java8
+jobmanager:
+ bind-host: localhost
+ rpc:
+ address: localhost
+ port: 6123
+ memory:
+ process:
+ size: 1600m
+ execution:
+ failover-strategy: region
-FROM flink:${FLINK_VERSION}
-
-ARG MAVEN_MIRROR=https://repo.maven.apache.org/maven2
-ARG OPTIMIZER_JOB=optimizer-job.jar
-ARG AWS_VERSION=2.24.12
-
-RUN cd $FLINK_HOME/lib \
- && wget
${MAVEN_MIRROR}/software/amazon/awssdk/bundle/${AWS_VERSION}/bundle-${AWS_VERSION}.jar
\
- && wget
${MAVEN_MIRROR}/software/amazon/awssdk/url-connection-client/${AWS_VERSION}/url-connection-client-${AWS_VERSION}.jar
\
- && wget
${MAVEN_MIRROR}/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
\
- && chown flink:flink *.jar \
- && mkdir -p $FLINK_HOME/usrlib
-
-COPY $OPTIMIZER_JOB $FLINK_HOME/usrlib/optimizer-job.jar
+taskmanager:
+ bind-host: localhost
+ host: localhost
+ numberOfTaskSlots: 1
+ memory:
+ process:
+ size: 1728m
+parallelism:
+ default: 1
\ No newline at end of file
diff --git a/docker/optimizer-flink/Dockerfile
b/amoro-ams/amoro-ams-server/src/test/resources/flink-conf.yaml
similarity index 52%
copy from docker/optimizer-flink/Dockerfile
copy to amoro-ams/amoro-ams-server/src/test/resources/flink-conf.yaml
index 46d50e1b4..7b668d57f 100644
--- a/docker/optimizer-flink/Dockerfile
+++ b/amoro-ams/amoro-ams-server/src/test/resources/flink-conf.yaml
@@ -1,3 +1,4 @@
+################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -12,22 +13,20 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
-# limitations under the License.
+# limitations under the License.
+################################################################################
-ARG FLINK_VERSION=1.18.1-java8
+jobmanager.rpc.address: localhost
+jobmanager.rpc.port: 6123
+jobmanager.bind-host: localhost
+jobmanager.memory.process.size: 1600m
-FROM flink:${FLINK_VERSION}
-
-ARG MAVEN_MIRROR=https://repo.maven.apache.org/maven2
-ARG OPTIMIZER_JOB=optimizer-job.jar
-ARG AWS_VERSION=2.24.12
-
-RUN cd $FLINK_HOME/lib \
- && wget
${MAVEN_MIRROR}/software/amazon/awssdk/bundle/${AWS_VERSION}/bundle-${AWS_VERSION}.jar
\
- && wget
${MAVEN_MIRROR}/software/amazon/awssdk/url-connection-client/${AWS_VERSION}/url-connection-client-${AWS_VERSION}.jar
\
- && wget
${MAVEN_MIRROR}/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
\
- && chown flink:flink *.jar \
- && mkdir -p $FLINK_HOME/usrlib
-
-COPY $OPTIMIZER_JOB $FLINK_HOME/usrlib/optimizer-job.jar
+taskmanager.bind-host: localhost
+taskmanager.host: localhost
+taskmanager.memory.process.size: 1728m
+taskmanager.numberOfTaskSlots: 1
+parallelism.default: 2
+jobmanager.execution.failover-strategy: region
+rest.address: localhost
+rest.bind-address: localhost
\ No newline at end of file
diff --git a/docker/build.sh b/docker/build.sh
index b3e1f5f20..4f4871894 100755
--- a/docker/build.sh
+++ b/docker/build.sh
@@ -24,7 +24,7 @@ export PROJECT_HOME
cd $CURRENT_DIR
AMORO_VERSION=`cat $PROJECT_HOME/pom.xml | grep 'amoro-parent' -C 3 | grep -Eo
'<version>.*</version>' | awk -F'[><]' '{print $3}'`
-FLINK_VERSION=1.15.3
+FLINK_VERSION=1.20.0
SPARK_VERSION=3.3.3
DEBIAN_MIRROR=http://deb.debian.org
APACHE_ARCHIVE=https://archive.apache.org/dist
@@ -47,7 +47,7 @@ Images:
amoro Build official Amoro image used for production
environments.
Options:
- --flink-version Flink binary release version, default is 1.15.3,
format must be x.y.z
+ --flink-version Flink binary release version, default is 1.20.0,
format must be x.y.z
--spark-version Spark binary release version, default is 3.3.3,
format must be x.y.z
--apache-archive Apache Archive url, default is
https://archive.apache.org/dist
--debian-mirror Mirror url of debian, default is
http://deb.debian.org
diff --git a/docker/optimizer-flink/Dockerfile
b/docker/optimizer-flink/Dockerfile
index 46d50e1b4..c5131f08a 100644
--- a/docker/optimizer-flink/Dockerfile
+++ b/docker/optimizer-flink/Dockerfile
@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-ARG FLINK_VERSION=1.18.1-java8
+ARG FLINK_VERSION=1.20.0-java8
FROM flink:${FLINK_VERSION}
diff --git a/pom.xml b/pom.xml
index d1117f877..09dd2c077 100644
--- a/pom.xml
+++ b/pom.xml
@@ -131,7 +131,7 @@
<lucene.version>8.11.2</lucene.version>
<bitmap.version>1.0.1</bitmap.version>
<prometheus.version>0.16.0</prometheus.version>
- <flink.version>1.18.1</flink.version>
+ <flink.version>1.20.0</flink.version>
<fabric8-kubernetes-client.version.version>6.12.1</fabric8-kubernetes-client.version.version>
<amoro-shade.version>0.7.0-incubating</amoro-shade.version>
<amoro-shade-guava.version>32.1.1-jre</amoro-shade-guava.version>