This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f75792d044 Flink 1.20 support (#36893)
2f75792d044 is described below

commit 2f75792d044de5355bf7298feb7bbc9c689c7ffd
Author: Yi Hu <[email protected]>
AuthorDate: Tue Nov 25 18:59:50 2025 -0500

    Flink 1.20 support (#36893)
---
 .../test-properties.json                           |  6 ++---
 .../workflows/beam_LoadTests_Java_GBK_Smoke.yml    |  2 +-
 .../beam_PostCommit_Java_Examples_Flink.yml        |  2 +-
 .../beam_PostCommit_Java_Nexmark_Flink.yml         |  2 +-
 .../beam_PostCommit_Java_PVR_Flink_Streaming.yml   |  2 +-
 .../workflows/beam_PostCommit_Java_Tpcds_Flink.yml |  2 +-
 .../beam_PostCommit_Java_ValidatesRunner_Flink.yml |  2 +-
 ...PostCommit_Java_ValidatesRunner_Flink_Java8.yml |  2 +-
 .github/workflows/beam_PostCommit_XVR_Flink.yml    |  2 +-
 .../beam_PreCommit_Java_PVR_Flink_Batch.yml        |  2 +-
 .../beam_PreCommit_Java_PVR_Flink_Docker.yml       |  2 +-
 .../run_rc_validation_java_quickstart.yml          |  2 +-
 CHANGES.md                                         |  1 +
 gradle.properties                                  |  2 +-
 .../wrappers/streaming/DoFnOperator.java           |  0
 runners/flink/1.20/build.gradle                    | 25 +++++++++++++++++
 .../flink/1.20/job-server-container/build.gradle   | 26 ++++++++++++++++++
 runners/flink/1.20/job-server/build.gradle         | 31 ++++++++++++++++++++++
 .../wrappers/streaming/DoFnOperator.java           |  8 +++---
 runners/flink/flink_runner.gradle                  |  5 ++++
 .../translation/types/CoderTypeSerializer.java     |  0
 .../flink/FlinkExecutionEnvironmentsTest.java      | 13 +++++++--
 .../runners/flink/ReadSourceStreamingTest.java     |  7 ++++-
 .../flink/streaming/GroupByNullKeyTest.java        |  7 ++++-
 .../flink/streaming/TopWikipediaSessionsTest.java  |  7 ++++-
 sdks/go/examples/wasm/README.md                    |  2 +-
 .../python/apache_beam/options/pipeline_options.py |  2 +-
 sdks/typescript/src/apache_beam/runners/flink.ts   |  2 +-
 settings.gradle.kts                                | 18 +++++--------
 29 files changed, 144 insertions(+), 40 deletions(-)

diff --git a/.github/actions/setup-default-test-properties/test-properties.json 
b/.github/actions/setup-default-test-properties/test-properties.json
index 77d8d0d311f..91e264f483a 100644
--- a/.github/actions/setup-default-test-properties/test-properties.json
+++ b/.github/actions/setup-default-test-properties/test-properties.json
@@ -13,9 +13,9 @@
     "TOX_ENV": ["Cloud", "Cython"]
   },
   "JavaTestProperties": {
-    "SUPPORTED_VERSIONS": ["8", "11", "17", "21"],
-    "FLINK_VERSIONS": ["1.17", "1.18", "1.19"],
-    "SPARK_VERSIONS": ["2", "3"]
+    "SUPPORTED_VERSIONS": ["8", "11", "17", "21", "25"],
+    "FLINK_VERSIONS": ["1.17", "1.18", "1.19", "1.20"],
+    "SPARK_VERSIONS": ["3"]
   },
   "GoTestProperties": {
     "SUPPORTED_VERSIONS": ["1.25"]
diff --git a/.github/workflows/beam_LoadTests_Java_GBK_Smoke.yml 
b/.github/workflows/beam_LoadTests_Java_GBK_Smoke.yml
index 11ddb3f42f4..8c291efc1cd 100644
--- a/.github/workflows/beam_LoadTests_Java_GBK_Smoke.yml
+++ b/.github/workflows/beam_LoadTests_Java_GBK_Smoke.yml
@@ -106,7 +106,7 @@ jobs:
           arguments: |
             --info \
             
-PloadTest.mainClass=org.apache.beam.sdk.loadtests.GroupByKeyLoadTest \
-            -Prunner=:runners:flink:1.19 \
+            -Prunner=:runners:flink:1.20 \
             '-PloadTest.args=${{ 
env.beam_LoadTests_Java_GBK_Smoke_test_arguments_3 }}' \
       - name: run GroupByKey load test Spark
         uses: ./.github/actions/gradle-command-self-hosted-action
