This is an automated email from the ASF dual-hosted git repository. ibzib pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 24decb50cf3 [BEAM-14503] Add support for Flink 1.15 (#17739) 24decb50cf3 is described below commit 24decb50cf3e14e8cce4dd9d82b8963bf57a1805 Author: Julien Tournay <boudhe...@gmail.com> AuthorDate: Wed May 25 19:59:04 2022 +0200 [BEAM-14503] Add support for Flink 1.15 (#17739) * Flink 1.15 runner * Ignore Flink 1.11 --- gradle.properties | 2 +- .../streaming/ProcessingTimeCallbackCompat.java | 22 +++++++++++ .../beam/runners/flink/MiniClusterCompat.java | 29 ++++++++++++++ runners/flink/1.15/build.gradle | 34 +++++++++++++++++ .../flink/1.15/job-server-container/build.gradle | 26 +++++++++++++ runners/flink/1.15/job-server/build.gradle | 31 +++++++++++++++ .../streaming/ProcessingTimeCallbackCompat.java | 22 +++++++++++ .../beam/runners/flink/MiniClusterCompat.java | 30 +++++++++++++++ runners/flink/flink_runner.gradle | 44 ++++++++++++++++------ .../flink/FlinkStreamingTransformTranslators.java | 4 +- .../wrappers/streaming/DoFnOperator.java | 3 +- .../streaming/io/UnboundedSourceWrapper.java | 4 +- .../flink/FlinkRequiresStableInputTest.java | 2 +- .../beam/runners/flink/FlinkSavepointTest.java | 2 +- .../python/apache_beam/options/pipeline_options.py | 2 +- settings.gradle.kts | 4 ++ 16 files changed, 240 insertions(+), 21 deletions(-) diff --git a/gradle.properties b/gradle.properties index f880631cd68..d1528eb2cd0 100644 --- a/gradle.properties +++ b/gradle.properties @@ -37,5 +37,5 @@ javaVersion=1.8 docker_image_default_repo_root=apache docker_image_default_repo_prefix=beam_ -flink_versions=1.12,1.13,1.14 +flink_versions=1.12,1.13,1.14,1.15 diff --git a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java new file mode 100644 index 00000000000..a494fec01dd --- /dev/null +++ 
b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming; + +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; + +public interface ProcessingTimeCallbackCompat extends ProcessingTimeCallback {} diff --git a/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java b/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java new file mode 100644 index 00000000000..1bbcd0159b1 --- /dev/null +++ b/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import java.util.concurrent.CompletableFuture; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.minicluster.MiniCluster; + +public class MiniClusterCompat { + public static CompletableFuture<String> triggerSavepoint( + MiniCluster cluster, JobID jobId, String targetDirectory, boolean cancelJob) { + return cluster.triggerSavepoint(jobId, targetDirectory, cancelJob); + } +} diff --git a/runners/flink/1.15/build.gradle b/runners/flink/1.15/build.gradle new file mode 100644 index 00000000000..a3b5fb24699 --- /dev/null +++ b/runners/flink/1.15/build.gradle @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +def basePath = '..' 
+ +/* All properties required for loading the Flink build script */ +project.ext { + // Set the version of all Flink-related dependencies here. + flink_version = '1.15.0' + // Version specific code overrides. + main_source_overrides = ["${basePath}/1.12/src/main/java", "${basePath}/1.13/src/main/java", "${basePath}/1.14/src/main/java", './src/main/java'] + test_source_overrides = ["${basePath}/1.12/src/test/java", "${basePath}/1.13/src/test/java", "${basePath}/1.14/src/test/java", './src/test/java'] + main_resources_overrides = [] + test_resources_overrides = [] + archives_base_name = 'beam-runners-flink-1.15' +} + +// Load the main build script which contains all build logic. +apply from: "$basePath/flink_runner.gradle" diff --git a/runners/flink/1.15/job-server-container/build.gradle b/runners/flink/1.15/job-server-container/build.gradle new file mode 100644 index 00000000000..afdb68a0fc9 --- /dev/null +++ b/runners/flink/1.15/job-server-container/build.gradle @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +def basePath = '../../job-server-container' + +project.ext { + resource_path = basePath +} + +// Load the main build script which contains all build logic. 
+apply from: "$basePath/flink_job_server_container.gradle" diff --git a/runners/flink/1.15/job-server/build.gradle b/runners/flink/1.15/job-server/build.gradle new file mode 100644 index 00000000000..05ad8feb5b7 --- /dev/null +++ b/runners/flink/1.15/job-server/build.gradle @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +def basePath = '../../job-server' + +project.ext { + // Look for the source code in the parent module + main_source_dirs = ["$basePath/src/main/java"] + test_source_dirs = ["$basePath/src/test/java"] + main_resources_dirs = ["$basePath/src/main/resources"] + test_resources_dirs = ["$basePath/src/test/resources"] + archives_base_name = 'beam-runners-flink-1.15-job-server' +} + +// Load the main build script which contains all build logic. 
+apply from: "$basePath/flink_job_server.gradle" diff --git a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java new file mode 100644 index 00000000000..1b9baaef3f9 --- /dev/null +++ b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink.translation.wrappers.streaming; + +import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; + +public interface ProcessingTimeCallbackCompat extends ProcessingTimeCallback {} diff --git a/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java new file mode 100644 index 00000000000..f02ad36116c --- /dev/null +++ b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink; + +import java.util.concurrent.CompletableFuture; +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.execution.SavepointFormatType; +import org.apache.flink.runtime.minicluster.MiniCluster; + +public class MiniClusterCompat { + public static CompletableFuture<String> triggerSavepoint( + MiniCluster cluster, JobID jobId, String targetDirectory, boolean cancelJob) { + return cluster.triggerSavepoint(jobId, targetDirectory, cancelJob, SavepointFormatType.DEFAULT); + } +} diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle index 837bc74c32f..18cf7860ff3 100644 --- a/runners/flink/flink_runner.gradle +++ b/runners/flink/flink_runner.gradle @@ -140,13 +140,42 @@ dependencies { implementation library.java.slf4j_api implementation library.java.joda_time implementation library.java.args4j - implementation "org.apache.flink:flink-clients_2.12:$flink_version" - // Runtime dependencies are not included in Beam's generated pom.xml, so we must declare flink-clients in implementation - // configuration (https://issues.apache.org/jira/browse/BEAM-11732). - permitUnusedDeclared "org.apache.flink:flink-clients_2.12:$flink_version" + + // Flink 1.15 shades all remaining scala dependencies and therefore does not depend on a specific version of Scala anymore + if (flink_version.compareTo("1.15") >= 0) { + implementation "org.apache.flink:flink-clients:$flink_version" + // Runtime dependencies are not included in Beam's generated pom.xml, so we must declare flink-clients in implementation + // configuration (https://issues.apache.org/jira/browse/BEAM-11732). 
+ permitUnusedDeclared "org.apache.flink:flink-clients:$flink_version" + + implementation "org.apache.flink:flink-streaming-java:$flink_version" + // RocksDB state backend (included in the Flink distribution) + provided "org.apache.flink:flink-statebackend-rocksdb:$flink_version" + testImplementation "org.apache.flink:flink-statebackend-rocksdb:$flink_version" + testImplementation "org.apache.flink:flink-streaming-java:$flink_version:tests" + testImplementation "org.apache.flink:flink-test-utils:$flink_version" + + miniCluster "org.apache.flink:flink-runtime-web:$flink_version" + } else { + implementation "org.apache.flink:flink-clients_2.12:$flink_version" + // Runtime dependencies are not included in Beam's generated pom.xml, so we must declare flink-clients in implementation + // configuration (https://issues.apache.org/jira/browse/BEAM-11732). + permitUnusedDeclared "org.apache.flink:flink-clients_2.12:$flink_version" + + implementation "org.apache.flink:flink-streaming-java_2.12:$flink_version" + // RocksDB state backend (included in the Flink distribution) + provided "org.apache.flink:flink-statebackend-rocksdb_2.12:$flink_version" + testImplementation "org.apache.flink:flink-statebackend-rocksdb_2.12:$flink_version" + testImplementation "org.apache.flink:flink-streaming-java_2.12:$flink_version:tests" + testImplementation "org.apache.flink:flink-test-utils_2.12:$flink_version" + + miniCluster "org.apache.flink:flink-runtime-web_2.12:$flink_version" + } + implementation "org.apache.flink:flink-core:$flink_version" implementation "org.apache.flink:flink-metrics-core:$flink_version" implementation "org.apache.flink:flink-java:$flink_version" + if (flink_version.compareTo("1.14") >= 0) { implementation "org.apache.flink:flink-runtime:$flink_version" implementation "org.apache.flink:flink-optimizer:$flink_version" @@ -157,10 +186,6 @@ dependencies { implementation "org.apache.flink:flink-optimizer_2.12:$flink_version" testImplementation 
"org.apache.flink:flink-runtime_2.12:$flink_version:tests" } - implementation "org.apache.flink:flink-streaming-java_2.12:$flink_version" - // RocksDB state backend (included in the Flink distribution) - provided "org.apache.flink:flink-statebackend-rocksdb_2.12:$flink_version" - testImplementation "org.apache.flink:flink-statebackend-rocksdb_2.12:$flink_version" testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") // FlinkStateInternalsTest extends abstract StateInternalsTest testImplementation project(path: ":runners:core-java", configuration: "testRuntimeMigration") @@ -172,14 +197,11 @@ dependencies { testImplementation project(":sdks:java:io:google-cloud-platform") testImplementation library.java.jackson_dataformat_yaml testImplementation "org.apache.flink:flink-core:$flink_version:tests" - testImplementation "org.apache.flink:flink-streaming-java_2.12:$flink_version:tests" - testImplementation "org.apache.flink:flink-test-utils_2.12:$flink_version" testImplementation project(":sdks:java:harness") testRuntimeOnly library.java.slf4j_simple validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest") validatesRunner project(path: ":runners:core-java", configuration: "testRuntimeMigration") validatesRunner project(project.path) - miniCluster "org.apache.flink:flink-runtime-web_2.12:$flink_version" implementation project(path: ":model:fn-execution", configuration: "shadow") implementation project(path: ":model:pipeline", configuration: "shadow") implementation project(path: ":model:job-management", configuration: "shadow") diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index 18ece765af4..a9db234c45e 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -44,6 +44,7 @@ import org.apache.beam.runners.flink.translation.functions.ImpulseSourceFunction import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator; import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector; +import org.apache.beam.runners.flink.translation.wrappers.streaming.ProcessingTimeCallbackCompat; import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem; import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder; import org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator; @@ -116,7 +117,6 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.transformations.TwoInputTransformation; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; import org.checkerframework.checker.nullness.qual.Nullable; @@ -1499,7 +1499,7 @@ class FlinkStreamingTransformTranslators { static class UnboundedSourceWrapperNoValueWithRecordId< OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark> extends RichParallelSourceFunction<WindowedValue<OutputT>> - implements ProcessingTimeCallback, + implements ProcessingTimeCallbackCompat, BeamStoppableFunction, CheckpointListener, CheckpointedFunction { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 
b1016f073d6..611866e7741 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -118,7 +118,6 @@ import org.apache.flink.streaming.api.operators.Triggerable; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.util.OutputTag; @@ -896,7 +895,7 @@ public class DoFnOperator<InputT, OutputT> } @SuppressWarnings("FutureReturnValueIgnored") - protected void scheduleForCurrentProcessingTime(ProcessingTimeCallback callback) { + protected void scheduleForCurrentProcessingTime(ProcessingTimeCallbackCompat callback) { // We are scheduling a timer for advancing the watermark, to not delay finishing the bundle // and temporarily release the checkpoint lock. Otherwise, we could potentially loop when a // timer keeps scheduling a timer for the same timestamp. 
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index 1527335b10a..5b6704ed35b 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -28,6 +28,7 @@ import org.apache.beam.runners.flink.metrics.FlinkMetricContainer; import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.runners.flink.translation.utils.Workarounds; +import org.apache.beam.runners.flink.translation.wrappers.streaming.ProcessingTimeCallbackCompat; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.SerializableCoder; @@ -55,7 +56,6 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,7 +67,7 @@ import org.slf4j.LoggerFactory; }) public class UnboundedSourceWrapper<OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark> extends RichParallelSourceFunction<WindowedValue<ValueWithRecordId<OutputT>>> - implements ProcessingTimeCallback, + implements ProcessingTimeCallbackCompat, BeamStoppableFunction, CheckpointListener, CheckpointedFunction { diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRequiresStableInputTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRequiresStableInputTest.java index 406c6fd91be..b8f384502e1 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRequiresStableInputTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRequiresStableInputTest.java @@ -188,7 +188,7 @@ public class FlinkRequiresStableInputTest { // try multiple times because the job might not be ready yet for (int i = 0; i < 10; i++) { try { - return flinkCluster.triggerSavepoint(jobID, null, false).get(); + return MiniClusterCompat.triggerSavepoint(flinkCluster, jobID, null, false).get(); } catch (Exception e) { exception = e; Thread.sleep(100); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java index a5949a6965f..54a5cb2b11c 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java @@ -274,7 +274,7 @@ public class FlinkSavepointTest implements Serializable { // try multiple times because the job might not be ready yet for (int i = 0; i < 10; i++) { try { - return flinkCluster.triggerSavepoint(jobID, null, false).get(); + return MiniClusterCompat.triggerSavepoint(flinkCluster, jobID, null, false).get(); } catch (Exception e) { exception = e; LOG.debug("Exception while triggerSavepoint, trying again", e); diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 5aa29c0fd96..4c00285c808 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1358,7 +1358,7 @@ class JobServerOptions(PipelineOptions): class FlinkRunnerOptions(PipelineOptions): # 
These should stay in sync with gradle.properties. - PUBLISHED_FLINK_VERSIONS = ['1.12', '1.13', '1.14'] + PUBLISHED_FLINK_VERSIONS = ['1.12', '1.13', '1.14', '1.15'] @classmethod def _add_argparse_args(cls, parser): diff --git a/settings.gradle.kts b/settings.gradle.kts index 7889afe0526..7cc83b9698b 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -84,6 +84,10 @@ include(":runners:flink:1.13:job-server-container") include(":runners:flink:1.14") include(":runners:flink:1.14:job-server") include(":runners:flink:1.14:job-server-container") +// Flink 1.15 +include(":runners:flink:1.15") +include(":runners:flink:1.15:job-server") +include(":runners:flink:1.15:job-server-container") /* End Flink Runner related settings */ include(":runners:twister2") include(":runners:google-cloud-dataflow-java")