This is an automated email from the ASF dual-hosted git repository.
Abacn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0bd11fd4631 Go VR Flink test on Flink 2.0 (#37640)
0bd11fd4631 is described below
commit 0bd11fd4631df14118d69e705252296e78835c5e
Author: Yi Hu <[email protected]>
AuthorDate: Wed May 6 16:59:35 2026 -0400
Go VR Flink test on Flink 2.0 (#37640)
---
.github/trigger_files/beam_PostCommit_Go_VR_Flink.json | 1 +
.../apache/beam/runners/flink/FlinkExecutionEnvironments.java | 10 ++++++++++
runners/flink/flink_runner.gradle | 7 +++++++
.../apache/beam/runners/flink/FlinkExecutionEnvironments.java | 8 ++++++++
sdks/go/test/build.gradle | 3 +--
sdks/go/test/integration/integration.go | 2 +-
6 files changed, 28 insertions(+), 3 deletions(-)
diff --git a/.github/trigger_files/beam_PostCommit_Go_VR_Flink.json
b/.github/trigger_files/beam_PostCommit_Go_VR_Flink.json
index ed3d846bc7b..939c43396fd 100644
--- a/.github/trigger_files/beam_PostCommit_Go_VR_Flink.json
+++ b/.github/trigger_files/beam_PostCommit_Go_VR_Flink.json
@@ -2,4 +2,5 @@
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2,
"https://github.com/apache/beam/pull/32440": "testing datastream optimizations",
+ "pr": "37640"
}
diff --git
a/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
b/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
index 0d48526e1d0..4a1c1cb0c5e 100644
---
a/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
+++
b/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
@@ -36,6 +36,7 @@ import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Stream
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
@@ -270,6 +271,7 @@ public class FlinkExecutionEnvironments {
flinkStreamEnv.getConfig().setAutoWatermarkInterval(options.getAutoWatermarkInterval());
}
configureWebUIOptions(flinkStreamEnv.getConfig(),
options.as(PipelineOptions.class));
+ configureCustomKryoSerializers(flinkStreamEnv.getConfig());
return flinkStreamEnv;
}
@@ -294,6 +296,14 @@ public class FlinkExecutionEnvironments {
}
}
+ private static void configureCustomKryoSerializers(ExecutionConfig config) {
+ SerializerConfigImpl serializerConfig = (SerializerConfigImpl)
config.getSerializerConfig();
+ // Force Beam schema to use JavaSerializer to fix serialization involving ImmutableMap
+ serializerConfig.registerTypeWithKryoSerializer(
+ org.apache.beam.sdk.schemas.Schema.class,
+ com.esotericsoftware.kryo.serializers.JavaSerializer.class);
+ }
+
private static class GlobalJobParametersImpl extends
ExecutionConfig.GlobalJobParameters {
private final Map<String, String> jobOptions;
diff --git a/runners/flink/flink_runner.gradle
b/runners/flink/flink_runner.gradle
index 98e7d547b4d..837561ec71b 100644
--- a/runners/flink/flink_runner.gradle
+++ b/runners/flink/flink_runner.gradle
@@ -230,6 +230,13 @@ dependencies {
// configuration (https://issues.apache.org/jira/browse/BEAM-11732).
permitUnusedDeclared "org.apache.flink:flink-clients:$flink_version"
+ // align with Flink's kryo version (groupId changed)
+ if (flink_major.startsWith('2')) {
+ implementation "com.esotericsoftware:kryo:5.6.2"
+ } else {
+ implementation "com.esotericsoftware.kryo:kryo:2.24.0"
+ }
+
implementation "org.apache.flink:flink-streaming-java:$flink_version"
testImplementation
"org.apache.flink:flink-statebackend-rocksdb:$flink_version"
testImplementation
"org.apache.flink:flink-streaming-java:$flink_version:tests"
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
index a0e5908cc99..de92dd94605 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
@@ -298,6 +298,7 @@ public class FlinkExecutionEnvironments {
configureStateBackend(options, flinkStreamEnv);
configureWebUIOptions(flinkStreamEnv.getConfig(),
options.as(PipelineOptions.class));
+ configureCustomKryoSerializers(flinkStreamEnv.getConfig());
return flinkStreamEnv;
}
@@ -322,6 +323,13 @@ public class FlinkExecutionEnvironments {
}
}
+ private static void configureCustomKryoSerializers(ExecutionConfig config) {
+ // Force Beam schema to use JavaSerializer to fix serialization involving ImmutableMap
+ config.registerTypeWithKryoSerializer(
+ org.apache.beam.sdk.schemas.Schema.class,
+ com.esotericsoftware.kryo.serializers.JavaSerializer.class);
+ }
+
private static class GlobalJobParametersImpl extends
ExecutionConfig.GlobalJobParameters {
private final Map<String, String> jobOptions;
diff --git a/sdks/go/test/build.gradle b/sdks/go/test/build.gradle
index ddaaa136b43..67713471606 100644
--- a/sdks/go/test/build.gradle
+++ b/sdks/go/test/build.gradle
@@ -79,8 +79,7 @@ task dataflowValidatesRunnerARM64() {
task flinkValidatesRunner {
group = "Verification"
- // TODO(https://github.com/apache/beam/issues/37600) use project.ext.latestFlinkVersion after resolved
- def flinkVersion = '1.20'
+ def flinkVersion = project.ext.latestFlinkVersion
dependsOn ":sdks:go:test:goBuild"
dependsOn ":sdks:go:container:docker"
diff --git a/sdks/go/test/integration/integration.go
b/sdks/go/test/integration/integration.go
index b23547bf4fa..a0eef7d1018 100644
--- a/sdks/go/test/integration/integration.go
+++ b/sdks/go/test/integration/integration.go
@@ -199,7 +199,7 @@ var flinkFilters = []string{
"TestTestStreamToGBK",
"TestTestStreamTimersEventTime",
- "TestTimers_EventTime_Unbounded", // (failure when comparing on side inputs (NPE on window lookup))
+ "TestTimers_EventTime_WithNoOutputTimestamp", // Encounter error: TimestampCombiner moved element from TIMESTAMP_MAX_VALUE to earlier time (end of global window) for window GlobalWindow
"TestTimers_ProcessingTime.*", // Flink doesn't support processing time timers.
// no support for BundleFinalizer