diff --git a/.github/workflows/beam_PostCommit_Java_Examples_Flink.yml 
b/.github/workflows/beam_PostCommit_Java_Examples_Flink.yml
index ec2b4db31dd..f1f51b32742 100644
--- a/.github/workflows/beam_PostCommit_Java_Examples_Flink.yml
+++ b/.github/workflows/beam_PostCommit_Java_Examples_Flink.yml
@@ -80,7 +80,7 @@ jobs:
       - name: run examplesIntegrationTest script
         uses: ./.github/actions/gradle-command-self-hosted-action
         with:
-          gradle-command: :runners:flink:1.19:examplesIntegrationTest
+          gradle-command: :runners:flink:1.20:examplesIntegrationTest
       - name: Archive JUnit Test Results
         uses: actions/upload-artifact@v4
         if: ${{ !success() }}
diff --git a/.github/workflows/beam_PostCommit_Java_Nexmark_Flink.yml 
b/.github/workflows/beam_PostCommit_Java_Nexmark_Flink.yml
index 2d026e3536a..389db7eb2fa 100644
--- a/.github/workflows/beam_PostCommit_Java_Nexmark_Flink.yml
+++ b/.github/workflows/beam_PostCommit_Java_Nexmark_Flink.yml
@@ -102,7 +102,7 @@ jobs:
         with:
           gradle-command: :sdks:java:testing:nexmark:run
           arguments: |
-            -Pnexmark.runner=:runners:flink:1.19 \
+            -Pnexmark.runner=:runners:flink:1.20 \
              "${{ env.GRADLE_COMMAND_ARGUMENTS }} --streaming=${{ 
matrix.streaming }} --queryLanguage=${{ matrix.queryLanguage }}" \
       - name: run PostCommit Java Nexmark Flink (${{ matrix.streaming }}) 
script
         if: matrix.queryLanguage == 'none'
diff --git a/.github/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml 
b/.github/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml
index a773d2c58ac..3d40c300db0 100644
--- a/.github/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml
+++ b/.github/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml
@@ -77,7 +77,7 @@ jobs:
       - name: run PostCommit Java Flink PortableValidatesRunner Streaming 
script
         uses: ./.github/actions/gradle-command-self-hosted-action
         with:
-            gradle-command: 
runners:flink:1.19:job-server:validatesPortableRunnerStreaming
+            gradle-command: 
runners:flink:1.20:job-server:validatesPortableRunnerStreaming
       - name: Archive JUnit Test Results
         uses: actions/upload-artifact@v4
         if: ${{ !success() }}
diff --git a/.github/workflows/beam_PostCommit_Java_Tpcds_Flink.yml 
b/.github/workflows/beam_PostCommit_Java_Tpcds_Flink.yml
index 78a9351a415..df29e476474 100644
--- a/.github/workflows/beam_PostCommit_Java_Tpcds_Flink.yml
+++ b/.github/workflows/beam_PostCommit_Java_Tpcds_Flink.yml
@@ -101,5 +101,5 @@ jobs:
         with:
           gradle-command: :sdks:java:testing:tpcds:run
           arguments: |
-            -Ptpcds.runner=:runners:flink:1.19 \
+            -Ptpcds.runner=:runners:flink:1.20 \
             "-Ptpcds.args=${{env.tpcdsBigQueryArgs}} 
${{env.tpcdsInfluxDBArgs}} ${{ env.GRADLE_COMMAND_ARGUMENTS }} 
--queries=${{env.tpcdsQueriesArg}}" \
diff --git a/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink.yml 
b/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink.yml
index 82e23e203b0..5d6a26301a8 100644
--- a/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink.yml
+++ b/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink.yml
@@ -78,7 +78,7 @@ jobs:
       - name: run validatesRunner script
         uses: ./.github/actions/gradle-command-self-hosted-action
         with:
