This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new de71d132f6a [Beam#25046]add support for flink 1.16 for Beam Flink
Runner (#25584)
de71d132f6a is described below
commit de71d132f6accad611b00b89ca56ed6f4b7b85ca
Author: Yanan Hao <[email protected]>
AuthorDate: Fri Feb 24 09:07:56 2023 -0800
[Beam#25046]add support for flink 1.16 for Beam Flink Runner (#25584)
---
build.gradle.kts | 3 ++
gradle.properties | 2 +-
runners/flink/1.16/build.gradle | 34 ++++++++++++++++++++++
.../flink/1.16/job-server-container/build.gradle | 26 +++++++++++++++++
runners/flink/1.16/job-server/build.gradle | 31 ++++++++++++++++++++
.../runners/flink/ReadSourceStreamingTest.java | 3 +-
.../apache/beam/runners/flink/ReadSourceTest.java | 3 +-
.../flink/streaming/GroupByNullKeyTest.java | 3 +-
.../flink/streaming/TopWikipediaSessionsTest.java | 3 +-
.../python/apache_beam/options/pipeline_options.py | 2 +-
settings.gradle.kts | 4 +++
11 files changed, 108 insertions(+), 6 deletions(-)
diff --git a/build.gradle.kts b/build.gradle.kts
index e715ce661fa..7512ee25604 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -242,6 +242,8 @@ tasks.register("javaPreCommit") {
dependsOn(":runners:flink:1.14:job-server:build")
dependsOn(":runners:flink:1.15:build")
dependsOn(":runners:flink:1.15:job-server:build")
+ dependsOn(":runners:flink:1.16:build")
+ dependsOn(":runners:flink:1.16:job-server:build")
dependsOn(":runners:google-cloud-dataflow-java:build")
dependsOn(":runners:google-cloud-dataflow-java:examples-streaming:build")
dependsOn(":runners:google-cloud-dataflow-java:examples:build")
@@ -339,6 +341,7 @@ tasks.register("javaPostCommitSickbay") {
dependsOn(":runners:flink:1.13:validatesRunnerSickbay")
dependsOn(":runners:flink:1.14:validatesRunnerSickbay")
dependsOn(":runners:flink:1.15:validatesRunnerSickbay")
+ dependsOn(":runners:flink:1.16:validatesRunnerSickbay")
dependsOn(":runners:spark:3:job-server:validatesRunnerSickbay")
dependsOn(":runners:direct-java:validatesRunnerSickbay")
dependsOn(":runners:portability:java:validatesRunnerSickbay")
diff --git a/gradle.properties b/gradle.properties
index 1b0381592e0..ce0cfa4a751 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -38,5 +38,5 @@ javaVersion=1.8
docker_image_default_repo_root=apache
docker_image_default_repo_prefix=beam_
-flink_versions=1.12,1.13,1.14,1.15
+flink_versions=1.12,1.13,1.14,1.15,1.16
diff --git a/runners/flink/1.16/build.gradle b/runners/flink/1.16/build.gradle
new file mode 100644
index 00000000000..772e190aa45
--- /dev/null
+++ b/runners/flink/1.16/build.gradle
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+def basePath = '..'
+
+/* All properties required for loading the Flink build script */
+project.ext {
+ // Set the version of all Flink-related dependencies here.
+ flink_version = '1.16.0'
+ // Version specific code overrides.
+ main_source_overrides = ["${basePath}/1.12/src/main/java",
"${basePath}/1.13/src/main/java", "${basePath}/1.14/src/main/java",
"${basePath}/1.15/src/main/java", './src/main/java']
+ test_source_overrides = ["${basePath}/1.12/src/test/java",
"${basePath}/1.13/src/test/java", "${basePath}/1.14/src/test/java",
"${basePath}/1.15/src/test/java", './src/test/java']
+ main_resources_overrides = []
+ test_resources_overrides = []
+ archives_base_name = 'beam-runners-flink-1.16'
+}
+
+// Load the main build script which contains all build logic.
+apply from: "$basePath/flink_runner.gradle"
diff --git a/runners/flink/1.16/job-server-container/build.gradle
b/runners/flink/1.16/job-server-container/build.gradle
new file mode 100644
index 00000000000..afdb68a0fc9
--- /dev/null
+++ b/runners/flink/1.16/job-server-container/build.gradle
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+def basePath = '../../job-server-container'
+
+project.ext {
+ resource_path = basePath
+}
+
+// Load the main build script which contains all build logic.
+apply from: "$basePath/flink_job_server_container.gradle"
diff --git a/runners/flink/1.16/job-server/build.gradle
b/runners/flink/1.16/job-server/build.gradle
new file mode 100644
index 00000000000..99dc00275a0
--- /dev/null
+++ b/runners/flink/1.16/job-server/build.gradle
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+def basePath = '../../job-server'
+
+project.ext {
+ // Look for the source code in the parent module
+ main_source_dirs = ["$basePath/src/main/java"]
+ test_source_dirs = ["$basePath/src/test/java"]
+ main_resources_dirs = ["$basePath/src/main/resources"]
+ test_resources_dirs = ["$basePath/src/test/resources"]
+ archives_base_name = 'beam-runners-flink-1.16-job-server'
+}
+
+// Load the main build script which contains all build logic.
+apply from: "$basePath/flink_job_server.gradle"
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
index 5f2434d7a25..2921065c154 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.TestBaseUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -51,7 +52,7 @@ public class ReadSourceStreamingTest extends AbstractTestBase
{
@After
public void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT),
resultDir);
+
TestBaseUtils.compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT),
resultDir);
}
@Test
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceTest.java
index 96d45ddcf1b..2974780f049 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceTest.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.TestBaseUtils;
/** Reads from a bounded source in batch execution. */
public class ReadSourceTest extends JavaProgramTestBase {
@@ -52,7 +53,7 @@ public class ReadSourceTest extends JavaProgramTestBase {
@Override
protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT),
resultPath);
+
TestBaseUtils.compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT),
resultPath);
}
@Override
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
index 6d089062158..e3f3fda4646 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.TestBaseUtils;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.After;
@@ -63,7 +64,7 @@ public class GroupByNullKeyTest extends AbstractTestBase
implements Serializable
@After
public void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT),
resultDir);
+
TestBaseUtils.compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT),
resultDir);
}
/** DoFn extracting user and timestamp. */
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsTest.java
index 63abfa5b618..faa35ca4e0a 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsTest.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.TestBaseUtils;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.After;
@@ -69,7 +70,7 @@ public class TopWikipediaSessionsTest extends
AbstractTestBase implements Serial
@After
public void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT),
resultDir);
+
TestBaseUtils.compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT),
resultDir);
}
@Test
diff --git a/sdks/python/apache_beam/options/pipeline_options.py
b/sdks/python/apache_beam/options/pipeline_options.py
index 37bdb106038..ee0b1095fa2 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -1463,7 +1463,7 @@ class JobServerOptions(PipelineOptions):
class FlinkRunnerOptions(PipelineOptions):
# These should stay in sync with gradle.properties.
- PUBLISHED_FLINK_VERSIONS = ['1.12', '1.13', '1.14', '1.15']
+ PUBLISHED_FLINK_VERSIONS = ['1.12', '1.13', '1.14', '1.15', '1.16']
@classmethod
def _add_argparse_args(cls, parser):
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 3c19893774e..ff8ed38db00 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -98,6 +98,10 @@ include(":runners:flink:1.14:job-server-container")
include(":runners:flink:1.15")
include(":runners:flink:1.15:job-server")
include(":runners:flink:1.15:job-server-container")
+// Flink 1.16
+include(":runners:flink:1.16")
+include(":runners:flink:1.16:job-server")
+include(":runners:flink:1.16:job-server-container")
/* End Flink Runner related settings */
include(":runners:twister2")
include(":runners:google-cloud-dataflow-java")