This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 2f75792d044 Flink 1.20 support (#36893)
2f75792d044 is described below
commit 2f75792d044de5355bf7298feb7bbc9c689c7ffd
Author: Yi Hu <[email protected]>
AuthorDate: Tue Nov 25 18:59:50 2025 -0500
Flink 1.20 support (#36893)
---
.../test-properties.json | 6 ++---
.../workflows/beam_LoadTests_Java_GBK_Smoke.yml | 2 +-
.../beam_PostCommit_Java_Examples_Flink.yml | 2 +-
.../beam_PostCommit_Java_Nexmark_Flink.yml | 2 +-
.../beam_PostCommit_Java_PVR_Flink_Streaming.yml | 2 +-
.../workflows/beam_PostCommit_Java_Tpcds_Flink.yml | 2 +-
.../beam_PostCommit_Java_ValidatesRunner_Flink.yml | 2 +-
...PostCommit_Java_ValidatesRunner_Flink_Java8.yml | 2 +-
.github/workflows/beam_PostCommit_XVR_Flink.yml | 2 +-
.../beam_PreCommit_Java_PVR_Flink_Batch.yml | 2 +-
.../beam_PreCommit_Java_PVR_Flink_Docker.yml | 2 +-
.../run_rc_validation_java_quickstart.yml | 2 +-
CHANGES.md | 1 +
gradle.properties | 2 +-
.../wrappers/streaming/DoFnOperator.java | 0
runners/flink/1.20/build.gradle | 25 +++++++++++++++++
.../flink/1.20/job-server-container/build.gradle | 26 ++++++++++++++++++
runners/flink/1.20/job-server/build.gradle | 31 ++++++++++++++++++++++
.../wrappers/streaming/DoFnOperator.java | 8 +++---
runners/flink/flink_runner.gradle | 5 ++++
.../translation/types/CoderTypeSerializer.java | 0
.../flink/FlinkExecutionEnvironmentsTest.java | 13 +++++++--
.../runners/flink/ReadSourceStreamingTest.java | 7 ++++-
.../flink/streaming/GroupByNullKeyTest.java | 7 ++++-
.../flink/streaming/TopWikipediaSessionsTest.java | 7 ++++-
sdks/go/examples/wasm/README.md | 2 +-
.../python/apache_beam/options/pipeline_options.py | 2 +-
sdks/typescript/src/apache_beam/runners/flink.ts | 2 +-
settings.gradle.kts | 18 +++++--------
29 files changed, 144 insertions(+), 40 deletions(-)
diff --git a/.github/actions/setup-default-test-properties/test-properties.json
b/.github/actions/setup-default-test-properties/test-properties.json
index 77d8d0d311f..91e264f483a 100644
--- a/.github/actions/setup-default-test-properties/test-properties.json
+++ b/.github/actions/setup-default-test-properties/test-properties.json
@@ -13,9 +13,9 @@
"TOX_ENV": ["Cloud", "Cython"]
},
"JavaTestProperties": {
- "SUPPORTED_VERSIONS": ["8", "11", "17", "21"],
- "FLINK_VERSIONS": ["1.17", "1.18", "1.19"],
- "SPARK_VERSIONS": ["2", "3"]
+ "SUPPORTED_VERSIONS": ["8", "11", "17", "21", "25"],
+ "FLINK_VERSIONS": ["1.17", "1.18", "1.19", "1.20"],
+ "SPARK_VERSIONS": ["3"]
},
"GoTestProperties": {
"SUPPORTED_VERSIONS": ["1.25"]
diff --git a/.github/workflows/beam_LoadTests_Java_GBK_Smoke.yml
b/.github/workflows/beam_LoadTests_Java_GBK_Smoke.yml
index 11ddb3f42f4..8c291efc1cd 100644
--- a/.github/workflows/beam_LoadTests_Java_GBK_Smoke.yml
+++ b/.github/workflows/beam_LoadTests_Java_GBK_Smoke.yml
@@ -106,7 +106,7 @@ jobs:
arguments: |
--info \
-PloadTest.mainClass=org.apache.beam.sdk.loadtests.GroupByKeyLoadTest \
- -Prunner=:runners:flink:1.19 \
+ -Prunner=:runners:flink:1.20 \
'-PloadTest.args=${{
env.beam_LoadTests_Java_GBK_Smoke_test_arguments_3 }}' \
- name: run GroupByKey load test Spark
uses: ./.github/actions/gradle-command-self-hosted-action
diff --git a/.github/workflows/beam_PostCommit_Java_Examples_Flink.yml
b/.github/workflows/beam_PostCommit_Java_Examples_Flink.yml
index ec2b4db31dd..f1f51b32742 100644
--- a/.github/workflows/beam_PostCommit_Java_Examples_Flink.yml
+++ b/.github/workflows/beam_PostCommit_Java_Examples_Flink.yml
@@ -80,7 +80,7 @@ jobs:
- name: run examplesIntegrationTest script
uses: ./.github/actions/gradle-command-self-hosted-action
with:
- gradle-command: :runners:flink:1.19:examplesIntegrationTest
+ gradle-command: :runners:flink:1.20:examplesIntegrationTest
- name: Archive JUnit Test Results
uses: actions/upload-artifact@v4
if: ${{ !success() }}
diff --git a/.github/workflows/beam_PostCommit_Java_Nexmark_Flink.yml
b/.github/workflows/beam_PostCommit_Java_Nexmark_Flink.yml
index 2d026e3536a..389db7eb2fa 100644
--- a/.github/workflows/beam_PostCommit_Java_Nexmark_Flink.yml
+++ b/.github/workflows/beam_PostCommit_Java_Nexmark_Flink.yml
@@ -102,7 +102,7 @@ jobs:
with:
gradle-command: :sdks:java:testing:nexmark:run
arguments: |
- -Pnexmark.runner=:runners:flink:1.19 \
+ -Pnexmark.runner=:runners:flink:1.20 \
"${{ env.GRADLE_COMMAND_ARGUMENTS }} --streaming=${{
matrix.streaming }} --queryLanguage=${{ matrix.queryLanguage }}" \
- name: run PostCommit Java Nexmark Flink (${{ matrix.streaming }})
script
if: matrix.queryLanguage == 'none'
diff --git a/.github/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml
b/.github/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml
index a773d2c58ac..3d40c300db0 100644
--- a/.github/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml
+++ b/.github/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml
@@ -77,7 +77,7 @@ jobs:
- name: run PostCommit Java Flink PortableValidatesRunner Streaming
script
uses: ./.github/actions/gradle-command-self-hosted-action
with:
- gradle-command:
runners:flink:1.19:job-server:validatesPortableRunnerStreaming
+ gradle-command:
runners:flink:1.20:job-server:validatesPortableRunnerStreaming
- name: Archive JUnit Test Results
uses: actions/upload-artifact@v4
if: ${{ !success() }}
diff --git a/.github/workflows/beam_PostCommit_Java_Tpcds_Flink.yml
b/.github/workflows/beam_PostCommit_Java_Tpcds_Flink.yml
index 78a9351a415..df29e476474 100644
--- a/.github/workflows/beam_PostCommit_Java_Tpcds_Flink.yml
+++ b/.github/workflows/beam_PostCommit_Java_Tpcds_Flink.yml
@@ -101,5 +101,5 @@ jobs:
with:
gradle-command: :sdks:java:testing:tpcds:run
arguments: |
- -Ptpcds.runner=:runners:flink:1.19 \
+ -Ptpcds.runner=:runners:flink:1.20 \
"-Ptpcds.args=${{env.tpcdsBigQueryArgs}}
${{env.tpcdsInfluxDBArgs}} ${{ env.GRADLE_COMMAND_ARGUMENTS }}
--queries=${{env.tpcdsQueriesArg}}" \
diff --git a/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink.yml
b/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink.yml
index 82e23e203b0..5d6a26301a8 100644
--- a/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink.yml
+++ b/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink.yml
@@ -78,7 +78,7 @@ jobs:
- name: run validatesRunner script
uses: ./.github/actions/gradle-command-self-hosted-action
with:
- gradle-command: :runners:flink:1.19:validatesRunner
+ gradle-command: :runners:flink:1.20:validatesRunner
- name: Archive JUnit Test Results
uses: actions/upload-artifact@v4
if: ${{ !success() }}
diff --git
a/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink_Java8.yml
b/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink_Java8.yml
index 9b061028cbc..5103926e391 100644
--- a/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink_Java8.yml
+++ b/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink_Java8.yml
@@ -81,7 +81,7 @@ jobs:
- name: run validatesRunner Java8 script
uses: ./.github/actions/gradle-command-self-hosted-action
with:
- gradle-command: :runners:flink:1.19:validatesRunner
+ gradle-command: :runners:flink:1.20:validatesRunner
arguments: |
-PtestJavaVersion=8 \
-Pjava8Home=$JAVA_HOME_8_X64 \
diff --git a/.github/workflows/beam_PostCommit_XVR_Flink.yml
b/.github/workflows/beam_PostCommit_XVR_Flink.yml
index 53d1fd81546..8d0893eb2d7 100644
--- a/.github/workflows/beam_PostCommit_XVR_Flink.yml
+++ b/.github/workflows/beam_PostCommit_XVR_Flink.yml
@@ -47,7 +47,7 @@ env:
DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}
- FlinkVersion: 1.19
+ FlinkVersion: 1.20
jobs:
beam_PostCommit_XVR_Flink:
diff --git a/.github/workflows/beam_PreCommit_Java_PVR_Flink_Batch.yml
b/.github/workflows/beam_PreCommit_Java_PVR_Flink_Batch.yml
index a4ab0587b8f..9c93c3dc1ac 100644
--- a/.github/workflows/beam_PreCommit_Java_PVR_Flink_Batch.yml
+++ b/.github/workflows/beam_PreCommit_Java_PVR_Flink_Batch.yml
@@ -94,7 +94,7 @@ jobs:
- name: run validatesPortableRunnerBatch script
uses: ./.github/actions/gradle-command-self-hosted-action
with:
- gradle-command:
:runners:flink:1.19:job-server:validatesPortableRunnerBatch
+ gradle-command:
:runners:flink:1.20:job-server:validatesPortableRunnerBatch
env:
CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH }}
- name: Archive JUnit Test Results
diff --git a/.github/workflows/beam_PreCommit_Java_PVR_Flink_Docker.yml
b/.github/workflows/beam_PreCommit_Java_PVR_Flink_Docker.yml
index fce2e590d3e..fa4638c751a 100644
--- a/.github/workflows/beam_PreCommit_Java_PVR_Flink_Docker.yml
+++ b/.github/workflows/beam_PreCommit_Java_PVR_Flink_Docker.yml
@@ -99,7 +99,7 @@ jobs:
- name: run PreCommit Java PVR Flink Docker script
uses: ./.github/actions/gradle-command-self-hosted-action
with:
- gradle-command:
:runners:flink:1.19:job-server:validatesPortableRunnerDocker
+ gradle-command:
:runners:flink:1.20:job-server:validatesPortableRunnerDocker
env:
CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH}}
- name: Archive JUnit Test Results
diff --git a/.github/workflows/run_rc_validation_java_quickstart.yml
b/.github/workflows/run_rc_validation_java_quickstart.yml
index 023839d5a3d..f39e8ac9392 100644
--- a/.github/workflows/run_rc_validation_java_quickstart.yml
+++ b/.github/workflows/run_rc_validation_java_quickstart.yml
@@ -88,7 +88,7 @@ jobs:
- name: Run QuickStart Java Flink Runner
uses: ./.github/actions/gradle-command-self-hosted-action
with:
- gradle-command: :runners:flink:1.19:runQuickstartJavaFlinkLocal
+ gradle-command: :runners:flink:1.20:runQuickstartJavaFlinkLocal
arguments: |
-Prepourl=${{ env.APACHE_REPO_URL }} \
-Pver=${{ env.RELEASE_VERSION }}
diff --git a/CHANGES.md b/CHANGES.md
index e6f9cf13ff9..68af5a342d7 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -65,6 +65,7 @@
* New highly anticipated feature X added to Python SDK
([#X](https://github.com/apache/beam/issues/X)).
* New highly anticipated feature Y added to Java SDK
([#Y](https://github.com/apache/beam/issues/Y)).
+* Flink 1.20 support added
([#32647](https://github.com/apache/beam/issues/32647)).
## I/Os
diff --git a/gradle.properties b/gradle.properties
index 510122c4e7b..311b7800706 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -39,6 +39,6 @@ docker_image_default_repo_root=apache
docker_image_default_repo_prefix=beam_
# supported flink versions
-flink_versions=1.17,1.18,1.19
+flink_versions=1.17,1.18,1.19,1.20
# supported python versions
python_versions=3.10,3.11,3.12,3.13
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
b/runners/flink/1.17/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
similarity index 100%
copy from
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
copy to
runners/flink/1.17/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
diff --git a/runners/flink/1.20/build.gradle b/runners/flink/1.20/build.gradle
new file mode 100644
index 00000000000..4c148321ed4
--- /dev/null
+++ b/runners/flink/1.20/build.gradle
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+project.ext {
+ flink_major = '1.20'
+ flink_version = '1.20.3'
+}
+
+// Load the main build script which contains all build logic.
+apply from: "../flink_runner.gradle"
diff --git a/runners/flink/1.20/job-server-container/build.gradle
b/runners/flink/1.20/job-server-container/build.gradle
new file mode 100644
index 00000000000..afdb68a0fc9
--- /dev/null
+++ b/runners/flink/1.20/job-server-container/build.gradle
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+def basePath = '../../job-server-container'
+
+project.ext {
+ resource_path = basePath
+}
+
+// Load the main build script which contains all build logic.
+apply from: "$basePath/flink_job_server_container.gradle"
diff --git a/runners/flink/1.20/job-server/build.gradle
b/runners/flink/1.20/job-server/build.gradle
new file mode 100644
index 00000000000..e5fdd1febf9
--- /dev/null
+++ b/runners/flink/1.20/job-server/build.gradle
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+def basePath = '../../job-server'
+
+project.ext {
+ // Look for the source code in the parent module
+ main_source_dirs = ["$basePath/src/main/java"]
+ test_source_dirs = ["$basePath/src/test/java"]
+ main_resources_dirs = ["$basePath/src/main/resources"]
+ test_resources_dirs = ["$basePath/src/test/resources"]
+ archives_base_name = 'beam-runners-flink-1.20-job-server'
+}
+
+// Load the main build script which contains all build logic.
+apply from: "$basePath/flink_job_server.gradle"
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
b/runners/flink/1.20/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
similarity index 99%
rename from
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
rename to
runners/flink/1.20/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 13414682f8e..43668e0298e 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++
b/runners/flink/1.20/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -112,7 +112,6 @@ import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
@@ -233,7 +232,8 @@ public class DoFnOperator<PreInputT, InputT, OutputT>
private static final int MAX_NUMBER_PENDING_BUNDLE_FINALIZATIONS = 32;
protected transient InternalTimerService<TimerData> timerService;
- private transient InternalTimeServiceManager<?> timeServiceManager;
+ // Flink 1.20 moved timeServiceManager to protected scope, so a local delegate
+ // field is no longer needed.
+ // private transient InternalTimeServiceManager<?> timeServiceManager;
private transient PushedBackElementsHandler<WindowedValue<InputT>>
pushedBackElementsHandler;
@@ -492,9 +492,7 @@ public class DoFnOperator<PreInputT, InputT, OutputT>
}
timerInternals = new FlinkTimerInternals(timerService);
- timeServiceManager =
- getTimeServiceManager()
- .orElseThrow(() -> new IllegalStateException("Time service
manager is not set."));
+ Preconditions.checkNotNull(getTimeServiceManager(), "Time service
manager is not set.");
}
outputManager =
diff --git a/runners/flink/flink_runner.gradle
b/runners/flink/flink_runner.gradle
index 2bd3ad7b8db..52f9631f455 100644
--- a/runners/flink/flink_runner.gradle
+++ b/runners/flink/flink_runner.gradle
@@ -175,6 +175,11 @@ dependencies {
implementation library.java.joda_time
implementation library.java.args4j
+ // flink-core-api is introduced in Flink 1.20+
+ if (flink_major == '1.20' || flink_major.startsWith('2')) {
+ implementation "org.apache.flink:flink-core-api:$flink_version"
+ }
+
implementation "org.apache.flink:flink-clients:$flink_version"
// Runtime dependencies are not included in Beam's generated pom.xml, so we
must declare flink-clients in implementation
// configuration (https://issues.apache.org/jira/browse/BEAM-11732).
diff --git
a/runners/flink/1.17/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
similarity index 100%
rename from
runners/flink/1.17/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
rename to
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
index ec44d279586..7262760a632 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
@@ -563,7 +563,16 @@ public class FlinkExecutionEnvironmentsTest {
}
private String getSavepointPath(Object env) {
- return ((Configuration) Whitebox.getInternalState(env, "configuration"))
- .getString("execution.savepoint.path", null);
+ // pre Flink 1.20 config
+ String path =
+ ((Configuration) Whitebox.getInternalState(env, "configuration"))
+ .getString("execution.savepoint.path", null);
+ if (path == null) {
+ // Flink 1.20+
+ path =
+ ((Configuration) Whitebox.getInternalState(env, "configuration"))
+ .getString("execution.state-recovery.path", null);
+ }
+ return path;
}
}
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
index b8dc52f6cd4..d76a1bb2a27 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
@@ -28,7 +28,9 @@ import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.test.util.TestBaseUtils;
import org.junit.After;
import org.junit.Before;
+import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
/** Reads from a bounded source in streaming. */
public class ReadSourceStreamingTest extends AbstractTestBase {
@@ -40,12 +42,15 @@ public class ReadSourceStreamingTest extends
AbstractTestBase {
private static final String[] EXPECTED_RESULT =
new String[] {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
+ @ClassRule public static final TemporaryFolder TEMP_RESULT_FOLDER = new
TemporaryFolder();
@Before
public void preSubmit() throws Exception {
// Beam Write will add shard suffix to fileName, see ShardNameTemplate.
// So tempFile need have a parent to compare.
- File resultParent = createAndRegisterTempFile("result");
+ // TODO: Consider moving to AbstractTestBase.createAndRegisterTempFile once all
+ // tests have been migrated to JUnit 5.
+ File resultParent = new File(TEMP_RESULT_FOLDER.newFolder(), "result");
resultDir = resultParent.toURI().toString();
resultPath = new File(resultParent, "file.txt").getAbsolutePath();
}
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
index 5b3a3385460..7650df3072b 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
@@ -40,7 +40,9 @@ import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.Before;
+import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
/** Test for GroupByNullKey. */
public class GroupByNullKeyTest extends AbstractTestBase implements
Serializable {
@@ -50,6 +52,7 @@ public class GroupByNullKeyTest extends AbstractTestBase
implements Serializable
static final String[] EXPECTED_RESULT =
new String[] {"k: null v: user1 user1 user1 user2 user2 user2 user2
user3"};
+ @ClassRule public static final TemporaryFolder TEMP_RESULT_FOLDER = new
TemporaryFolder();
public GroupByNullKeyTest() {}
@@ -57,7 +60,9 @@ public class GroupByNullKeyTest extends AbstractTestBase
implements Serializable
public void preSubmit() throws Exception {
// Beam Write will add shard suffix to fileName, see ShardNameTemplate.
// So tempFile need have a parent to compare.
- File resultParent = createAndRegisterTempFile("result");
+ // TODO: Consider moving to AbstractTestBase.createAndRegisterTempFile once all
+ // tests have been migrated to JUnit 5.
+ File resultParent = new File(TEMP_RESULT_FOLDER.newFolder(), "result");
resultDir = resultParent.toURI().toString();
resultPath = new File(resultParent, "file.txt").getAbsolutePath();
}
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsTest.java
index f6fd654bbce..0625576a1b2 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsTest.java
@@ -39,7 +39,9 @@ import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.Before;
+import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
/** Session window test. */
public class TopWikipediaSessionsTest extends AbstractTestBase implements
Serializable {
@@ -58,12 +60,15 @@ public class TopWikipediaSessionsTest extends
AbstractTestBase implements Serial
"user: user3 value:7",
"user: user3 value:2"
};
+ @ClassRule public static final TemporaryFolder TEMP_RESULT_FOLDER = new
TemporaryFolder();
@Before
public void preSubmit() throws Exception {
// Beam Write will add shard suffix to fileName, see ShardNameTemplate.
// So tempFile need have a parent to compare.
- File resultParent = createAndRegisterTempFile("result");
+ // TODO: Consider moving to AbstractTestBase.createAndRegisterTempFile once all
+ // tests have been migrated to JUnit 5.
+ File resultParent = new File(TEMP_RESULT_FOLDER.newFolder(), "result");
resultDir = resultParent.toURI().toString();
resultPath = new File(resultParent, "file.txt").getAbsolutePath();
}
diff --git a/sdks/go/examples/wasm/README.md b/sdks/go/examples/wasm/README.md
index 103bef88642..e4ab54d4a3e 100644
--- a/sdks/go/examples/wasm/README.md
+++ b/sdks/go/examples/wasm/README.md
@@ -68,7 +68,7 @@ cd $BEAM_HOME
Expected output should include the following, from which you acquire the
latest flink runner version.
```shell
-'flink_versions: 1.17,1.18,1.19'
+'flink_versions: 1.17,1.18,1.19,1.20'
```
#### 2. Set to the latest flink runner version i.e. 1.16
diff --git a/sdks/python/apache_beam/options/pipeline_options.py
b/sdks/python/apache_beam/options/pipeline_options.py
index be9f530ffdc..f2addf6f9d5 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -1943,7 +1943,7 @@ class JobServerOptions(PipelineOptions):
class FlinkRunnerOptions(PipelineOptions):
# These should stay in sync with gradle.properties.
- PUBLISHED_FLINK_VERSIONS = ['1.17', '1.18', '1.19']
+ PUBLISHED_FLINK_VERSIONS = ['1.17', '1.18', '1.19', '1.20']
@classmethod
def _add_argparse_args(cls, parser):
diff --git a/sdks/typescript/src/apache_beam/runners/flink.ts
b/sdks/typescript/src/apache_beam/runners/flink.ts
index ab2d641b330..5877d9186a4 100644
--- a/sdks/typescript/src/apache_beam/runners/flink.ts
+++ b/sdks/typescript/src/apache_beam/runners/flink.ts
@@ -28,7 +28,7 @@ import { JavaJarService } from "../utils/service";
const MAGIC_HOST_NAMES = ["[local]", "[auto]"];
// These should stay in sync with gradle.properties.
-const PUBLISHED_FLINK_VERSIONS = ["1.17", "1.18", "1.19"];
+const PUBLISHED_FLINK_VERSIONS = ["1.17", "1.18", "1.19", "1.20"];
const defaultOptions = {
flinkMaster: "[local]",
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 23ae66f45a1..f91951c8189 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -127,18 +127,12 @@ include(":runners:extensions-java:metrics")
* verify versions in
website/www/site/content/en/documentation/runners/flink.md
* verify version in
sdks/python/apache_beam/runners/interactive/interactive_beam.py
*/
-// Flink 1.17
-include(":runners:flink:1.17")
-include(":runners:flink:1.17:job-server")
-include(":runners:flink:1.17:job-server-container")
-// Flink 1.18
-include(":runners:flink:1.18")
-include(":runners:flink:1.18:job-server")
-include(":runners:flink:1.18:job-server-container")
-// Flink 1.19
-include(":runners:flink:1.19")
-include(":runners:flink:1.19:job-server")
-include(":runners:flink:1.19:job-server-container")
+val flink_versions: String by settings
+for (version in flink_versions.split(',')) {
+ include(":runners:flink:${version}")
+ include(":runners:flink:${version}:job-server")
+ include(":runners:flink:${version}:job-server-container")
+}
/* End Flink Runner related settings */
include(":runners:twister2")
include(":runners:google-cloud-dataflow-java")