This is an automated email from the ASF dual-hosted git repository. yhu pushed a commit to branch flink-1.20 in repository https://gitbox.apache.org/repos/asf/beam.git
commit d25c1227c1160c7f27f118e589ebf3b583386470 Author: Yi Hu <[email protected]> AuthorDate: Mon Nov 24 18:15:51 2025 -0500 Flink 1.20 support --- .../test-properties.json | 6 ++--- .../workflows/beam_LoadTests_Java_GBK_Smoke.yml | 2 +- .../beam_PostCommit_Java_Examples_Flink.yml | 2 +- .../beam_PostCommit_Java_Nexmark_Flink.yml | 2 +- .../beam_PostCommit_Java_PVR_Flink_Streaming.yml | 2 +- .../workflows/beam_PostCommit_Java_Tpcds_Flink.yml | 2 +- .../beam_PostCommit_Java_ValidatesRunner_Flink.yml | 2 +- ...PostCommit_Java_ValidatesRunner_Flink_Java8.yml | 2 +- .github/workflows/beam_PostCommit_XVR_Flink.yml | 2 +- .../beam_PreCommit_Java_PVR_Flink_Batch.yml | 2 +- .../beam_PreCommit_Java_PVR_Flink_Docker.yml | 2 +- .../run_rc_validation_java_quickstart.yml | 2 +- gradle.properties | 2 +- runners/flink/1.20/build.gradle | 25 +++++++++++++++++ .../flink/1.20/job-server-container/build.gradle | 26 ++++++++++++++++++ runners/flink/1.20/job-server/build.gradle | 31 ++++++++++++++++++++++ .../wrappers/streaming/DoFnOperator.java | 8 +++--- .../translation/types/CoderTypeSerializer.java | 0 .../flink/FlinkExecutionEnvironmentsTest.java | 13 +++++++-- .../runners/flink/ReadSourceStreamingTest.java | 7 ++++- .../flink/streaming/GroupByNullKeyTest.java | 7 ++++- .../flink/streaming/TopWikipediaSessionsTest.java | 7 ++++- sdks/go/examples/wasm/README.md | 2 +- .../python/apache_beam/options/pipeline_options.py | 2 +- sdks/typescript/src/apache_beam/runners/flink.ts | 2 +- settings.gradle.kts | 18 +++++-------- 26 files changed, 139 insertions(+), 39 deletions(-) diff --git a/.github/actions/setup-default-test-properties/test-properties.json b/.github/actions/setup-default-test-properties/test-properties.json index 77d8d0d311f..91e264f483a 100644 --- a/.github/actions/setup-default-test-properties/test-properties.json +++ b/.github/actions/setup-default-test-properties/test-properties.json @@ -13,9 +13,9 @@ "TOX_ENV": ["Cloud", "Cython"] }, "JavaTestProperties": { - "SUPPORTED_VERSIONS": ["8", "11", "17", "21"], - "FLINK_VERSIONS": ["1.17", "1.18", "1.19"], - "SPARK_VERSIONS": ["2", "3"] + "SUPPORTED_VERSIONS": ["8", "11", "17", "21", "25"], + "FLINK_VERSIONS": ["1.17", "1.18", "1.19", "1.20"], + "SPARK_VERSIONS": ["3"] }, "GoTestProperties": { "SUPPORTED_VERSIONS": ["1.25"] diff --git a/.github/workflows/beam_LoadTests_Java_GBK_Smoke.yml b/.github/workflows/beam_LoadTests_Java_GBK_Smoke.yml index 11ddb3f42f4..8c291efc1cd 100644 --- a/.github/workflows/beam_LoadTests_Java_GBK_Smoke.yml +++ b/.github/workflows/beam_LoadTests_Java_GBK_Smoke.yml @@ -106,7 +106,7 @@ jobs: arguments: | --info \ -PloadTest.mainClass=org.apache.beam.sdk.loadtests.GroupByKeyLoadTest \ - -Prunner=:runners:flink:1.19 \ + -Prunner=:runners:flink:1.20 \ '-PloadTest.args=${{ env.beam_LoadTests_Java_GBK_Smoke_test_arguments_3 }}' \ - name: run GroupByKey load test Spark uses: ./.github/actions/gradle-command-self-hosted-action diff --git a/.github/workflows/beam_PostCommit_Java_Examples_Flink.yml b/.github/workflows/beam_PostCommit_Java_Examples_Flink.yml index ec2b4db31dd..f1f51b32742 100644 --- a/.github/workflows/beam_PostCommit_Java_Examples_Flink.yml +++ b/.github/workflows/beam_PostCommit_Java_Examples_Flink.yml @@ -80,7 +80,7 @@ jobs: - name: run examplesIntegrationTest script uses: ./.github/actions/gradle-command-self-hosted-action with: - gradle-command: :runners:flink:1.19:examplesIntegrationTest + gradle-command: :runners:flink:1.20:examplesIntegrationTest - name: Archive JUnit Test Results uses: actions/upload-artifact@v4 if: ${{ !success() }} diff --git a/.github/workflows/beam_PostCommit_Java_Nexmark_Flink.yml b/.github/workflows/beam_PostCommit_Java_Nexmark_Flink.yml index 2d026e3536a..389db7eb2fa 100644 --- a/.github/workflows/beam_PostCommit_Java_Nexmark_Flink.yml +++ b/.github/workflows/beam_PostCommit_Java_Nexmark_Flink.yml @@ -102,7 +102,7 @@ jobs: with: gradle-command: :sdks:java:testing:nexmark:run arguments: | - -Pnexmark.runner=:runners:flink:1.19 \ + -Pnexmark.runner=:runners:flink:1.20 \ "${{ env.GRADLE_COMMAND_ARGUMENTS }} --streaming=${{ matrix.streaming }} --queryLanguage=${{ matrix.queryLanguage }}" \ - name: run PostCommit Java Nexmark Flink (${{ matrix.streaming }}) script if: matrix.queryLanguage == 'none' diff --git a/.github/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml b/.github/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml index a773d2c58ac..3d40c300db0 100644 --- a/.github/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml +++ b/.github/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml @@ -77,7 +77,7 @@ jobs: - name: run PostCommit Java Flink PortableValidatesRunner Streaming script uses: ./.github/actions/gradle-command-self-hosted-action with: - gradle-command: runners:flink:1.19:job-server:validatesPortableRunnerStreaming + gradle-command: runners:flink:1.20:job-server:validatesPortableRunnerStreaming - name: Archive JUnit Test Results uses: actions/upload-artifact@v4 if: ${{ !success() }} diff --git a/.github/workflows/beam_PostCommit_Java_Tpcds_Flink.yml b/.github/workflows/beam_PostCommit_Java_Tpcds_Flink.yml index 78a9351a415..df29e476474 100644 --- a/.github/workflows/beam_PostCommit_Java_Tpcds_Flink.yml +++ b/.github/workflows/beam_PostCommit_Java_Tpcds_Flink.yml @@ -101,5 +101,5 @@ jobs: with: gradle-command: :sdks:java:testing:tpcds:run arguments: | - -Ptpcds.runner=:runners:flink:1.19 \ + -Ptpcds.runner=:runners:flink:1.20 \ "-Ptpcds.args=${{env.tpcdsBigQueryArgs}} ${{env.tpcdsInfluxDBArgs}} ${{ env.GRADLE_COMMAND_ARGUMENTS }} --queries=${{env.tpcdsQueriesArg}}" \ diff --git a/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink.yml b/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink.yml index 82e23e203b0..5d6a26301a8 100644 --- a/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink.yml +++ b/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink.yml @@ -78,7 +78,7 @@ jobs: - name: run validatesRunner script uses: ./.github/actions/gradle-command-self-hosted-action with: - gradle-command: :runners:flink:1.19:validatesRunner + gradle-command: :runners:flink:1.20:validatesRunner - name: Archive JUnit Test Results uses: actions/upload-artifact@v4 if: ${{ !success() }} diff --git a/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink_Java8.yml b/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink_Java8.yml index 9b061028cbc..5103926e391 100644 --- a/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink_Java8.yml +++ b/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink_Java8.yml @@ -81,7 +81,7 @@ jobs: - name: run validatesRunner Java8 script uses: ./.github/actions/gradle-command-self-hosted-action with: - gradle-command: :runners:flink:1.19:validatesRunner + gradle-command: :runners:flink:1.20:validatesRunner arguments: | -PtestJavaVersion=8 \ -Pjava8Home=$JAVA_HOME_8_X64 \ diff --git a/.github/workflows/beam_PostCommit_XVR_Flink.yml b/.github/workflows/beam_PostCommit_XVR_Flink.yml index 53d1fd81546..8d0893eb2d7 100644 --- a/.github/workflows/beam_PostCommit_XVR_Flink.yml +++ b/.github/workflows/beam_PostCommit_XVR_Flink.yml @@ -47,7 +47,7 @@ env: DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }} GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }} GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }} - FlinkVersion: 1.19 + FlinkVersion: 1.20 jobs: beam_PostCommit_XVR_Flink: diff --git a/.github/workflows/beam_PreCommit_Java_PVR_Flink_Batch.yml b/.github/workflows/beam_PreCommit_Java_PVR_Flink_Batch.yml index a4ab0587b8f..9c93c3dc1ac 100644 --- a/.github/workflows/beam_PreCommit_Java_PVR_Flink_Batch.yml +++ b/.github/workflows/beam_PreCommit_Java_PVR_Flink_Batch.yml @@ -94,7 +94,7 @@ jobs: - name: run validatesPortableRunnerBatch script uses: ./.github/actions/gradle-command-self-hosted-action with: - gradle-command: :runners:flink:1.19:job-server:validatesPortableRunnerBatch + gradle-command: :runners:flink:1.20:job-server:validatesPortableRunnerBatch env: CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH }} - name: Archive JUnit Test Results diff --git a/.github/workflows/beam_PreCommit_Java_PVR_Flink_Docker.yml b/.github/workflows/beam_PreCommit_Java_PVR_Flink_Docker.yml index fce2e590d3e..fa4638c751a 100644 --- a/.github/workflows/beam_PreCommit_Java_PVR_Flink_Docker.yml +++ b/.github/workflows/beam_PreCommit_Java_PVR_Flink_Docker.yml @@ -99,7 +99,7 @@ jobs: - name: run PreCommit Java PVR Flink Docker script uses: ./.github/actions/gradle-command-self-hosted-action with: - gradle-command: :runners:flink:1.19:job-server:validatesPortableRunnerDocker + gradle-command: :runners:flink:1.20:job-server:validatesPortableRunnerDocker env: CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH}} - name: Archive JUnit Test Results diff --git a/.github/workflows/run_rc_validation_java_quickstart.yml b/.github/workflows/run_rc_validation_java_quickstart.yml index 023839d5a3d..f39e8ac9392 100644 --- a/.github/workflows/run_rc_validation_java_quickstart.yml +++ b/.github/workflows/run_rc_validation_java_quickstart.yml @@ -88,7 +88,7 @@ jobs: - name: Run QuickStart Java Flink Runner uses: ./.github/actions/gradle-command-self-hosted-action with: - gradle-command: :runners:flink:1.19:runQuickstartJavaFlinkLocal + gradle-command: :runners:flink:1.20:runQuickstartJavaFlinkLocal arguments: | -Prepourl=${{ env.APACHE_REPO_URL }} \ -Pver=${{ env.RELEASE_VERSION }} diff --git a/gradle.properties b/gradle.properties index 510122c4e7b..311b7800706 100644 --- a/gradle.properties +++ b/gradle.properties @@ -39,6 +39,6 @@ docker_image_default_repo_root=apache docker_image_default_repo_prefix=beam_ # supported flink versions -flink_versions=1.17,1.18,1.19 +flink_versions=1.17,1.18,1.19,1.20 # supported python versions python_versions=3.10,3.11,3.12,3.13 diff --git a/runners/flink/1.20/build.gradle b/runners/flink/1.20/build.gradle new file mode 100644 index 00000000000..4c148321ed4 --- /dev/null +++ b/runners/flink/1.20/build.gradle @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +project.ext { + flink_major = '1.20' + flink_version = '1.20.3' +} + +// Load the main build script which contains all build logic. +apply from: "../flink_runner.gradle" diff --git a/runners/flink/1.20/job-server-container/build.gradle b/runners/flink/1.20/job-server-container/build.gradle new file mode 100644 index 00000000000..afdb68a0fc9 --- /dev/null +++ b/runners/flink/1.20/job-server-container/build.gradle @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +def basePath = '../../job-server-container' + +project.ext { + resource_path = basePath +} + +// Load the main build script which contains all build logic. +apply from: "$basePath/flink_job_server_container.gradle" diff --git a/runners/flink/1.20/job-server/build.gradle b/runners/flink/1.20/job-server/build.gradle new file mode 100644 index 00000000000..e5fdd1febf9 --- /dev/null +++ b/runners/flink/1.20/job-server/build.gradle @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +def basePath = '../../job-server' + +project.ext { + // Look for the source code in the parent module + main_source_dirs = ["$basePath/src/main/java"] + test_source_dirs = ["$basePath/src/test/java"] + main_resources_dirs = ["$basePath/src/main/resources"] + test_resources_dirs = ["$basePath/src/test/resources"] + archives_base_name = 'beam-runners-flink-1.20-job-server' +} + +// Load the main build script which contains all build logic. +apply from: "$basePath/flink_job_server.gradle" diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/1.20/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java similarity index 99% rename from runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java rename to runners/flink/1.20/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 13414682f8e..43668e0298e 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/1.20/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -112,7 +112,6 @@ import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.ChainingStrategy; -import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; import org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl; import org.apache.flink.streaming.api.operators.InternalTimer; import org.apache.flink.streaming.api.operators.InternalTimerService; @@ -233,7 +232,8 @@ public class DoFnOperator<PreInputT, InputT, OutputT> private static final int MAX_NUMBER_PENDING_BUNDLE_FINALIZATIONS = 32; protected transient InternalTimerService<TimerData> timerService; - private transient InternalTimeServiceManager<?> timeServiceManager; + // Flink 1.20 moved timeServiceManager to protected scope. No longer need delegate + // private transient InternalTimeServiceManager<?> timeServiceManager; private transient PushedBackElementsHandler<WindowedValue<InputT>> pushedBackElementsHandler; @@ -492,9 +492,7 @@ public class DoFnOperator<PreInputT, InputT, OutputT> } timerInternals = new FlinkTimerInternals(timerService); - timeServiceManager = - getTimeServiceManager() - .orElseThrow(() -> new IllegalStateException("Time service manager is not set.")); + Preconditions.checkNotNull(getTimeServiceManager(), "Time service manager is not set."); } outputManager = diff --git a/runners/flink/1.17/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java similarity index 100% rename from runners/flink/1.17/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java index ec44d279586..7262760a632 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java @@ -563,7 +563,16 @@ public class FlinkExecutionEnvironmentsTest { } private String getSavepointPath(Object env) { - return ((Configuration) Whitebox.getInternalState(env, "configuration")) - .getString("execution.savepoint.path", null); + // pre Flink 1.20 config + String path = + ((Configuration) Whitebox.getInternalState(env, "configuration")) + .getString("execution.savepoint.path", null); + if (path == null) { + // Flink 1.20+ + path = + ((Configuration) Whitebox.getInternalState(env, "configuration")) + .getString("execution.state-recovery.path", null); + } + return path; } } diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java index b8dc52f6cd4..d76a1bb2a27 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java @@ -28,7 +28,9 @@ import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.test.util.TestBaseUtils; import org.junit.After; import org.junit.Before; +import org.junit.ClassRule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; /** Reads from a bounded source in streaming. */ public class ReadSourceStreamingTest extends AbstractTestBase { @@ -40,12 +42,15 @@ public class ReadSourceStreamingTest extends AbstractTestBase { private static final String[] EXPECTED_RESULT = new String[] {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"}; + @ClassRule public static final TemporaryFolder TEMP_RESULT_FOLDER = new TemporaryFolder(); @Before public void preSubmit() throws Exception { // Beam Write will add shard suffix to fileName, see ShardNameTemplate. // So tempFile need have a parent to compare. - File resultParent = createAndRegisterTempFile("result"); + // TODO: Consider move to AbstractTestBase.createAndRegisterTempFile when all tests migrated to + // JUnit 5 + File resultParent = new File(TEMP_RESULT_FOLDER.newFolder(), "result"); resultDir = resultParent.toURI().toString(); resultPath = new File(resultParent, "file.txt").getAbsolutePath(); } diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java index 5b3a3385460..7650df3072b 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java @@ -40,7 +40,9 @@ import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.After; import org.junit.Before; +import org.junit.ClassRule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; /** Test for GroupByNullKey. */ public class GroupByNullKeyTest extends AbstractTestBase implements Serializable { @@ -50,6 +52,7 @@ public class GroupByNullKeyTest extends AbstractTestBase implements Serializable static final String[] EXPECTED_RESULT = new String[] {"k: null v: user1 user1 user1 user2 user2 user2 user2 user3"}; + @ClassRule public static final TemporaryFolder TEMP_RESULT_FOLDER = new TemporaryFolder(); public GroupByNullKeyTest() {} @@ -57,7 +60,9 @@ public class GroupByNullKeyTest extends AbstractTestBase implements Serializable public void preSubmit() throws Exception { // Beam Write will add shard suffix to fileName, see ShardNameTemplate. // So tempFile need have a parent to compare. - File resultParent = createAndRegisterTempFile("result"); + // TODO: Consider move to AbstractTestBase.createAndRegisterTempFile when all tests migrated to + // JUnit 5 + File resultParent = new File(TEMP_RESULT_FOLDER.newFolder(), "result"); resultDir = resultParent.toURI().toString(); resultPath = new File(resultParent, "file.txt").getAbsolutePath(); } diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsTest.java index f6fd654bbce..0625576a1b2 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsTest.java @@ -39,7 +39,9 @@ import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.After; import org.junit.Before; +import org.junit.ClassRule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; /** Session window test. */ public class TopWikipediaSessionsTest extends AbstractTestBase implements Serializable { @@ -58,12 +60,15 @@ public class TopWikipediaSessionsTest extends AbstractTestBase implements Serial "user: user3 value:7", "user: user3 value:2" }; + @ClassRule public static final TemporaryFolder TEMP_RESULT_FOLDER = new TemporaryFolder(); @Before public void preSubmit() throws Exception { // Beam Write will add shard suffix to fileName, see ShardNameTemplate. // So tempFile need have a parent to compare. - File resultParent = createAndRegisterTempFile("result"); + // TODO: Consider move to AbstractTestBase.createAndRegisterTempFile when all tests migrated to + // JUnit 5 + File resultParent = new File(TEMP_RESULT_FOLDER.newFolder(), "result"); resultDir = resultParent.toURI().toString(); resultPath = new File(resultParent, "file.txt").getAbsolutePath(); } diff --git a/sdks/go/examples/wasm/README.md b/sdks/go/examples/wasm/README.md index 103bef88642..e4ab54d4a3e 100644 --- a/sdks/go/examples/wasm/README.md +++ b/sdks/go/examples/wasm/README.md @@ -68,7 +68,7 @@ cd $BEAM_HOME Expected output should include the following, from which you acquire the latest flink runner version. ```shell -'flink_versions: 1.17,1.18,1.19' +'flink_versions: 1.17,1.18,1.19,1.20' ``` #### 2. Set to the latest flink runner version i.e. 1.16 diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index be9f530ffdc..f2addf6f9d5 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1943,7 +1943,7 @@ class JobServerOptions(PipelineOptions): class FlinkRunnerOptions(PipelineOptions): # These should stay in sync with gradle.properties. - PUBLISHED_FLINK_VERSIONS = ['1.17', '1.18', '1.19'] + PUBLISHED_FLINK_VERSIONS = ['1.17', '1.18', '1.19', '1.20'] @classmethod def _add_argparse_args(cls, parser): diff --git a/sdks/typescript/src/apache_beam/runners/flink.ts b/sdks/typescript/src/apache_beam/runners/flink.ts index ab2d641b330..5877d9186a4 100644 --- a/sdks/typescript/src/apache_beam/runners/flink.ts +++ b/sdks/typescript/src/apache_beam/runners/flink.ts @@ -28,7 +28,7 @@ import { JavaJarService } from "../utils/service"; const MAGIC_HOST_NAMES = ["[local]", "[auto]"]; // These should stay in sync with gradle.properties. -const PUBLISHED_FLINK_VERSIONS = ["1.17", "1.18", "1.19"]; +const PUBLISHED_FLINK_VERSIONS = ["1.17", "1.18", "1.19", "1.20"]; const defaultOptions = { flinkMaster: "[local]", diff --git a/settings.gradle.kts b/settings.gradle.kts index 23ae66f45a1..32e1bbd5ef1 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -16,6 +16,7 @@ * limitations under the License. */ import com.gradle.enterprise.gradleplugin.internal.extension.BuildScanExtensionWithHiddenFeatures +import org.gradle.kotlin.dsl.project pluginManagement { plugins { @@ -128,17 +129,12 @@ include(":runners:extensions-java:metrics") * verify version in sdks/python/apache_beam/runners/interactive/interactive_beam.py */ // Flink 1.17 -include(":runners:flink:1.17") -include(":runners:flink:1.17:job-server") -include(":runners:flink:1.17:job-server-container") -// Flink 1.18 -include(":runners:flink:1.18") -include(":runners:flink:1.18:job-server") -include(":runners:flink:1.18:job-server-container") -// Flink 1.19 -include(":runners:flink:1.19") -include(":runners:flink:1.19:job-server") -include(":runners:flink:1.19:job-server-container") +val flink_versions: String by settings +for (version in flink_versions.split(',')) { + include(":runners:flink:${version}") + include(":runners:flink:${version}:job-server") + include(":runners:flink:${version}:job-server-container") +} /* End Flink Runner related settings */ include(":runners:twister2") include(":runners:google-cloud-dataflow-java")
