This is an automated email from the ASF dual-hosted git repository.

Abacn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bd11fd4631 Go VR Flink test on Flink 2.0 (#37640)
0bd11fd4631 is described below

commit 0bd11fd4631df14118d69e705252296e78835c5e
Author: Yi Hu <[email protected]>
AuthorDate: Wed May 6 16:59:35 2026 -0400

    Go VR Flink test on Flink 2.0 (#37640)
---
 .github/trigger_files/beam_PostCommit_Go_VR_Flink.json         |  1 +
 .../apache/beam/runners/flink/FlinkExecutionEnvironments.java  | 10 ++++++++++
 runners/flink/flink_runner.gradle                              |  7 +++++++
 .../apache/beam/runners/flink/FlinkExecutionEnvironments.java  |  8 ++++++++
 sdks/go/test/build.gradle                                      |  3 +--
 sdks/go/test/integration/integration.go                        |  2 +-
 6 files changed, 28 insertions(+), 3 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Go_VR_Flink.json 
b/.github/trigger_files/beam_PostCommit_Go_VR_Flink.json
index ed3d846bc7b..939c43396fd 100644
--- a/.github/trigger_files/beam_PostCommit_Go_VR_Flink.json
+++ b/.github/trigger_files/beam_PostCommit_Go_VR_Flink.json
@@ -2,4 +2,5 @@
   "comment": "Modify this file in a trivial way to cause this test suite to 
run",
   "modification": 2,
   "https://github.com/apache/beam/pull/32440": "testing datastream 
optimizations",
+  "pr": "37640"
 }
diff --git 
a/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
 
b/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
index 0d48526e1d0..4a1c1cb0c5e 100644
--- 
a/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
+++ 
b/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
@@ -36,6 +36,7 @@ import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Stream
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
@@ -270,6 +271,7 @@ public class FlinkExecutionEnvironments {
       
flinkStreamEnv.getConfig().setAutoWatermarkInterval(options.getAutoWatermarkInterval());
     }
     configureWebUIOptions(flinkStreamEnv.getConfig(), 
options.as(PipelineOptions.class));
+    configureCustomKryoSerializers(flinkStreamEnv.getConfig());
 
     return flinkStreamEnv;
   }
@@ -294,6 +296,14 @@ public class FlinkExecutionEnvironments {
     }
   }
 
+  private static void configureCustomKryoSerializers(ExecutionConfig config) {
+    SerializerConfigImpl serializerConfig = (SerializerConfigImpl) 
config.getSerializerConfig();
+    // Force Beam schema to use JavaSerializer to fix serialization involving 
ImmutableMap
+    serializerConfig.registerTypeWithKryoSerializer(
+        org.apache.beam.sdk.schemas.Schema.class,
+        com.esotericsoftware.kryo.serializers.JavaSerializer.class);
+  }
+
   private static class GlobalJobParametersImpl extends 
ExecutionConfig.GlobalJobParameters {
     private final Map<String, String> jobOptions;
 
diff --git a/runners/flink/flink_runner.gradle 
b/runners/flink/flink_runner.gradle
index 98e7d547b4d..837561ec71b 100644
--- a/runners/flink/flink_runner.gradle
+++ b/runners/flink/flink_runner.gradle
@@ -230,6 +230,13 @@ dependencies {
   // configuration (https://issues.apache.org/jira/browse/BEAM-11732).
   permitUnusedDeclared "org.apache.flink:flink-clients:$flink_version"
 
+  // align with Flink's kryo version (groupId changed)
+  if (flink_major.startsWith('2')) {
+    implementation "com.esotericsoftware:kryo:5.6.2"
+  } else {
+    implementation "com.esotericsoftware.kryo:kryo:2.24.0"
+  }
+
   implementation "org.apache.flink:flink-streaming-java:$flink_version"
   testImplementation 
"org.apache.flink:flink-statebackend-rocksdb:$flink_version"
   testImplementation 
"org.apache.flink:flink-streaming-java:$flink_version:tests"
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
index a0e5908cc99..de92dd94605 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
@@ -298,6 +298,7 @@ public class FlinkExecutionEnvironments {
     configureStateBackend(options, flinkStreamEnv);
 
     configureWebUIOptions(flinkStreamEnv.getConfig(), 
options.as(PipelineOptions.class));
+    configureCustomKryoSerializers(flinkStreamEnv.getConfig());
 
     return flinkStreamEnv;
   }
@@ -322,6 +323,13 @@ public class FlinkExecutionEnvironments {
     }
   }
 
+  private static void configureCustomKryoSerializers(ExecutionConfig config) {
+    // Force Beam schema to use JavaSerializer to fix serialization involving 
ImmutableMap
+    config.registerTypeWithKryoSerializer(
+        org.apache.beam.sdk.schemas.Schema.class,
+        com.esotericsoftware.kryo.serializers.JavaSerializer.class);
+  }
+
   private static class GlobalJobParametersImpl extends 
ExecutionConfig.GlobalJobParameters {
     private final Map<String, String> jobOptions;
 
diff --git a/sdks/go/test/build.gradle b/sdks/go/test/build.gradle
index ddaaa136b43..67713471606 100644
--- a/sdks/go/test/build.gradle
+++ b/sdks/go/test/build.gradle
@@ -79,8 +79,7 @@ task dataflowValidatesRunnerARM64() {
 task flinkValidatesRunner {
   group = "Verification"
 
-  // TODO(https://github.com/apache/beam/issues/37600) use 
project.ext.latestFlinkVersion after resolved
-  def flinkVersion = '1.20'
+  def flinkVersion = project.ext.latestFlinkVersion
 
   dependsOn ":sdks:go:test:goBuild"
   dependsOn ":sdks:go:container:docker"
diff --git a/sdks/go/test/integration/integration.go 
b/sdks/go/test/integration/integration.go
index b23547bf4fa..a0eef7d1018 100644
--- a/sdks/go/test/integration/integration.go
+++ b/sdks/go/test/integration/integration.go
@@ -199,7 +199,7 @@ var flinkFilters = []string{
        "TestTestStreamToGBK",
        "TestTestStreamTimersEventTime",
 
-       "TestTimers_EventTime_Unbounded", // (failure when comparing on side 
inputs (NPE on window lookup))
+       "TestTimers_EventTime_WithNoOutputTimestamp", // Encounter error: 
TimestampCombiner moved element from TIMESTAMP_MAX_VALUE to earlier time (end 
of global window) for window GlobalWindow
        "TestTimers_ProcessingTime.*",    // Flink doesn't support processing 
time timers.
 
        // no support for BundleFinalizer

Reply via email to