This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f2aa7912401 Exercise Python PVR tests on Flink 2.0 (#37313)
f2aa7912401 is described below

commit f2aa79124012a3324f0813cc80f8e99dfb7546bb
Author: Yi Hu <[email protected]>
AuthorDate: Tue Feb 17 11:19:58 2026 -0500

    Exercise Python PVR tests on Flink 2.0 (#37313)
    
    * Exercise Flink 2.0 Python Validates Runner tests
    
    * Clean up TODOs: make Flink 2.0 the latestFlinkVersion
    
    * Fix PortableJar PostCommit
    
    * Still run Go VR on Flink 1.20
---
 .../trigger_files/beam_PostCommit_Go_VR_Flink.json |  2 +-
 .../beam_PostCommit_PortableJar_Flink.json         |  1 +
 .../beam_PostCommit_Python_Examples_Flink.json     |  3 +
 .../beam_PostCommit_Python_Portable_Flink.yml      |  5 +-
 ...eam_PostCommit_Python_ValidatesRunner_Flink.yml |  2 +-
 .../workflows/beam_PreCommit_Python_PVR_Flink.yml  |  1 +
 CHANGES.md                                         |  3 +-
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |  3 +-
 .../FlinkStreamingPortablePipelineTranslator.java  | 21 +++++--
 .../FlinkStreamingPortablePipelineTranslator.java  | 21 +++++--
 sdks/go/test/build.gradle                          | 17 +++++-
 sdks/go/test/run_validatesrunner_tests.sh          |  6 +-
 .../extensions/sql/expansion-service/build.gradle  |  1 -
 .../runners/portability/flink_runner_test.py       | 29 ++++++++--
 sdks/python/test-suites/portable/common.gradle     | 64 ++++++++++------------
 15 files changed, 118 insertions(+), 61 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Go_VR_Flink.json b/.github/trigger_files/beam_PostCommit_Go_VR_Flink.json
index d5ac7fc60d7..83b506b55ed 100644
--- a/.github/trigger_files/beam_PostCommit_Go_VR_Flink.json
+++ b/.github/trigger_files/beam_PostCommit_Go_VR_Flink.json
@@ -1,6 +1,6 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to 
run",
-  "modification": 1,
+  "modification": 2,
   "https://github.com/apache/beam/pull/32440": "testing datastream 
optimizations",
   "https://github.com/apache/beam/pull/32648": "testing addition of Flink 1.19 
support"
 }
diff --git a/.github/trigger_files/beam_PostCommit_PortableJar_Flink.json b/.github/trigger_files/beam_PostCommit_PortableJar_Flink.json
new file mode 100644
index 00000000000..0967ef424bc
--- /dev/null
+++ b/.github/trigger_files/beam_PostCommit_PortableJar_Flink.json
@@ -0,0 +1 @@
+{}
diff --git a/.github/trigger_files/beam_PostCommit_Python_Examples_Flink.json b/.github/trigger_files/beam_PostCommit_Python_Examples_Flink.json
new file mode 100644
index 00000000000..9f5479a1277
--- /dev/null
+++ b/.github/trigger_files/beam_PostCommit_Python_Examples_Flink.json
@@ -0,0 +1,3 @@
+{
+  "modification": "#37313"
+}
diff --git a/.github/workflows/beam_PostCommit_Python_Portable_Flink.yml b/.github/workflows/beam_PostCommit_Python_Portable_Flink.yml
index f3c032ebffe..aaf1dd51e26 100644
--- a/.github/workflows/beam_PostCommit_Python_Portable_Flink.yml
+++ b/.github/workflows/beam_PostCommit_Python_Portable_Flink.yml
@@ -63,8 +63,9 @@ jobs:
         job_name: ["beam_PostCommit_Python_Portable_Flink"]
         job_phrase: ["Run Python Portable Flink"]
         # TODO: Enable PROCESS https://github.com/apache/beam/issues/35702
-        # environment_type: ['DOCKER', 'LOOPBACK', 'PROCESS']
-        environment_type: ['DOCKER', 'LOOPBACK']
+        # all environment_type: ['DOCKER', 'LOOPBACK', 'PROCESS']
+        # Run modes not covered by PreCommit_Python_PVR_Flink (i.e. other than 'LOOPBACK')
+        environment_type: ['DOCKER']
     steps:
       - uses: actions/checkout@v4
       - name: Setup repository
diff --git a/.github/workflows/beam_PostCommit_Python_ValidatesRunner_Flink.yml b/.github/workflows/beam_PostCommit_Python_ValidatesRunner_Flink.yml
index 51006c079b7..15f4cbc0a8c 100644
--- a/.github/workflows/beam_PostCommit_Python_ValidatesRunner_Flink.yml
+++ b/.github/workflows/beam_PostCommit_Python_ValidatesRunner_Flink.yml
@@ -88,7 +88,7 @@ jobs:
           CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH}}
         uses: ./.github/actions/gradle-command-self-hosted-action
         with:
-          gradle-command: :sdks:python:test-suites:portable:py${{steps.set_py_ver_clean.outputs.py_ver_clean}}:flinkValidatesRunner
+          gradle-command: :sdks:python:test-suites:portable:py${{steps.set_py_ver_clean.outputs.py_ver_clean}}:flink1ValidatesRunner
           arguments: |
             -PpythonVersion=${{ matrix.python_version }} \
       - name: Archive Python Test Results
diff --git a/.github/workflows/beam_PreCommit_Python_PVR_Flink.yml b/.github/workflows/beam_PreCommit_Python_PVR_Flink.yml
index 588605aa2c2..05603cb6a21 100644
--- a/.github/workflows/beam_PreCommit_Python_PVR_Flink.yml
+++ b/.github/workflows/beam_PreCommit_Python_PVR_Flink.yml
@@ -106,6 +106,7 @@ jobs:
         env:
           CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH}}
         with:
+          # Run Flink 2 tests. Flink 1.20 is covered by PostCommit_Python_ValidatesRunner_Flink
           gradle-command: :sdks:python:test-suites:portable:py313:flinkValidatesRunner
           arguments: |
             -PpythonVersion=3.13 \
diff --git a/CHANGES.md b/CHANGES.md
index 3bd266af793..bd24be1989d 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -61,8 +61,7 @@
 
 * New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
 * New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).