-          gradle-command: :runners:flink:1.19:validatesRunner
+          gradle-command: :runners:flink:1.20:validatesRunner
       - name: Archive JUnit Test Results
         uses: actions/upload-artifact@v4
         if: ${{ !success() }}
diff --git 
a/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink_Java8.yml 
b/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink_Java8.yml
index 9b061028cbc..5103926e391 100644
--- a/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink_Java8.yml
+++ b/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink_Java8.yml
@@ -81,7 +81,7 @@ jobs:
       - name: run validatesRunner Java8 script
         uses: ./.github/actions/gradle-command-self-hosted-action
         with:
-          gradle-command: :runners:flink:1.19:validatesRunner
+          gradle-command: :runners:flink:1.20:validatesRunner
           arguments: |
             -PtestJavaVersion=8 \
             -Pjava8Home=$JAVA_HOME_8_X64 \
diff --git a/.github/workflows/beam_PostCommit_XVR_Flink.yml 
b/.github/workflows/beam_PostCommit_XVR_Flink.yml
index 53d1fd81546..8d0893eb2d7 100644
--- a/.github/workflows/beam_PostCommit_XVR_Flink.yml
+++ b/.github/workflows/beam_PostCommit_XVR_Flink.yml
@@ -47,7 +47,7 @@ env:
   DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
   GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
   GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}
-  FlinkVersion: 1.19
+  FlinkVersion: 1.20
 
 jobs:
   beam_PostCommit_XVR_Flink:
diff --git a/.github/workflows/beam_PreCommit_Java_PVR_Flink_Batch.yml 
b/.github/workflows/beam_PreCommit_Java_PVR_Flink_Batch.yml
index a4ab0587b8f..9c93c3dc1ac 100644
--- a/.github/workflows/beam_PreCommit_Java_PVR_Flink_Batch.yml
+++ b/.github/workflows/beam_PreCommit_Java_PVR_Flink_Batch.yml
@@ -94,7 +94,7 @@ jobs:
       - name: run validatesPortableRunnerBatch script
         uses: ./.github/actions/gradle-command-self-hosted-action
         with:
-          gradle-command: 
:runners:flink:1.19:job-server:validatesPortableRunnerBatch
+          gradle-command: 
:runners:flink:1.20:job-server:validatesPortableRunnerBatch
         env:
           CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH }}
       - name: Archive JUnit Test Results
diff --git a/.github/workflows/beam_PreCommit_Java_PVR_Flink_Docker.yml 
b/.github/workflows/beam_PreCommit_Java_PVR_Flink_Docker.yml
index fce2e590d3e..fa4638c751a 100644
--- a/.github/workflows/beam_PreCommit_Java_PVR_Flink_Docker.yml
+++ b/.github/workflows/beam_PreCommit_Java_PVR_Flink_Docker.yml
@@ -99,7 +99,7 @@ jobs:
       - name: run PreCommit Java PVR Flink Docker script
         uses: ./.github/actions/gradle-command-self-hosted-action
         with:
-          gradle-command: 
:runners:flink:1.19:job-server:validatesPortableRunnerDocker
+          gradle-command: 
:runners:flink:1.20:job-server:validatesPortableRunnerDocker
         env:
           CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH}}
       - name: Archive JUnit Test Results
diff --git a/.github/workflows/run_rc_validation_java_quickstart.yml 
b/.github/workflows/run_rc_validation_java_quickstart.yml
index 023839d5a3d..f39e8ac9392 100644
--- a/.github/workflows/run_rc_validation_java_quickstart.yml
+++ b/.github/workflows/run_rc_validation_java_quickstart.yml
@@ -88,7 +88,7 @@ jobs:
       - name: Run QuickStart Java Flink Runner
         uses: ./.github/actions/gradle-command-self-hosted-action
         with:
-          gradle-command: :runners:flink:1.19:runQuickstartJavaFlinkLocal
+          gradle-command: :runners:flink:1.20:runQuickstartJavaFlinkLocal
           arguments: |
             -Prepourl=${{ env.APACHE_REPO_URL }} \
             -Pver=${{ env.RELEASE_VERSION }}
