This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new de71d132f6a [Beam#25046]add support for flink 1.16 for Beam Flink 
Runner (#25584)
de71d132f6a is described below

commit de71d132f6accad611b00b89ca56ed6f4b7b85ca
Author: Yanan Hao <[email protected]>
AuthorDate: Fri Feb 24 09:07:56 2023 -0800

    [Beam#25046]add support for flink 1.16 for Beam Flink Runner (#25584)
---
 build.gradle.kts                                   |  3 ++
 gradle.properties                                  |  2 +-
 runners/flink/1.16/build.gradle                    | 34 ++++++++++++++++++++++
 .../flink/1.16/job-server-container/build.gradle   | 26 +++++++++++++++++
 runners/flink/1.16/job-server/build.gradle         | 31 ++++++++++++++++++++
 .../runners/flink/ReadSourceStreamingTest.java     |  3 +-
 .../apache/beam/runners/flink/ReadSourceTest.java  |  3 +-
 .../flink/streaming/GroupByNullKeyTest.java        |  3 +-
 .../flink/streaming/TopWikipediaSessionsTest.java  |  3 +-
 .../python/apache_beam/options/pipeline_options.py |  2 +-
 settings.gradle.kts                                |  4 +++
 11 files changed, 108 insertions(+), 6 deletions(-)

diff --git a/build.gradle.kts b/build.gradle.kts
index e715ce661fa..7512ee25604 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -242,6 +242,8 @@ tasks.register("javaPreCommit") {
   dependsOn(":runners:flink:1.14:job-server:build")
   dependsOn(":runners:flink:1.15:build")
   dependsOn(":runners:flink:1.15:job-server:build")
+  dependsOn(":runners:flink:1.16:build")
+  dependsOn(":runners:flink:1.16:job-server:build")
   dependsOn(":runners:google-cloud-dataflow-java:build")
   dependsOn(":runners:google-cloud-dataflow-java:examples-streaming:build")
   dependsOn(":runners:google-cloud-dataflow-java:examples:build")
@@ -339,6 +341,7 @@ tasks.register("javaPostCommitSickbay") {
   dependsOn(":runners:flink:1.13:validatesRunnerSickbay")
   dependsOn(":runners:flink:1.14:validatesRunnerSickbay")
   dependsOn(":runners:flink:1.15:validatesRunnerSickbay")
+  dependsOn(":runners:flink:1.16:validatesRunnerSickbay")
   dependsOn(":runners:spark:3:job-server:validatesRunnerSickbay")
   dependsOn(":runners:direct-java:validatesRunnerSickbay")
   dependsOn(":runners:portability:java:validatesRunnerSickbay")
diff --git a/gradle.properties b/gradle.properties
index 1b0381592e0..ce0cfa4a751 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -38,5 +38,5 @@ javaVersion=1.8
 docker_image_default_repo_root=apache
 docker_image_default_repo_prefix=beam_
 
-flink_versions=1.12,1.13,1.14,1.15
+flink_versions=1.12,1.13,1.14,1.15,1.16
 
diff --git a/runners/flink/1.16/build.gradle b/runners/flink/1.16/build.gradle
new file mode 100644
index 00000000000..772e190aa45
--- /dev/null
+++ b/runners/flink/1.16/build.gradle
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+def basePath = '..'
+
+/* All properties required for loading the Flink build script */
+project.ext {
+  // Set the version of all Flink-related dependencies here.
+  flink_version = '1.16.0'
+  // Version specific code overrides.
+  main_source_overrides = ["${basePath}/1.12/src/main/java", 
"${basePath}/1.13/src/main/java", "${basePath}/1.14/src/main/java", 
"${basePath}/1.15/src/main/java", './src/main/java']
+  test_source_overrides = ["${basePath}/1.12/src/test/java", 
"${basePath}/1.13/src/test/java", "${basePath}/1.14/src/test/java", 
"${basePath}/1.15/src/test/java", './src/test/java']
+  main_resources_overrides = []
+  test_resources_overrides = []
+  archives_base_name = 'beam-runners-flink-1.16'
+}
+
+// Load the main build script which contains all build logic.
+apply from: "$basePath/flink_runner.gradle"
diff --git a/runners/flink/1.16/job-server-container/build.gradle 
b/runners/flink/1.16/job-server-container/build.gradle
new file mode 100644
index 00000000000..afdb68a0fc9
--- /dev/null
+++ b/runners/flink/1.16/job-server-container/build.gradle
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+def basePath = '../../job-server-container'
+
+project.ext {
+  resource_path = basePath
+}
+
+// Load the main build script which contains all build logic.
+apply from: "$basePath/flink_job_server_container.gradle"
diff --git a/runners/flink/1.16/job-server/build.gradle 
b/runners/flink/1.16/job-server/build.gradle
new file mode 100644
index 00000000000..99dc00275a0
--- /dev/null
+++ b/runners/flink/1.16/job-server/build.gradle
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+def basePath = '../../job-server'
+
+project.ext {
+  // Look for the source code in the parent module
+  main_source_dirs = ["$basePath/src/main/java"]
+  test_source_dirs = ["$basePath/src/test/java"]
+  main_resources_dirs = ["$basePath/src/main/resources"]
+  test_resources_dirs = ["$basePath/src/test/resources"]
+  archives_base_name = 'beam-runners-flink-1.16-job-server'
+}
+
+// Load the main build script which contains all build logic.
+apply from: "$basePath/flink_job_server.gradle"
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
index 5f2434d7a25..2921065c154 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
 import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.TestBaseUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -51,7 +52,7 @@ public class ReadSourceStreamingTest extends AbstractTestBase 
{
 
   @After
   public void postSubmit() throws Exception {
-    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), 
resultDir);
+    
TestBaseUtils.compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT),
 resultDir);
   }
 
   @Test
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceTest.java 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceTest.java
index 96d45ddcf1b..2974780f049 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceTest.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
 import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.TestBaseUtils;
 
 /** Reads from a bounded source in batch execution. */
 public class ReadSourceTest extends JavaProgramTestBase {
@@ -52,7 +53,7 @@ public class ReadSourceTest extends JavaProgramTestBase {
 
   @Override
   protected void postSubmit() throws Exception {
-    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), 
resultPath);
+    
TestBaseUtils.compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT),
 resultPath);
   }
 
   @Override
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
index 6d089062158..e3f3fda4646 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
 import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.TestBaseUtils;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.After;