-* Flink 2.0 support for Java Classic and Portable Flink Runners ([#36947](https://github.com/apache/beam/issues/36947)),
-  experimental support for other SDK languages including Python.
+* Flink 2.0 support ([#36947](https://github.com/apache/beam/issues/36947)).
 
 
 ## I/Os
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 94eebb7060a..aad671d22ce 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -552,8 +552,7 @@ class BeamModulePlugin implements Plugin<Project> {
     project.ext.currentJavaVersion = getSupportedJavaVersion()
 
     project.ext.allFlinkVersions = project.flink_versions.split(',')
-    // TODO(https://github.com/apache/beam/issues/36947): Move to use project.ext.allFlinkVersions.last() when Flink 2 support completed
-    project.ext.latestFlinkVersion = '1.20'
+    project.ext.latestFlinkVersion = project.ext.allFlinkVersions.last()
 
     project.ext.nativeArchitecture = {
       // Best guess as to this system's normalized native architecture name.
diff --git a/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index d68acdb6863..e8929b84593 100644
--- a/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ b/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -100,6 +100,7 @@ import org.apache.beam.sdk.values.WindowedValues;
 import org.apache.beam.sdk.values.WindowedValues.WindowedValueCoder;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashMultiset;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
@@ -987,8 +988,11 @@ public class FlinkStreamingPortablePipelineTranslator
       // stage
       String sideInputTag = sideInputId.getLocalName();
       String collectionId =
-          components
-              .getTransformsOrThrow(sideInputId.getTransformId())
+          MoreObjects.firstNonNull(
+                  components.getTransformsOrDefault(sideInputId.getTransformId(), null),
+                  // In the case of optimized pipeline, side input transform may not be found in
+                  // component proto
+                  stagePayload.getComponents().getTransformsOrThrow(sideInputId.getTransformId()))
               .getInputsOrThrow(sideInputId.getLocalName());
       RunnerApi.WindowingStrategy windowingStrategyProto =
           components.getWindowingStrategiesOrThrow(
@@ -1045,8 +1049,11 @@ public class FlinkStreamingPortablePipelineTranslator
       tagToIntMapping.put(tag, count);
       count++;
       String collectionId =
-          components
-              .getTransformsOrThrow(sideInput.getKey().getTransformId())
+          MoreObjects.firstNonNull(
+                  components.getTransformsOrDefault(sideInput.getKey().getTransformId(), null),
+                  stagePayload
+                      .getComponents()
+                      .getTransformsOrThrow(sideInput.getKey().getTransformId()))
               .getInputsOrThrow(sideInput.getKey().getLocalName());
       DataStream<Object> sideInputStream = 
context.getDataStreamOrThrow(collectionId);
       TypeInformation<Object> tpe = sideInputStream.getType();
@@ -1078,7 +1085,11 @@ public class FlinkStreamingPortablePipelineTranslator
       TupleTag<?> tag = sideInput.getValue().getTagInternal();
       final int intTag = tagToIntMapping.get(tag);
       RunnerApi.PTransform pTransform =
-          components.getTransformsOrThrow(sideInput.getKey().getTransformId());
+          MoreObjects.firstNonNull(
+              components.getTransformsOrDefault(sideInput.getKey().getTransformId(), null),
+              stagePayload
+                  .getComponents()
+                  .getTransformsOrThrow(sideInput.getKey().getTransformId()));
       String collectionId = pTransform.getInputsOrThrow(sideInput.getKey().getLocalName());
       DataStream<WindowedValue<?>> sideInputStream = context.getDataStreamOrThrow(collectionId);
 
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index cad90de8cee..caa5a1788c8 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -100,6 +100,7 @@ import org.apache.beam.sdk.values.WindowedValues;
 import org.apache.beam.sdk.values.WindowedValues.WindowedValueCoder;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashMultiset;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
@@ -986,8 +987,11 @@ public class FlinkStreamingPortablePipelineTranslator
       // stage
       String sideInputTag = sideInputId.getLocalName();
       String collectionId =
-          components
-              .getTransformsOrThrow(sideInputId.getTransformId())
+          MoreObjects.firstNonNull(
+                  components.getTransformsOrDefault(sideInputId.getTransformId(), null),
+                  // In the case of optimized pipeline, side input transform may not be found in
+                  // component proto
+                  stagePayload.getComponents().getTransformsOrThrow(sideInputId.getTransformId()))
               .getInputsOrThrow(sideInputId.getLocalName());
       RunnerApi.WindowingStrategy windowingStrategyProto =
           components.getWindowingStrategiesOrThrow(
@@ -1044,8 +1048,11 @@ public class FlinkStreamingPortablePipelineTranslator
       tagToIntMapping.put(tag, count);
       count++;
       String collectionId =
-          components
-              .getTransformsOrThrow(sideInput.getKey().getTransformId())
+          MoreObjects.firstNonNull(
+                  components.getTransformsOrDefault(sideInput.getKey().getTransformId(), null),
+                  stagePayload
+                      .getComponents()
+                      .getTransformsOrThrow(sideInput.getKey().getTransformId()))
               .getInputsOrThrow(sideInput.getKey().getLocalName());
       DataStream<Object> sideInputStream = 
context.getDataStreamOrThrow(collectionId);
       TypeInformation<Object> tpe = sideInputStream.getType();
@@ -1077,7 +1084,11 @@ public class FlinkStreamingPortablePipelineTranslator
       TupleTag<?> tag = sideInput.getValue().getTagInternal();
       final int intTag = tagToIntMapping.get(tag);
       RunnerApi.PTransform pTransform =
-          components.getTransformsOrThrow(sideInput.getKey().getTransformId());
+          MoreObjects.firstNonNull(
+              components.getTransformsOrDefault(sideInput.getKey().getTransformId(), null),
+              stagePayload
+                  .getComponents()
+                  .getTransformsOrThrow(sideInput.getKey().getTransformId()));
       String collectionId = pTransform.getInputsOrThrow(sideInput.getKey().getLocalName());
       DataStream<WindowedValue<?>> sideInputStream = context.getDataStreamOrThrow(collectionId);
 
diff --git a/sdks/go/test/build.gradle b/sdks/go/test/build.gradle
index 5576c40c0aa..424b009fd12 100644
--- a/sdks/go/test/build.gradle
+++ b/sdks/go/test/build.gradle
@@ -79,18 +79,31 @@ task dataflowValidatesRunnerARM64() {
 task flinkValidatesRunner {
   group = "Verification"
 
+  // TODO(https://github.com/apache/beam/issues/37600) use project.ext.latestFlinkVersion after resolved
+  def flinkVersion = '1.20'
+
   dependsOn ":sdks:go:test:goBuild"
   dependsOn ":sdks:go:container:docker"
   dependsOn ":sdks:java:container:${project.ext.currentJavaVersion}:docker"
-  dependsOn ":runners:flink:${project.ext.latestFlinkVersion}:job-server:shadowJar"
+  dependsOn ":runners:flink:${flinkVersion}:job-server:shadowJar"
   dependsOn ":sdks:java:testing:expansion-service:buildTestExpansionServiceJar"
+  doFirst {
+    // Copy Flink conf file
+    copy {
+      from "${project.rootDir}/runners/flink/2.0/src/test/resources/flink-test-config.yaml"
+      into "${project.buildDir}/flink-conf"
+
+      // Rename the file during the copy process
+      rename 'flink-test-config.yaml', 'config.yaml'
+    }
+  }
   doLast {
     def pipelineOptions = [  // Pipeline options piped directly to Go SDK flags.
         "--expansion_jar=test:${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}",
     ]
     def options = [
         "--runner flink",
-        "--flink_job_server_jar 
${project(":runners:flink:${project.ext.latestFlinkVersion}:job-server").shadowJar.archivePath}",
+        "--flink_job_server_jar 
${project(":runners:flink:${flinkVersion}:job-server").shadowJar.archivePath}",
         "--pipeline_opts \"${pipelineOptions.join(' ')}\"",
     ]
     exec {
diff --git a/sdks/go/test/run_validatesrunner_tests.sh b/sdks/go/test/run_validatesrunner_tests.sh
index be7a795f01a..972caef6a33 100755
--- a/sdks/go/test/run_validatesrunner_tests.sh
+++ b/sdks/go/test/run_validatesrunner_tests.sh
@@ -273,9 +273,13 @@ if [[ "$RUNNER" == "flink" || "$RUNNER" == "spark" || "$RUNNER" == "samza" || "$
     echo "No endpoint specified; starting a new $RUNNER job server on $ENDPOINT"
     if [[ "$RUNNER" == "flink" ]]; then
       "$JAVA_CMD" \
+          -Dslf4j.provider=org.slf4j.simple.SimpleServiceProvider \
+          -Dorg.slf4j.simpleLogger.log.org.apache.flink.metrics=error \
+          -Dorg.slf4j.simpleLogger.log.org.apache.flink.runtime=error \
+          -Dorg.slf4j.simpleLogger.log.org.apache.flink.streaming=error \
           -jar $FLINK_JOB_SERVER_JAR \
           --flink-master [local] \
-          --flink-conf-dir $CURRENT_DIRECTORY/../../../runners/flink/src/test/resources \
+          --flink-conf-dir $CURRENT_DIRECTORY/build/flink-conf/ \
           --job-port $JOB_PORT \
           --expansion-port 0 \
           --artifact-port 0 &
diff --git a/sdks/java/extensions/sql/expansion-service/build.gradle b/sdks/java/extensions/sql/expansion-service/build.gradle
index 8b5bd8c6924..562c1ac8dc7 100644
--- a/sdks/java/extensions/sql/expansion-service/build.gradle
+++ b/sdks/java/extensions/sql/expansion-service/build.gradle
@@ -56,5 +56,4 @@ shadowJar {
   manifest {
     attributes(["Multi-Release": true])
   }
-  outputs.upToDateWhen { false }
 }
diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
index dbeef557ab5..096645833ae 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -18,6 +18,7 @@
 
 import argparse
 import logging
+import platform
 import shlex
 import typing
 import unittest
@@ -139,6 +140,7 @@ class FlinkRunnerTest(portable_runner_test.PortableRunnerTest):
       cls.test_metrics_path = path.join(cls.conf_dir, 'test-metrics.txt')
 
       # path to write Flink configuration to
+      # Flink 1.x conf:
       conf_path = path.join(cls.conf_dir, 'flink-conf.yaml')
       file_reporter = 'org.apache.beam.runners.flink.metrics.FileReporter'
       with open(conf_path, 'w') as f:
@@ -149,6 +151,19 @@ class FlinkRunnerTest(portable_runner_test.PortableRunnerTest):
                 'metrics.reporter.file.path: %s' % cls.test_metrics_path,
                 'metrics.scope.operator: <operator_name>',
             ]))
+      # Flink 2.x conf:
+      conf_path_2 = path.join(cls.conf_dir, 'config.yaml')
+      with open(conf_path_2, 'w') as f:
+        f.write(
+            '''metrics:
+  reporters: file
+  reporter:
+    file:
+      class: %s
+      path: %s
+  scope:
+    operator: <operator_name>
+''' % (file_reporter, cls.test_metrics_path))
 
   @classmethod
   def _subprocess_command(cls, job_port, expansion_port):
@@ -158,15 +173,19 @@ class FlinkRunnerTest(portable_runner_test.PortableRunnerTest):
 
     cls._create_conf_dir()
     cls.expansion_port = expansion_port
-
+    platform_specific_opts = []
+    if platform.system() == 'Linux':
+      # UseContainerSupport is supported in Linux and turned on by default
+      platform_specific_opts.append('-XX:-UseContainerSupport')
     try:
-      return [
-          'java',
-          '-XX:-UseContainerSupport',
+      return ['java'] + platform_specific_opts + [
           '--add-opens=java.base/java.lang=ALL-UNNAMED',
           '--add-opens=java.base/java.nio=ALL-UNNAMED',
           '--add-opens=java.base/java.util=ALL-UNNAMED',
-          '-Dorg.slf4j.simpleLogger.defaultLogLevel=warn',
+          '-Dslf4j.provider=org.slf4j.simple.SimpleServiceProvider',
+          '-Dorg.slf4j.simpleLogger.log.org.apache.flink.metrics=error',
+          '-Dorg.slf4j.simpleLogger.log.org.apache.flink.runtime=error',
+          '-Dorg.slf4j.simpleLogger.log.org.apache.flink.streaming=error',
           '-jar',
           cls.flink_job_server_jar,
           '--flink-master',
diff --git a/sdks/python/test-suites/portable/common.gradle b/sdks/python/test-suites/portable/common.gradle
index f7fa3e8e0b2..843d32705c2 100644
--- a/sdks/python/test-suites/portable/common.gradle
+++ b/sdks/python/test-suites/portable/common.gradle
@@ -23,24 +23,32 @@ import org.apache.tools.ant.taskdefs.condition.Os
 def pythonRootDir = "${rootDir}/sdks/python"
 def pythonVersionSuffix = project.ext.pythonVersion.replace('.', '')
 def latestFlinkVersion = project.ext.latestFlinkVersion
+// TODO(https://github.com/apache/beam/issues/36947): Remove when dropping Flink 1.x support
+def latestFlink1Version = '1.20'
 def currentJavaVersion = project.ext.currentJavaVersion
 
 ext {
   pythonContainerTask = ":sdks:python:container:py${pythonVersionSuffix}:docker"
 }
 
-def createFlinkRunnerTestTask(String workerType) {
-  def taskName = "flinkCompatibilityMatrix${workerType}"
-  // project(":runners:flink:${latestFlinkVersion}:job-server").shadowJar.archivePath is not resolvable until runtime, so hard-code it here.
-  def jobServerJar = "${rootDir}/runners/flink/${latestFlinkVersion}/job-server/build/libs/beam-runners-flink-${latestFlinkVersion}-job-server-${version}.jar"
+def createFlinkRunnerTestTask(String workerType, String flinkVersion) {
+  String taskName
+
+  // project(":runners:flink:${flinkVersion}:job-server").shadowJar.archivePath is not resolvable until runtime, so hard-code it here.
+  def jobServerJar = "${rootDir}/runners/flink/${flinkVersion}/job-server/build/libs/beam-runners-flink-${flinkVersion}-job-server-${version}.jar"
   def options = "--flink_job_server_jar=${jobServerJar} --environment_type=${workerType}"
+  if (flinkVersion.startsWith('1')) {
+    taskName = "flink1CompatibilityMatrix${workerType}"
+  } else {
+    taskName = "flinkCompatibilityMatrix${workerType}"
+  }
   if (workerType == 'PROCESS') {
     options += " 
--environment_options=process_command=${buildDir.absolutePath}/sdk_worker.sh"
   }
   def task = toxTask(taskName, 'flink-runner-test', options)
   // Through the Flink job server, we transitively add dependencies on the expansion services needed in tests.
   task.configure {
-    dependsOn ":runners:flink:${latestFlinkVersion}:job-server:shadowJar"
+    dependsOn ":runners:flink:${flinkVersion}:job-server:shadowJar"
     // The Java SDK worker is required to execute external transforms.
     def suffix = getSupportedJavaVersion()
     dependsOn ":sdks:java:container:${suffix}:docker"
@@ -53,31 +61,19 @@ def createFlinkRunnerTestTask(String workerType) {
   return task
 }
 
-createFlinkRunnerTestTask('DOCKER')
-createFlinkRunnerTestTask('PROCESS')
-createFlinkRunnerTestTask('LOOPBACK')
+createFlinkRunnerTestTask('DOCKER', latestFlinkVersion)
+createFlinkRunnerTestTask('PROCESS', latestFlinkVersion)
+createFlinkRunnerTestTask('LOOPBACK', latestFlinkVersion)
+createFlinkRunnerTestTask('DOCKER', latestFlink1Version)
+createFlinkRunnerTestTask('PROCESS', latestFlink1Version)
+createFlinkRunnerTestTask('LOOPBACK', latestFlink1Version)
 
-task flinkValidatesRunner() {
-  dependsOn 'flinkCompatibilityMatrixLOOPBACK'
+task flink1ValidatesRunner() {
+  dependsOn 'flink1CompatibilityMatrixLOOPBACK'
 }
 
-// TODO(https://github.com/apache/beam/issues/19962): Enable on pre-commit.
-tasks.register("flinkTriggerTranscript") {
-  dependsOn 'setupVirtualenv'
-  dependsOn ":runners:flink:${latestFlinkVersion}:job-server:shadowJar"
-  doLast {
-    exec {
-      executable 'sh'
-      args '-c', """
-          . ${envdir}/bin/activate \\
-          && cd ${pythonRootDir} \\
-          && pip install -e .[test] \\
-          && pytest \\
-              apache_beam/transforms/trigger_test.py::WeakTestStreamTranscriptTest \\
-              --test-pipeline-options='--runner=FlinkRunner --environment_type=LOOPBACK --flink_job_server_jar=${project(":runners:flink:${latestFlinkVersion}:job-server:").shadowJar.archivePath}'
-          """
-    }
-  }
+task flinkValidatesRunner() {
+  dependsOn 'flinkCompatibilityMatrixLOOPBACK'
 }
 
 // Verifies BEAM-10702.
@@ -276,7 +272,7 @@ project.tasks.register("flinkExamples") {
   dependsOn = [
           'setupVirtualenv',
           'installGcpTest',
-          ":runners:flink:${latestFlinkVersion}:job-server:shadowJar"
+          ":runners:flink:${latestFlink1Version}:job-server:shadowJar"
   ]
   doLast {
     def testOpts = [
@@ -288,7 +284,7 @@ project.tasks.register("flinkExamples") {
             "--project=apache-beam-testing",
             "--environment_type=LOOPBACK",
             "--temp_location=gs://temp-storage-for-end-to-end-tests/temp-it",
-            "--flink_job_server_jar=${project(":runners:flink:${latestFlinkVersion}:job-server").shadowJar.archivePath}",
+            "--flink_job_server_jar=${project(":runners:flink:${latestFlink1Version}:job-server").shadowJar.archivePath}",
             "--flink_conf_dir=${flink_conf_dir}",
             '--sdk_harness_log_level_overrides=' +
                 // suppress info level flink.runtime log flood
@@ -388,7 +384,7 @@ project.tasks.register("postCommitPy${pythonVersionSuffix}IT") {
   dependsOn = [
           'setupVirtualenv',
           'installGcpTest',
-          ":runners:flink:${latestFlinkVersion}:job-server:shadowJar",
+          ":runners:flink:${latestFlink1Version}:job-server:shadowJar",
           ":sdks:java:container:${fork_java_version}:docker",
           ':sdks:java:testing:kafka-service:buildTestKafkaServiceJar',
           ':sdks:java:io:expansion-service:shadowJar',
@@ -412,7 +408,7 @@ project.tasks.register("postCommitPy${pythonVersionSuffix}IT") {
         "--project=apache-beam-testing",
         "--environment_type=LOOPBACK",
         "--temp_location=gs://temp-storage-for-end-to-end-tests/temp-it",
-        "--flink_job_server_jar=${project(":runners:flink:${latestFlinkVersion}:job-server").shadowJar.archivePath}",
+        "--flink_job_server_jar=${project(":runners:flink:${latestFlink1Version}:job-server").shadowJar.archivePath}",
         "--flink_conf_dir=${flink_conf_dir}",
         '--sdk_harness_log_level_overrides=' +
             // suppress info level flink.runtime log flood
@@ -444,7 +440,7 @@ project.tasks.register("xlangSpannerIOIT") {
   dependsOn = [
           'setupVirtualenv',
           'installGcpTest',
-          ":runners:flink:${latestFlinkVersion}:job-server:shadowJar",
+          ":runners:flink:${latestFlink1Version}:job-server:shadowJar",
           ":sdks:java:container:${currentJavaVersion}:docker",
           ':sdks:java:io:expansion-service:shadowJar',
           ':sdks:java:io:google-cloud-platform:expansion-service:shadowJar',
@@ -463,7 +459,7 @@ project.tasks.register("xlangSpannerIOIT") {
         "--project=apache-beam-testing",
         "--environment_type=LOOPBACK",
         "--temp_location=gs://temp-storage-for-end-to-end-tests/temp-it",
-        "--flink_job_server_jar=${project(":runners:flink:${latestFlinkVersion}:job-server").shadowJar.archivePath}",
+        "--flink_job_server_jar=${project(":runners:flink:${latestFlink1Version}:job-server").shadowJar.archivePath}",
         '--sdk_harness_log_level_overrides=' +
             // suppress info level flink.runtime log flood
             '{\\"org.apache.flink.runtime\\":\\"WARN\\",' +
@@ -508,7 +504,7 @@ def addTestJavaJarCreator(String runner, Task jobServerJarTask) {
 }
 
 // TODO(BEAM-11333) Update and test multiple Flink versions.
-addTestJavaJarCreator("FlinkRunner", 
tasks.getByPath(":runners:flink:${latestFlinkVersion}:job-server:shadowJar"))
+addTestJavaJarCreator("FlinkRunner", 
tasks.getByPath(":runners:flink:${latestFlink1Version}:job-server:shadowJar"))
 addTestJavaJarCreator("SparkRunner", 
tasks.getByPath(":runners:spark:3:job-server:shadowJar"))
 
 def addTestFlinkUberJar(boolean saveMainSession) {