diff --git a/CHANGES.md b/CHANGES.md
index e6f9cf13ff9..68af5a342d7 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -65,6 +65,7 @@
 
 * New highly anticipated feature X added to Python SDK 
([#X](https://github.com/apache/beam/issues/X)).
 * New highly anticipated feature Y added to Java SDK 
([#Y](https://github.com/apache/beam/issues/Y)).
+* Flink 1.20 support added 
([#32647](https://github.com/apache/beam/issues/32647)).
 
 ## I/Os
 
diff --git a/gradle.properties b/gradle.properties
index 510122c4e7b..311b7800706 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -39,6 +39,6 @@ docker_image_default_repo_root=apache
 docker_image_default_repo_prefix=beam_
 
 # supported flink versions
-flink_versions=1.17,1.18,1.19
+flink_versions=1.17,1.18,1.19,1.20
 # supported python versions
 python_versions=3.10,3.11,3.12,3.13
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 
b/runners/flink/1.17/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
similarity index 100%
copy from 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
copy to 
runners/flink/1.17/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
diff --git a/runners/flink/1.20/build.gradle b/runners/flink/1.20/build.gradle
new file mode 100644
index 00000000000..4c148321ed4
--- /dev/null
+++ b/runners/flink/1.20/build.gradle
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+project.ext {
+    flink_major = '1.20'
+    flink_version = '1.20.3'
+}
+
+// Load the main build script which contains all build logic.
+apply from: "../flink_runner.gradle"
diff --git a/runners/flink/1.20/job-server-container/build.gradle 
b/runners/flink/1.20/job-server-container/build.gradle
new file mode 100644
index 00000000000..afdb68a0fc9
--- /dev/null
+++ b/runners/flink/1.20/job-server-container/build.gradle
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+def basePath = '../../job-server-container'
+
+project.ext {
+  resource_path = basePath
+}
+
+// Load the main build script which contains all build logic.
+apply from: "$basePath/flink_job_server_container.gradle"
diff --git a/runners/flink/1.20/job-server/build.gradle 
b/runners/flink/1.20/job-server/build.gradle
new file mode 100644
index 00000000000..e5fdd1febf9
--- /dev/null
+++ b/runners/flink/1.20/job-server/build.gradle
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+def basePath = '../../job-server'
+
+project.ext {
+  // Look for the source code in the parent module
+  main_source_dirs = ["$basePath/src/main/java"]
+  test_source_dirs = ["$basePath/src/test/java"]
+  main_resources_dirs = ["$basePath/src/main/resources"]
+  test_resources_dirs = ["$basePath/src/test/resources"]
+  archives_base_name = 'beam-runners-flink-1.20-job-server'
+}
+
+// Load the main build script which contains all build logic.
+apply from: "$basePath/flink_job_server.gradle"
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 
b/runners/flink/1.20/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
similarity index 99%
rename from 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
rename to 
runners/flink/1.20/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 13414682f8e..43668e0298e 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ 
b/runners/flink/1.20/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -112,7 +112,6 @@ import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl;
 import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
@@ -233,7 +232,8 @@ public class DoFnOperator<PreInputT, InputT, OutputT>
   private static final int MAX_NUMBER_PENDING_BUNDLE_FINALIZATIONS = 32;
 
   protected transient InternalTimerService<TimerData> timerService;
-  private transient InternalTimeServiceManager<?> timeServiceManager;
+  // Flink 1.20 moved timeServiceManager to protected scope. No longer need 
delegate
+  // private transient InternalTimeServiceManager<?> timeServiceManager;
 
   private transient PushedBackElementsHandler<WindowedValue<InputT>> 
pushedBackElementsHandler;
 
@@ -492,9 +492,7 @@ public class DoFnOperator<PreInputT, InputT, OutputT>
       }
 
       timerInternals = new FlinkTimerInternals(timerService);
-      timeServiceManager =
-          getTimeServiceManager()
-              .orElseThrow(() -> new IllegalStateException("Time service 
manager is not set."));
+      Preconditions.checkNotNull(getTimeServiceManager(), "Time service 
manager is not set.");
     }
 
     outputManager =
diff --git a/runners/flink/flink_runner.gradle 
b/runners/flink/flink_runner.gradle
index 2bd3ad7b8db..52f9631f455 100644
--- a/runners/flink/flink_runner.gradle
+++ b/runners/flink/flink_runner.gradle
@@ -175,6 +175,11 @@ dependencies {
   implementation library.java.joda_time
   implementation library.java.args4j
 
+  // flink-core-api is introduced in Flink 1.20+
+  if (flink_major == '1.20' || flink_major.startsWith('2')) {
+    implementation "org.apache.flink:flink-core-api:$flink_version"
+  }
+
   implementation "org.apache.flink:flink-clients:$flink_version"
   // Runtime dependencies are not included in Beam's generated pom.xml, so we 
must declare flink-clients in implementation
   // configuration (https://issues.apache.org/jira/browse/BEAM-11732).
diff --git 
a/runners/flink/1.17/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
similarity index 100%
rename from 
runners/flink/1.17/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
rename to 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
index ec44d279586..7262760a632 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
@@ -563,7 +563,16 @@ public class FlinkExecutionEnvironmentsTest {
   }
 
   private String getSavepointPath(Object env) {
-    return ((Configuration) Whitebox.getInternalState(env, "configuration"))
-        .getString("execution.savepoint.path", null);
+    // pre Flink 1.20 config
+    String path =
+        ((Configuration) Whitebox.getInternalState(env, "configuration"))
+            .getString("execution.savepoint.path", null);
+    if (path == null) {
+      // Flink 1.20+
+      path =
+          ((Configuration) Whitebox.getInternalState(env, "configuration"))
+              .getString("execution.state-recovery.path", null);
+    }
+    return path;
   }
 }
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
index b8dc52f6cd4..d76a1bb2a27 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
@@ -28,7 +28,9 @@ import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 /** Reads from a bounded source in streaming. */
 public class ReadSourceStreamingTest extends AbstractTestBase {
@@ -40,12 +42,15 @@ public class ReadSourceStreamingTest extends 
AbstractTestBase {
 
   private static final String[] EXPECTED_RESULT =
       new String[] {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
+  @ClassRule public static final TemporaryFolder TEMP_RESULT_FOLDER = new 
TemporaryFolder();
 
   @Before
   public void preSubmit() throws Exception {
     // Beam Write will add shard suffix to fileName, see ShardNameTemplate.
     // So tempFile need have a parent to compare.
-    File resultParent = createAndRegisterTempFile("result");
+    // TODO: Consider move to AbstractTestBase.createAndRegisterTempFile when 
all tests migrated to
+    // JUnit 5
+    File resultParent = new File(TEMP_RESULT_FOLDER.newFolder(), "result");
     resultDir = resultParent.toURI().toString();
     resultPath = new File(resultParent, "file.txt").getAbsolutePath();
   }
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
index 5b3a3385460..7650df3072b 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
@@ -40,7 +40,9 @@ import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 /** Test for GroupByNullKey. */
 public class GroupByNullKeyTest extends AbstractTestBase implements 
Serializable {
@@ -50,6 +52,7 @@ public class GroupByNullKeyTest extends AbstractTestBase 
implements Serializable
 
   static final String[] EXPECTED_RESULT =
       new String[] {"k: null v: user1 user1 user1 user2 user2 user2 user2 
user3"};
+  @ClassRule public static final TemporaryFolder TEMP_RESULT_FOLDER = new 
TemporaryFolder();
 
   public GroupByNullKeyTest() {}
 
@@ -57,7 +60,9 @@ public class GroupByNullKeyTest extends AbstractTestBase 
implements Serializable
   public void preSubmit() throws Exception {
     // Beam Write will add shard suffix to fileName, see ShardNameTemplate.
     // So tempFile need have a parent to compare.
-    File resultParent = createAndRegisterTempFile("result");
+    // TODO: Consider move to AbstractTestBase.createAndRegisterTempFile when 
all tests migrated to
+    // JUnit 5
+    File resultParent = new File(TEMP_RESULT_FOLDER.newFolder(), "result");
     resultDir = resultParent.toURI().toString();
     resultPath = new File(resultParent, "file.txt").getAbsolutePath();
   }
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsTest.java
index f6fd654bbce..0625576a1b2 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsTest.java
@@ -39,7 +39,9 @@ import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 /** Session window test. */
 public class TopWikipediaSessionsTest extends AbstractTestBase implements 
Serializable {
@@ -58,12 +60,15 @@ public class TopWikipediaSessionsTest extends 
AbstractTestBase implements Serial
         "user: user3 value:7",
         "user: user3 value:2"
       };
+  @ClassRule public static final TemporaryFolder TEMP_RESULT_FOLDER = new 
TemporaryFolder();
 
   @Before
   public void preSubmit() throws Exception {
     // Beam Write will add shard suffix to fileName, see ShardNameTemplate.
     // So tempFile need have a parent to compare.
-    File resultParent = createAndRegisterTempFile("result");
+    // TODO: Consider move to AbstractTestBase.createAndRegisterTempFile when 
all tests migrated to
+    // JUnit 5
+    File resultParent = new File(TEMP_RESULT_FOLDER.newFolder(), "result");
     resultDir = resultParent.toURI().toString();
     resultPath = new File(resultParent, "file.txt").getAbsolutePath();
   }
diff --git a/sdks/go/examples/wasm/README.md b/sdks/go/examples/wasm/README.md
index 103bef88642..e4ab54d4a3e 100644
--- a/sdks/go/examples/wasm/README.md
+++ b/sdks/go/examples/wasm/README.md
@@ -68,7 +68,7 @@ cd $BEAM_HOME
 Expected output should include the following, from which you acquire the 
latest flink runner version.
 
 ```shell
-'flink_versions: 1.17,1.18,1.19'
+'flink_versions: 1.17,1.18,1.19,1.20'
 ```
 
 #### 2. Set to the latest flink runner version i.e. 1.16
diff --git a/sdks/python/apache_beam/options/pipeline_options.py 
b/sdks/python/apache_beam/options/pipeline_options.py
index be9f530ffdc..f2addf6f9d5 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -1943,7 +1943,7 @@ class JobServerOptions(PipelineOptions):
 class FlinkRunnerOptions(PipelineOptions):
 
   # These should stay in sync with gradle.properties.
-  PUBLISHED_FLINK_VERSIONS = ['1.17', '1.18', '1.19']
+  PUBLISHED_FLINK_VERSIONS = ['1.17', '1.18', '1.19', '1.20']
 
   @classmethod
   def _add_argparse_args(cls, parser):
diff --git a/sdks/typescript/src/apache_beam/runners/flink.ts 
b/sdks/typescript/src/apache_beam/runners/flink.ts
index ab2d641b330..5877d9186a4 100644
--- a/sdks/typescript/src/apache_beam/runners/flink.ts
+++ b/sdks/typescript/src/apache_beam/runners/flink.ts
@@ -28,7 +28,7 @@ import { JavaJarService } from "../utils/service";
 const MAGIC_HOST_NAMES = ["[local]", "[auto]"];
 
 // These should stay in sync with gradle.properties.
-const PUBLISHED_FLINK_VERSIONS = ["1.17", "1.18", "1.19"];
+const PUBLISHED_FLINK_VERSIONS = ["1.17", "1.18", "1.19", "1.20"];
 
 const defaultOptions = {
   flinkMaster: "[local]",
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 23ae66f45a1..f91951c8189 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -127,18 +127,12 @@ include(":runners:extensions-java:metrics")
   * verify versions in 
website/www/site/content/en/documentation/runners/flink.md
   * verify version in 
sdks/python/apache_beam/runners/interactive/interactive_beam.py
  */
-// Flink 1.17
-include(":runners:flink:1.17")
-include(":runners:flink:1.17:job-server")
-include(":runners:flink:1.17:job-server-container")
-// Flink 1.18
-include(":runners:flink:1.18")
-include(":runners:flink:1.18:job-server")
-include(":runners:flink:1.18:job-server-container")
-// Flink 1.19
-include(":runners:flink:1.19")
-include(":runners:flink:1.19:job-server")
-include(":runners:flink:1.19:job-server-container")
+val flink_versions: String by settings
+for (version in flink_versions.split(',')) {
+    include(":runners:flink:${version}")
+    include(":runners:flink:${version}:job-server")
+    include(":runners:flink:${version}:job-server-container")
+}
 /* End Flink Runner related settings */
 include(":runners:twister2")
 include(":runners:google-cloud-dataflow-java")

Reply via email to