@@ -63,7 +64,7 @@ public class GroupByNullKeyTest extends AbstractTestBase 
implements Serializable
 
   @After
   public void postSubmit() throws Exception {
-    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), 
resultDir);
+    
TestBaseUtils.compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT),
 resultDir);
   }
 
   /** DoFn extracting user and timestamp. */
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsTest.java
index 63abfa5b618..faa35ca4e0a 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsTest.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
 import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.TestBaseUtils;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.After;
@@ -69,7 +70,7 @@ public class TopWikipediaSessionsTest extends 
AbstractTestBase implements Serial
 
   @After
   public void postSubmit() throws Exception {
-    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), 
resultDir);
+    
TestBaseUtils.compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT),
 resultDir);
   }
 
   @Test
diff --git a/sdks/python/apache_beam/options/pipeline_options.py 
b/sdks/python/apache_beam/options/pipeline_options.py
index 37bdb106038..ee0b1095fa2 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -1463,7 +1463,7 @@ class JobServerOptions(PipelineOptions):
 class FlinkRunnerOptions(PipelineOptions):
 
   # These should stay in sync with gradle.properties.
-  PUBLISHED_FLINK_VERSIONS = ['1.12', '1.13', '1.14', '1.15']
+  PUBLISHED_FLINK_VERSIONS = ['1.12', '1.13', '1.14', '1.15', '1.16']
 
   @classmethod
   def _add_argparse_args(cls, parser):
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 3c19893774e..ff8ed38db00 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -98,6 +98,10 @@ include(":runners:flink:1.14:job-server-container")
 include(":runners:flink:1.15")
 include(":runners:flink:1.15:job-server")
 include(":runners:flink:1.15:job-server-container")
+// Flink 1.16
+include(":runners:flink:1.16")
+include(":runners:flink:1.16:job-server")
+include(":runners:flink:1.16:job-server-container")
 /* End Flink Runner related settings */
 include(":runners:twister2")
 include(":runners:google-cloud-dataflow-java")

Reply via email to