This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f2aa7912401 Exercise Python PVR tests on Flink 2.0 (#37313)
f2aa7912401 is described below
commit f2aa79124012a3324f0813cc80f8e99dfb7546bb
Author: Yi Hu <[email protected]>
AuthorDate: Tue Feb 17 11:19:58 2026 -0500
Exercise Python PVR tests on Flink 2.0 (#37313)
* Exercise Flink 2.0 Python Validates Runner tests
* Clean up TODOs: make Flink 2.0 the latestFlinkVersion
* Fix PortableJar PostCommit
* Still run Go VR on Flink 1.20
---
.../trigger_files/beam_PostCommit_Go_VR_Flink.json | 2 +-
.../beam_PostCommit_PortableJar_Flink.json | 1 +
.../beam_PostCommit_Python_Examples_Flink.json | 3 +
.../beam_PostCommit_Python_Portable_Flink.yml | 5 +-
...eam_PostCommit_Python_ValidatesRunner_Flink.yml | 2 +-
.../workflows/beam_PreCommit_Python_PVR_Flink.yml | 1 +
CHANGES.md | 3 +-
.../org/apache/beam/gradle/BeamModulePlugin.groovy | 3 +-
.../FlinkStreamingPortablePipelineTranslator.java | 21 +++++--
.../FlinkStreamingPortablePipelineTranslator.java | 21 +++++--
sdks/go/test/build.gradle | 17 +++++-
sdks/go/test/run_validatesrunner_tests.sh | 6 +-
.../extensions/sql/expansion-service/build.gradle | 1 -
.../runners/portability/flink_runner_test.py | 29 ++++++++--
sdks/python/test-suites/portable/common.gradle | 64 ++++++++++------------
15 files changed, 118 insertions(+), 61 deletions(-)
diff --git a/.github/trigger_files/beam_PostCommit_Go_VR_Flink.json
b/.github/trigger_files/beam_PostCommit_Go_VR_Flink.json
index d5ac7fc60d7..83b506b55ed 100644
--- a/.github/trigger_files/beam_PostCommit_Go_VR_Flink.json
+++ b/.github/trigger_files/beam_PostCommit_Go_VR_Flink.json
@@ -1,6 +1,6 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to
run",
- "modification": 1,
+ "modification": 2,
"https://github.com/apache/beam/pull/32440": "testing datastream
optimizations",
"https://github.com/apache/beam/pull/32648": "testing addition of Flink 1.19
support"
}
diff --git a/.github/trigger_files/beam_PostCommit_PortableJar_Flink.json
b/.github/trigger_files/beam_PostCommit_PortableJar_Flink.json
new file mode 100644
index 00000000000..0967ef424bc
--- /dev/null
+++ b/.github/trigger_files/beam_PostCommit_PortableJar_Flink.json
@@ -0,0 +1 @@
+{}
diff --git a/.github/trigger_files/beam_PostCommit_Python_Examples_Flink.json
b/.github/trigger_files/beam_PostCommit_Python_Examples_Flink.json
new file mode 100644
index 00000000000..9f5479a1277
--- /dev/null
+++ b/.github/trigger_files/beam_PostCommit_Python_Examples_Flink.json
@@ -0,0 +1,3 @@
+{
+ "modification": "#37313"
+}
diff --git a/.github/workflows/beam_PostCommit_Python_Portable_Flink.yml
b/.github/workflows/beam_PostCommit_Python_Portable_Flink.yml
index f3c032ebffe..aaf1dd51e26 100644
--- a/.github/workflows/beam_PostCommit_Python_Portable_Flink.yml
+++ b/.github/workflows/beam_PostCommit_Python_Portable_Flink.yml
@@ -63,8 +63,9 @@ jobs:
job_name: ["beam_PostCommit_Python_Portable_Flink"]
job_phrase: ["Run Python Portable Flink"]
# TODO: Enable PROCESS https://github.com/apache/beam/issues/35702
- # environment_type: ['DOCKER', 'LOOPBACK', 'PROCESS']
- environment_type: ['DOCKER', 'LOOPBACK']
+ # all environment_type: ['DOCKER', 'LOOPBACK', 'PROCESS']
+ # Run modes not covered by PreCommit_Python_PVR_Flink (i.e. other than
'LOOPBACK')
+ environment_type: ['DOCKER']
steps:
- uses: actions/checkout@v4
- name: Setup repository
diff --git a/.github/workflows/beam_PostCommit_Python_ValidatesRunner_Flink.yml
b/.github/workflows/beam_PostCommit_Python_ValidatesRunner_Flink.yml
index 51006c079b7..15f4cbc0a8c 100644
--- a/.github/workflows/beam_PostCommit_Python_ValidatesRunner_Flink.yml
+++ b/.github/workflows/beam_PostCommit_Python_ValidatesRunner_Flink.yml
@@ -88,7 +88,7 @@ jobs:
CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH}}
uses: ./.github/actions/gradle-command-self-hosted-action
with:
- gradle-command:
:sdks:python:test-suites:portable:py${{steps.set_py_ver_clean.outputs.py_ver_clean}}:flinkValidatesRunner
+ gradle-command:
:sdks:python:test-suites:portable:py${{steps.set_py_ver_clean.outputs.py_ver_clean}}:flink1ValidatesRunner
arguments: |
-PpythonVersion=${{ matrix.python_version }} \
- name: Archive Python Test Results
diff --git a/.github/workflows/beam_PreCommit_Python_PVR_Flink.yml
b/.github/workflows/beam_PreCommit_Python_PVR_Flink.yml
index 588605aa2c2..05603cb6a21 100644
--- a/.github/workflows/beam_PreCommit_Python_PVR_Flink.yml
+++ b/.github/workflows/beam_PreCommit_Python_PVR_Flink.yml
@@ -106,6 +106,7 @@ jobs:
env:
CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH}}
with:
+ # Run Flink 2 tests. Flink 1.20 is covered by
PostCommit_Python_ValidatesRunner_Flink
gradle-command:
:sdks:python:test-suites:portable:py313:flinkValidatesRunner
arguments: |
-PpythonVersion=3.13 \
diff --git a/CHANGES.md b/CHANGES.md
index 3bd266af793..bd24be1989d 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -61,8 +61,7 @@
* New highly anticipated feature X added to Python SDK
([#X](https://github.com/apache/beam/issues/X)).
* New highly anticipated feature Y added to Java SDK
([#Y](https://github.com/apache/beam/issues/Y)).
-* Flink 2.0 support for Java Classic and Portable Flink Runners
([#36947](https://github.com/apache/beam/issues/36947)),
- experimental support for other SDK languages including Python.
+* Flink 2.0 support ([#36947](https://github.com/apache/beam/issues/36947)).
## I/Os
diff --git
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 94eebb7060a..aad671d22ce 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -552,8 +552,7 @@ class BeamModulePlugin implements Plugin<Project> {
project.ext.currentJavaVersion = getSupportedJavaVersion()
project.ext.allFlinkVersions = project.flink_versions.split(',')
- // TODO(https://github.com/apache/beam/issues/36947): Move to use
project.ext.allFlinkVersions.last() when Flink 2 support completed
- project.ext.latestFlinkVersion = '1.20'
+ project.ext.latestFlinkVersion = project.ext.allFlinkVersions.last()
project.ext.nativeArchitecture = {
// Best guess as to this system's normalized native architecture name.
diff --git
a/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
b/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index d68acdb6863..e8929b84593 100644
---
a/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++
b/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -100,6 +100,7 @@ import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.sdk.values.WindowedValues.WindowedValueCoder;
import org.apache.beam.sdk.values.WindowingStrategy;
import
org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashMultiset;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
@@ -987,8 +988,11 @@ public class FlinkStreamingPortablePipelineTranslator
// stage
String sideInputTag = sideInputId.getLocalName();
String collectionId =
- components
- .getTransformsOrThrow(sideInputId.getTransformId())
+ MoreObjects.firstNonNull(
+
components.getTransformsOrDefault(sideInputId.getTransformId(), null),
+ // In the case of optimized pipeline, side input transform
may not be found in
+ // component proto
+
stagePayload.getComponents().getTransformsOrThrow(sideInputId.getTransformId()))
.getInputsOrThrow(sideInputId.getLocalName());
RunnerApi.WindowingStrategy windowingStrategyProto =
components.getWindowingStrategiesOrThrow(
@@ -1045,8 +1049,11 @@ public class FlinkStreamingPortablePipelineTranslator
tagToIntMapping.put(tag, count);
count++;
String collectionId =
- components
- .getTransformsOrThrow(sideInput.getKey().getTransformId())
+ MoreObjects.firstNonNull(
+
components.getTransformsOrDefault(sideInput.getKey().getTransformId(), null),
+ stagePayload
+ .getComponents()
+
.getTransformsOrThrow(sideInput.getKey().getTransformId()))
.getInputsOrThrow(sideInput.getKey().getLocalName());
DataStream<Object> sideInputStream =
context.getDataStreamOrThrow(collectionId);
TypeInformation<Object> tpe = sideInputStream.getType();
@@ -1078,7 +1085,11 @@ public class FlinkStreamingPortablePipelineTranslator
TupleTag<?> tag = sideInput.getValue().getTagInternal();
final int intTag = tagToIntMapping.get(tag);
RunnerApi.PTransform pTransform =
- components.getTransformsOrThrow(sideInput.getKey().getTransformId());
+ MoreObjects.firstNonNull(
+
components.getTransformsOrDefault(sideInput.getKey().getTransformId(), null),
+ stagePayload
+ .getComponents()
+ .getTransformsOrThrow(sideInput.getKey().getTransformId()));
String collectionId =
pTransform.getInputsOrThrow(sideInput.getKey().getLocalName());
DataStream<WindowedValue<?>> sideInputStream =
context.getDataStreamOrThrow(collectionId);
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index cad90de8cee..caa5a1788c8 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -100,6 +100,7 @@ import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.sdk.values.WindowedValues.WindowedValueCoder;
import org.apache.beam.sdk.values.WindowingStrategy;
import
org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashMultiset;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
@@ -986,8 +987,11 @@ public class FlinkStreamingPortablePipelineTranslator
// stage
String sideInputTag = sideInputId.getLocalName();
String collectionId =
- components
- .getTransformsOrThrow(sideInputId.getTransformId())
+ MoreObjects.firstNonNull(
+
components.getTransformsOrDefault(sideInputId.getTransformId(), null),
+ // In the case of optimized pipeline, side input transform
may not be found in
+ // component proto
+
stagePayload.getComponents().getTransformsOrThrow(sideInputId.getTransformId()))
.getInputsOrThrow(sideInputId.getLocalName());
RunnerApi.WindowingStrategy windowingStrategyProto =
components.getWindowingStrategiesOrThrow(
@@ -1044,8 +1048,11 @@ public class FlinkStreamingPortablePipelineTranslator
tagToIntMapping.put(tag, count);
count++;
String collectionId =
- components
- .getTransformsOrThrow(sideInput.getKey().getTransformId())
+ MoreObjects.firstNonNull(
+
components.getTransformsOrDefault(sideInput.getKey().getTransformId(), null),
+ stagePayload
+ .getComponents()
+
.getTransformsOrThrow(sideInput.getKey().getTransformId()))
.getInputsOrThrow(sideInput.getKey().getLocalName());
DataStream<Object> sideInputStream =
context.getDataStreamOrThrow(collectionId);
TypeInformation<Object> tpe = sideInputStream.getType();
@@ -1077,7 +1084,11 @@ public class FlinkStreamingPortablePipelineTranslator
TupleTag<?> tag = sideInput.getValue().getTagInternal();
final int intTag = tagToIntMapping.get(tag);
RunnerApi.PTransform pTransform =
- components.getTransformsOrThrow(sideInput.getKey().getTransformId());
+ MoreObjects.firstNonNull(
+
components.getTransformsOrDefault(sideInput.getKey().getTransformId(), null),
+ stagePayload
+ .getComponents()
+ .getTransformsOrThrow(sideInput.getKey().getTransformId()));
String collectionId =
pTransform.getInputsOrThrow(sideInput.getKey().getLocalName());
DataStream<WindowedValue<?>> sideInputStream =
context.getDataStreamOrThrow(collectionId);
diff --git a/sdks/go/test/build.gradle b/sdks/go/test/build.gradle
index 5576c40c0aa..424b009fd12 100644
--- a/sdks/go/test/build.gradle
+++ b/sdks/go/test/build.gradle
@@ -79,18 +79,31 @@ task dataflowValidatesRunnerARM64() {
task flinkValidatesRunner {
group = "Verification"
+ // TODO(https://github.com/apache/beam/issues/37600) use
project.ext.latestFlinkVersion after resolved
+ def flinkVersion = '1.20'
+
dependsOn ":sdks:go:test:goBuild"
dependsOn ":sdks:go:container:docker"
dependsOn ":sdks:java:container:${project.ext.currentJavaVersion}:docker"
- dependsOn
":runners:flink:${project.ext.latestFlinkVersion}:job-server:shadowJar"
+ dependsOn ":runners:flink:${flinkVersion}:job-server:shadowJar"
dependsOn ":sdks:java:testing:expansion-service:buildTestExpansionServiceJar"
+ doFirst {
+ // Copy Flink conf file
+ copy {
+ from
"${project.rootDir}/runners/flink/2.0/src/test/resources/flink-test-config.yaml"
+ into "${project.buildDir}/flink-conf"
+
+ // Rename the file during the copy process
+ rename 'flink-test-config.yaml', 'config.yaml'
+ }
+ }
doLast {
def pipelineOptions = [ // Pipeline options piped directly to Go SDK
flags.
"--expansion_jar=test:${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}",
]
def options = [
"--runner flink",
- "--flink_job_server_jar
${project(":runners:flink:${project.ext.latestFlinkVersion}:job-server").shadowJar.archivePath}",
+ "--flink_job_server_jar
${project(":runners:flink:${flinkVersion}:job-server").shadowJar.archivePath}",
"--pipeline_opts \"${pipelineOptions.join(' ')}\"",
]
exec {
diff --git a/sdks/go/test/run_validatesrunner_tests.sh
b/sdks/go/test/run_validatesrunner_tests.sh
index be7a795f01a..972caef6a33 100755
--- a/sdks/go/test/run_validatesrunner_tests.sh
+++ b/sdks/go/test/run_validatesrunner_tests.sh
@@ -273,9 +273,13 @@ if [[ "$RUNNER" == "flink" || "$RUNNER" == "spark" ||
"$RUNNER" == "samza" || "$
echo "No endpoint specified; starting a new $RUNNER job server on
$ENDPOINT"
if [[ "$RUNNER" == "flink" ]]; then
"$JAVA_CMD" \
+ -Dslf4j.provider=org.slf4j.simple.SimpleServiceProvider \
+ -Dorg.slf4j.simpleLogger.log.org.apache.flink.metrics=error \
+ -Dorg.slf4j.simpleLogger.log.org.apache.flink.runtime=error \
+ -Dorg.slf4j.simpleLogger.log.org.apache.flink.streaming=error \
-jar $FLINK_JOB_SERVER_JAR \
--flink-master [local] \
- --flink-conf-dir
$CURRENT_DIRECTORY/../../../runners/flink/src/test/resources \
+ --flink-conf-dir $CURRENT_DIRECTORY/build/flink-conf/ \
--job-port $JOB_PORT \
--expansion-port 0 \
--artifact-port 0 &
diff --git a/sdks/java/extensions/sql/expansion-service/build.gradle
b/sdks/java/extensions/sql/expansion-service/build.gradle
index 8b5bd8c6924..562c1ac8dc7 100644
--- a/sdks/java/extensions/sql/expansion-service/build.gradle
+++ b/sdks/java/extensions/sql/expansion-service/build.gradle
@@ -56,5 +56,4 @@ shadowJar {
manifest {
attributes(["Multi-Release": true])
}
- outputs.upToDateWhen { false }
}
diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
index dbeef557ab5..096645833ae 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -18,6 +18,7 @@
import argparse
import logging
+import platform
import shlex
import typing
import unittest
@@ -139,6 +140,7 @@ class
FlinkRunnerTest(portable_runner_test.PortableRunnerTest):
cls.test_metrics_path = path.join(cls.conf_dir, 'test-metrics.txt')
# path to write Flink configuration to
+ # Flink 1.x conf:
conf_path = path.join(cls.conf_dir, 'flink-conf.yaml')
file_reporter = 'org.apache.beam.runners.flink.metrics.FileReporter'
with open(conf_path, 'w') as f:
@@ -149,6 +151,19 @@ class
FlinkRunnerTest(portable_runner_test.PortableRunnerTest):
'metrics.reporter.file.path: %s' % cls.test_metrics_path,
'metrics.scope.operator: <operator_name>',
]))
+ # Flink 2.x conf:
+ conf_path_2 = path.join(cls.conf_dir, 'config.yaml')
+ with open(conf_path_2, 'w') as f:
+ f.write(
+ '''metrics:
+ reporters: file
+ reporter:
+ file:
+ class: %s
+ path: %s
+ scope:
+ operator: <operator_name>
+''' % (file_reporter, cls.test_metrics_path))
@classmethod
def _subprocess_command(cls, job_port, expansion_port):
@@ -158,15 +173,19 @@ class
FlinkRunnerTest(portable_runner_test.PortableRunnerTest):
cls._create_conf_dir()
cls.expansion_port = expansion_port
-
+ platform_specific_opts = []
+ if platform.system() == 'Linux':
+ # UseContainerSupport is supported in Linux and turned on by default
+ platform_specific_opts.append('-XX:-UseContainerSupport')
try:
- return [
- 'java',
- '-XX:-UseContainerSupport',
+ return ['java'] + platform_specific_opts + [
'--add-opens=java.base/java.lang=ALL-UNNAMED',
'--add-opens=java.base/java.nio=ALL-UNNAMED',
'--add-opens=java.base/java.util=ALL-UNNAMED',
- '-Dorg.slf4j.simpleLogger.defaultLogLevel=warn',
+ '-Dslf4j.provider=org.slf4j.simple.SimpleServiceProvider',
+ '-Dorg.slf4j.simpleLogger.log.org.apache.flink.metrics=error',
+ '-Dorg.slf4j.simpleLogger.log.org.apache.flink.runtime=error',
+ '-Dorg.slf4j.simpleLogger.log.org.apache.flink.streaming=error',
'-jar',
cls.flink_job_server_jar,
'--flink-master',
diff --git a/sdks/python/test-suites/portable/common.gradle
b/sdks/python/test-suites/portable/common.gradle
index f7fa3e8e0b2..843d32705c2 100644
--- a/sdks/python/test-suites/portable/common.gradle
+++ b/sdks/python/test-suites/portable/common.gradle
@@ -23,24 +23,32 @@ import org.apache.tools.ant.taskdefs.condition.Os
def pythonRootDir = "${rootDir}/sdks/python"
def pythonVersionSuffix = project.ext.pythonVersion.replace('.', '')
def latestFlinkVersion = project.ext.latestFlinkVersion
+// TODO(https://github.com/apache/beam/issues/36947): Remove when dropping
Flink 1.x support
+def latestFlink1Version = '1.20'
def currentJavaVersion = project.ext.currentJavaVersion
ext {
pythonContainerTask =
":sdks:python:container:py${pythonVersionSuffix}:docker"
}
-def createFlinkRunnerTestTask(String workerType) {
- def taskName = "flinkCompatibilityMatrix${workerType}"
- //
project(":runners:flink:${latestFlinkVersion}:job-server").shadowJar.archivePath
is not resolvable until runtime, so hard-code it here.
- def jobServerJar =
"${rootDir}/runners/flink/${latestFlinkVersion}/job-server/build/libs/beam-runners-flink-${latestFlinkVersion}-job-server-${version}.jar"
+def createFlinkRunnerTestTask(String workerType, String flinkVersion) {
+ String taskName
+
+ //
project(":runners:flink:${flinkVersion}:job-server").shadowJar.archivePath is
not resolvable until runtime, so hard-code it here.
+ def jobServerJar =
"${rootDir}/runners/flink/${flinkVersion}/job-server/build/libs/beam-runners-flink-${flinkVersion}-job-server-${version}.jar"
def options = "--flink_job_server_jar=${jobServerJar}
--environment_type=${workerType}"
+ if (flinkVersion.startsWith('1')) {
+ taskName = "flink1CompatibilityMatrix${workerType}"
+ } else {
+ taskName = "flinkCompatibilityMatrix${workerType}"
+ }
if (workerType == 'PROCESS') {
options += "
--environment_options=process_command=${buildDir.absolutePath}/sdk_worker.sh"
}
def task = toxTask(taskName, 'flink-runner-test', options)
// Through the Flink job server, we transitively add dependencies on the
expansion services needed in tests.
task.configure {
- dependsOn ":runners:flink:${latestFlinkVersion}:job-server:shadowJar"
+ dependsOn ":runners:flink:${flinkVersion}:job-server:shadowJar"
// The Java SDK worker is required to execute external transforms.
def suffix = getSupportedJavaVersion()
dependsOn ":sdks:java:container:${suffix}:docker"
@@ -53,31 +61,19 @@ def createFlinkRunnerTestTask(String workerType) {
return task
}
-createFlinkRunnerTestTask('DOCKER')
-createFlinkRunnerTestTask('PROCESS')
-createFlinkRunnerTestTask('LOOPBACK')
+createFlinkRunnerTestTask('DOCKER', latestFlinkVersion)
+createFlinkRunnerTestTask('PROCESS', latestFlinkVersion)
+createFlinkRunnerTestTask('LOOPBACK', latestFlinkVersion)
+createFlinkRunnerTestTask('DOCKER', latestFlink1Version)
+createFlinkRunnerTestTask('PROCESS', latestFlink1Version)
+createFlinkRunnerTestTask('LOOPBACK', latestFlink1Version)
-task flinkValidatesRunner() {
- dependsOn 'flinkCompatibilityMatrixLOOPBACK'
+task flink1ValidatesRunner() {
+ dependsOn 'flink1CompatibilityMatrixLOOPBACK'
}
-// TODO(https://github.com/apache/beam/issues/19962): Enable on pre-commit.
-tasks.register("flinkTriggerTranscript") {
- dependsOn 'setupVirtualenv'
- dependsOn ":runners:flink:${latestFlinkVersion}:job-server:shadowJar"
- doLast {
- exec {
- executable 'sh'
- args '-c', """
- . ${envdir}/bin/activate \\
- && cd ${pythonRootDir} \\
- && pip install -e .[test] \\
- && pytest \\
-
apache_beam/transforms/trigger_test.py::WeakTestStreamTranscriptTest \\
- --test-pipeline-options='--runner=FlinkRunner
--environment_type=LOOPBACK
--flink_job_server_jar=${project(":runners:flink:${latestFlinkVersion}:job-server:").shadowJar.archivePath}'
- """
- }
- }
+task flinkValidatesRunner() {
+ dependsOn 'flinkCompatibilityMatrixLOOPBACK'
}
// Verifies BEAM-10702.
@@ -276,7 +272,7 @@ project.tasks.register("flinkExamples") {
dependsOn = [
'setupVirtualenv',
'installGcpTest',
- ":runners:flink:${latestFlinkVersion}:job-server:shadowJar"
+ ":runners:flink:${latestFlink1Version}:job-server:shadowJar"
]
doLast {
def testOpts = [
@@ -288,7 +284,7 @@ project.tasks.register("flinkExamples") {
"--project=apache-beam-testing",
"--environment_type=LOOPBACK",
"--temp_location=gs://temp-storage-for-end-to-end-tests/temp-it",
-
"--flink_job_server_jar=${project(":runners:flink:${latestFlinkVersion}:job-server").shadowJar.archivePath}",
+
"--flink_job_server_jar=${project(":runners:flink:${latestFlink1Version}:job-server").shadowJar.archivePath}",
"--flink_conf_dir=${flink_conf_dir}",
'--sdk_harness_log_level_overrides=' +
// suppress info level flink.runtime log flood
@@ -388,7 +384,7 @@
project.tasks.register("postCommitPy${pythonVersionSuffix}IT") {
dependsOn = [
'setupVirtualenv',
'installGcpTest',
- ":runners:flink:${latestFlinkVersion}:job-server:shadowJar",
+ ":runners:flink:${latestFlink1Version}:job-server:shadowJar",
":sdks:java:container:${fork_java_version}:docker",
':sdks:java:testing:kafka-service:buildTestKafkaServiceJar',
':sdks:java:io:expansion-service:shadowJar',
@@ -412,7 +408,7 @@
project.tasks.register("postCommitPy${pythonVersionSuffix}IT") {
"--project=apache-beam-testing",
"--environment_type=LOOPBACK",
"--temp_location=gs://temp-storage-for-end-to-end-tests/temp-it",
-
"--flink_job_server_jar=${project(":runners:flink:${latestFlinkVersion}:job-server").shadowJar.archivePath}",
+
"--flink_job_server_jar=${project(":runners:flink:${latestFlink1Version}:job-server").shadowJar.archivePath}",
"--flink_conf_dir=${flink_conf_dir}",
'--sdk_harness_log_level_overrides=' +
// suppress info level flink.runtime log flood
@@ -444,7 +440,7 @@ project.tasks.register("xlangSpannerIOIT") {
dependsOn = [
'setupVirtualenv',
'installGcpTest',
- ":runners:flink:${latestFlinkVersion}:job-server:shadowJar",
+ ":runners:flink:${latestFlink1Version}:job-server:shadowJar",
":sdks:java:container:${currentJavaVersion}:docker",
':sdks:java:io:expansion-service:shadowJar',
':sdks:java:io:google-cloud-platform:expansion-service:shadowJar',
@@ -463,7 +459,7 @@ project.tasks.register("xlangSpannerIOIT") {
"--project=apache-beam-testing",
"--environment_type=LOOPBACK",
"--temp_location=gs://temp-storage-for-end-to-end-tests/temp-it",
-
"--flink_job_server_jar=${project(":runners:flink:${latestFlinkVersion}:job-server").shadowJar.archivePath}",
+
"--flink_job_server_jar=${project(":runners:flink:${latestFlink1Version}:job-server").shadowJar.archivePath}",
'--sdk_harness_log_level_overrides=' +
// suppress info level flink.runtime log flood
'{\\"org.apache.flink.runtime\\":\\"WARN\\",' +
@@ -508,7 +504,7 @@ def addTestJavaJarCreator(String runner, Task
jobServerJarTask) {
}
// TODO(BEAM-11333) Update and test multiple Flink versions.
-addTestJavaJarCreator("FlinkRunner",
tasks.getByPath(":runners:flink:${latestFlinkVersion}:job-server:shadowJar"))
+addTestJavaJarCreator("FlinkRunner",
tasks.getByPath(":runners:flink:${latestFlink1Version}:job-server:shadowJar"))
addTestJavaJarCreator("SparkRunner",
tasks.getByPath(":runners:spark:3:job-server:shadowJar"))
def addTestFlinkUberJar(boolean saveMainSession) {