This is an automated email from the ASF dual-hosted git repository.
ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 24decb50cf3 [BEAM-14503] Add support for Flink 1.15 (#17739)
24decb50cf3 is described below
commit 24decb50cf3e14e8cce4dd9d82b8963bf57a1805
Author: Julien Tournay <[email protected]>
AuthorDate: Wed May 25 19:59:04 2022 +0200
[BEAM-14503] Add support for Flink 1.15 (#17739)
* Flink 1.15 runner
* Ignore Flink 1.11
---
gradle.properties | 2 +-
.../streaming/ProcessingTimeCallbackCompat.java | 22 +++++++++++
.../beam/runners/flink/MiniClusterCompat.java | 29 ++++++++++++++
runners/flink/1.15/build.gradle | 34 +++++++++++++++++
.../flink/1.15/job-server-container/build.gradle | 26 +++++++++++++
runners/flink/1.15/job-server/build.gradle | 31 +++++++++++++++
.../streaming/ProcessingTimeCallbackCompat.java | 22 +++++++++++
.../beam/runners/flink/MiniClusterCompat.java | 30 +++++++++++++++
runners/flink/flink_runner.gradle | 44 ++++++++++++++++------
.../flink/FlinkStreamingTransformTranslators.java | 4 +-
.../wrappers/streaming/DoFnOperator.java | 3 +-
.../streaming/io/UnboundedSourceWrapper.java | 4 +-
.../flink/FlinkRequiresStableInputTest.java | 2 +-
.../beam/runners/flink/FlinkSavepointTest.java | 2 +-
.../python/apache_beam/options/pipeline_options.py | 2 +-
settings.gradle.kts | 4 ++
16 files changed, 240 insertions(+), 21 deletions(-)
diff --git a/gradle.properties b/gradle.properties
index f880631cd68..d1528eb2cd0 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -37,5 +37,5 @@ javaVersion=1.8
docker_image_default_repo_root=apache
docker_image_default_repo_prefix=beam_
-flink_versions=1.12,1.13,1.14
+flink_versions=1.12,1.13,1.14,1.15
diff --git
a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java
b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java
new file mode 100644
index 00000000000..a494fec01dd
--- /dev/null
+++
b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+
+public interface ProcessingTimeCallbackCompat extends ProcessingTimeCallback {}
diff --git
a/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java
b/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java
new file mode 100644
index 00000000000..1bbcd0159b1
--- /dev/null
+++
b/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+
+public class MiniClusterCompat {
+ public static CompletableFuture<String> triggerSavepoint(
+ MiniCluster cluster, JobID jobId, String targetDirectory, boolean cancelJob) {
+ return cluster.triggerSavepoint(jobId, targetDirectory, cancelJob);
+ }
+}
diff --git a/runners/flink/1.15/build.gradle b/runners/flink/1.15/build.gradle
new file mode 100644
index 00000000000..a3b5fb24699
--- /dev/null
+++ b/runners/flink/1.15/build.gradle
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+def basePath = '..'
+
+/* All properties required for loading the Flink build script */
+project.ext {
+ // Set the version of all Flink-related dependencies here.
+ flink_version = '1.15.0'
+ // Version specific code overrides.
+ main_source_overrides = ["${basePath}/1.12/src/main/java", "${basePath}/1.13/src/main/java", "${basePath}/1.14/src/main/java", './src/main/java']
+ test_source_overrides = ["${basePath}/1.12/src/test/java", "${basePath}/1.13/src/test/java", "${basePath}/1.14/src/test/java", './src/test/java']
+ main_resources_overrides = []
+ test_resources_overrides = []
+ archives_base_name = 'beam-runners-flink-1.15'
+}
+
+// Load the main build script which contains all build logic.
+apply from: "$basePath/flink_runner.gradle"
diff --git a/runners/flink/1.15/job-server-container/build.gradle
b/runners/flink/1.15/job-server-container/build.gradle
new file mode 100644
index 00000000000..afdb68a0fc9
--- /dev/null
+++ b/runners/flink/1.15/job-server-container/build.gradle
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+def basePath = '../../job-server-container'
+
+project.ext {
+ resource_path = basePath
+}
+
+// Load the main build script which contains all build logic.
+apply from: "$basePath/flink_job_server_container.gradle"
diff --git a/runners/flink/1.15/job-server/build.gradle
b/runners/flink/1.15/job-server/build.gradle
new file mode 100644
index 00000000000..05ad8feb5b7
--- /dev/null
+++ b/runners/flink/1.15/job-server/build.gradle
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+def basePath = '../../job-server'
+
+project.ext {
+ // Look for the source code in the parent module
+ main_source_dirs = ["$basePath/src/main/java"]
+ test_source_dirs = ["$basePath/src/test/java"]
+ main_resources_dirs = ["$basePath/src/main/resources"]
+ test_resources_dirs = ["$basePath/src/test/resources"]
+ archives_base_name = 'beam-runners-flink-1.15-job-server'
+}
+
+// Load the main build script which contains all build logic.
+apply from: "$basePath/flink_job_server.gradle"
diff --git
a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java
b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java
new file mode 100644
index 00000000000..1b9baaef3f9
--- /dev/null
+++
b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
+
+public interface ProcessingTimeCallbackCompat extends ProcessingTimeCallback {}
diff --git
a/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java
b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java
new file mode 100644
index 00000000000..f02ad36116c
--- /dev/null
+++
b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+
+public class MiniClusterCompat {
+ public static CompletableFuture<String> triggerSavepoint(
+ MiniCluster cluster, JobID jobId, String targetDirectory, boolean cancelJob) {
+ return cluster.triggerSavepoint(jobId, targetDirectory, cancelJob, SavepointFormatType.DEFAULT);
+ }
+}
diff --git a/runners/flink/flink_runner.gradle
b/runners/flink/flink_runner.gradle
index 837bc74c32f..18cf7860ff3 100644
--- a/runners/flink/flink_runner.gradle
+++ b/runners/flink/flink_runner.gradle
@@ -140,13 +140,42 @@ dependencies {
implementation library.java.slf4j_api
implementation library.java.joda_time
implementation library.java.args4j
- implementation "org.apache.flink:flink-clients_2.12:$flink_version"
- // Runtime dependencies are not included in Beam's generated pom.xml, so we
must declare flink-clients in implementation
- // configuration (https://issues.apache.org/jira/browse/BEAM-11732).
- permitUnusedDeclared "org.apache.flink:flink-clients_2.12:$flink_version"
+
+ // Flink 1.15 shades all remaining scala dependencies and therefore does not depend on a specific version of Scala anymore
+ if (flink_version.compareTo("1.15") >= 0) {
+ implementation "org.apache.flink:flink-clients:$flink_version"
+ // Runtime dependencies are not included in Beam's generated pom.xml, so we must declare flink-clients in implementation
+ // configuration (https://issues.apache.org/jira/browse/BEAM-11732).
+ permitUnusedDeclared "org.apache.flink:flink-clients:$flink_version"
+
+ implementation "org.apache.flink:flink-streaming-java:$flink_version"
+ // RocksDB state backend (included in the Flink distribution)
+ provided "org.apache.flink:flink-statebackend-rocksdb:$flink_version"
+ testImplementation "org.apache.flink:flink-statebackend-rocksdb:$flink_version"
+ testImplementation "org.apache.flink:flink-streaming-java:$flink_version:tests"
+ testImplementation "org.apache.flink:flink-test-utils:$flink_version"
+
+ miniCluster "org.apache.flink:flink-runtime-web:$flink_version"
+ } else {
+ implementation "org.apache.flink:flink-clients_2.12:$flink_version"
+ // Runtime dependencies are not included in Beam's generated pom.xml, so we must declare flink-clients in implementation
+ // configuration (https://issues.apache.org/jira/browse/BEAM-11732).
+ permitUnusedDeclared "org.apache.flink:flink-clients_2.12:$flink_version"
+
+ implementation "org.apache.flink:flink-streaming-java_2.12:$flink_version"
+ // RocksDB state backend (included in the Flink distribution)
+ provided "org.apache.flink:flink-statebackend-rocksdb_2.12:$flink_version"
+ testImplementation "org.apache.flink:flink-statebackend-rocksdb_2.12:$flink_version"
+ testImplementation "org.apache.flink:flink-streaming-java_2.12:$flink_version:tests"
+ testImplementation "org.apache.flink:flink-test-utils_2.12:$flink_version"
+
+ miniCluster "org.apache.flink:flink-runtime-web_2.12:$flink_version"
+ }
+
implementation "org.apache.flink:flink-core:$flink_version"
implementation "org.apache.flink:flink-metrics-core:$flink_version"
implementation "org.apache.flink:flink-java:$flink_version"
+
if (flink_version.compareTo("1.14") >= 0) {
implementation "org.apache.flink:flink-runtime:$flink_version"
implementation "org.apache.flink:flink-optimizer:$flink_version"
@@ -157,10 +186,6 @@ dependencies {
implementation "org.apache.flink:flink-optimizer_2.12:$flink_version"
testImplementation
"org.apache.flink:flink-runtime_2.12:$flink_version:tests"
}
- implementation "org.apache.flink:flink-streaming-java_2.12:$flink_version"
- // RocksDB state backend (included in the Flink distribution)
- provided "org.apache.flink:flink-statebackend-rocksdb_2.12:$flink_version"
- testImplementation
"org.apache.flink:flink-statebackend-rocksdb_2.12:$flink_version"
testImplementation project(path: ":sdks:java:core", configuration:
"shadowTest")
// FlinkStateInternalsTest extends abstract StateInternalsTest
testImplementation project(path: ":runners:core-java", configuration:
"testRuntimeMigration")
@@ -172,14 +197,11 @@ dependencies {
testImplementation project(":sdks:java:io:google-cloud-platform")
testImplementation library.java.jackson_dataformat_yaml
testImplementation "org.apache.flink:flink-core:$flink_version:tests"
- testImplementation
"org.apache.flink:flink-streaming-java_2.12:$flink_version:tests"
- testImplementation "org.apache.flink:flink-test-utils_2.12:$flink_version"
testImplementation project(":sdks:java:harness")
testRuntimeOnly library.java.slf4j_simple
validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
validatesRunner project(path: ":runners:core-java", configuration:
"testRuntimeMigration")
validatesRunner project(project.path)
- miniCluster "org.apache.flink:flink-runtime-web_2.12:$flink_version"
implementation project(path: ":model:fn-execution", configuration: "shadow")
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation project(path: ":model:job-management", configuration:
"shadow")
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 18ece765af4..a9db234c45e 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -44,6 +44,7 @@ import
org.apache.beam.runners.flink.translation.functions.ImpulseSourceFunction
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
import
org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector;
+import
org.apache.beam.runners.flink.translation.wrappers.streaming.ProcessingTimeCallbackCompat;
import
org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem;
import
org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder;
import
org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator;
@@ -116,7 +117,6 @@ import
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -1499,7 +1499,7 @@ class FlinkStreamingTransformTranslators {
static class UnboundedSourceWrapperNoValueWithRecordId<
OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark>
extends RichParallelSourceFunction<WindowedValue<OutputT>>
- implements ProcessingTimeCallback,
+ implements ProcessingTimeCallbackCompat,
BeamStoppableFunction,
CheckpointListener,
CheckpointedFunction {
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index b1016f073d6..611866e7741 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -118,7 +118,6 @@ import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.OutputTag;
@@ -896,7 +895,7 @@ public class DoFnOperator<InputT, OutputT>
}
@SuppressWarnings("FutureReturnValueIgnored")
- protected void scheduleForCurrentProcessingTime(ProcessingTimeCallback callback) {
+ protected void scheduleForCurrentProcessingTime(ProcessingTimeCallbackCompat callback) {
// We are scheduling a timer for advancing the watermark, to not delay
finishing the bundle
// and temporarily release the checkpoint lock. Otherwise, we could
potentially loop when a
// timer keeps scheduling a timer for the same timestamp.
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 1527335b10a..5b6704ed35b 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -28,6 +28,7 @@ import
org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.utils.Workarounds;
+import
org.apache.beam.runners.flink.translation.wrappers.streaming.ProcessingTimeCallbackCompat;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
@@ -55,7 +56,6 @@ import
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,7 +67,7 @@ import org.slf4j.LoggerFactory;
})
public class UnboundedSourceWrapper<OutputT, CheckpointMarkT extends
UnboundedSource.CheckpointMark>
extends
RichParallelSourceFunction<WindowedValue<ValueWithRecordId<OutputT>>>
- implements ProcessingTimeCallback,
+ implements ProcessingTimeCallbackCompat,
BeamStoppableFunction,
CheckpointListener,
CheckpointedFunction {
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRequiresStableInputTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRequiresStableInputTest.java
index 406c6fd91be..b8f384502e1 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRequiresStableInputTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRequiresStableInputTest.java
@@ -188,7 +188,7 @@ public class FlinkRequiresStableInputTest {
// try multiple times because the job might not be ready yet
for (int i = 0; i < 10; i++) {
try {
- return flinkCluster.triggerSavepoint(jobID, null, false).get();
+ return MiniClusterCompat.triggerSavepoint(flinkCluster, jobID, null, false).get();
} catch (Exception e) {
exception = e;
Thread.sleep(100);
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
index a5949a6965f..54a5cb2b11c 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
@@ -274,7 +274,7 @@ public class FlinkSavepointTest implements Serializable {
// try multiple times because the job might not be ready yet
for (int i = 0; i < 10; i++) {
try {
- return flinkCluster.triggerSavepoint(jobID, null, false).get();
+ return MiniClusterCompat.triggerSavepoint(flinkCluster, jobID, null, false).get();
} catch (Exception e) {
exception = e;
LOG.debug("Exception while triggerSavepoint, trying again", e);
diff --git a/sdks/python/apache_beam/options/pipeline_options.py
b/sdks/python/apache_beam/options/pipeline_options.py
index 5aa29c0fd96..4c00285c808 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -1358,7 +1358,7 @@ class JobServerOptions(PipelineOptions):
class FlinkRunnerOptions(PipelineOptions):
# These should stay in sync with gradle.properties.
- PUBLISHED_FLINK_VERSIONS = ['1.12', '1.13', '1.14']
+ PUBLISHED_FLINK_VERSIONS = ['1.12', '1.13', '1.14', '1.15']
@classmethod
def _add_argparse_args(cls, parser):
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 7889afe0526..7cc83b9698b 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -84,6 +84,10 @@ include(":runners:flink:1.13:job-server-container")
include(":runners:flink:1.14")
include(":runners:flink:1.14:job-server")
include(":runners:flink:1.14:job-server-container")
+// Flink 1.15
+include(":runners:flink:1.15")
+include(":runners:flink:1.15:job-server")
+include(":runners:flink:1.15:job-server-container")
/* End Flink Runner related settings */
include(":runners:twister2")
include(":runners:google-cloud-dataflow-java")