This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d33741c035 Fix Flink IllegalThreadStateException on Java 8 (#36730)
1d33741c035 is described below

commit 1d33741c0357a6d2eca341bd269c4a28ba8cd918
Author: Abdelrahman Ibrahim <[email protected]>
AuthorDate: Thu Nov 6 04:32:31 2025 +0200

    Fix Flink IllegalThreadStateException on Java 8 (#36730)
---
 .../flink/FlinkPipelineExecutionEnvironment.java   | 43 ++++++++++++++++++++++
 1 file changed, 43 insertions(+)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index 029eff25a82..973aa6c2429 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -20,6 +20,8 @@ package org.apache.beam.runners.flink;
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
 
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.beam.runners.core.metrics.MetricsPusher;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
@@ -29,8 +31,10 @@ import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.Vi
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.LocalEnvironment;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.slf4j.Logger;
@@ -52,6 +56,8 @@ class FlinkPipelineExecutionEnvironment {
   private static final Logger LOG =
       LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class);
 
+  /**
+   * Thread groups registered by {@code ensureFlinkCleanupComplete}. An entry is added after an
+   * attached local execution on Java 8 and removed again by that method's releaser thread. The set
+   * is the concurrent key set returned by {@link ConcurrentHashMap#newKeySet()}.
+   */
+  private static final Set<ThreadGroup> protectedThreadGroups = ConcurrentHashMap.newKeySet();
+
   private final FlinkPipelineOptions options;
 
   /**
@@ -143,6 +149,7 @@ class FlinkPipelineExecutionEnvironment {
     if (flinkBatchEnv != null) {
       if (options.getAttachedMode()) {
         JobExecutionResult jobExecutionResult = flinkBatchEnv.execute(jobName);
+        ensureFlinkCleanupComplete(flinkBatchEnv);
         return createAttachedPipelineResult(jobExecutionResult);
       } else {
         JobClient jobClient = flinkBatchEnv.executeAsync(jobName);
@@ -151,6 +158,7 @@ class FlinkPipelineExecutionEnvironment {
     } else if (flinkStreamEnv != null) {
       if (options.getAttachedMode()) {
         JobExecutionResult jobExecutionResult = 
flinkStreamEnv.execute(jobName);
+        ensureFlinkCleanupComplete(flinkStreamEnv);
         return createAttachedPipelineResult(jobExecutionResult);
       } else {
         JobClient jobClient = flinkStreamEnv.executeAsync(jobName);
@@ -161,6 +169,41 @@ class FlinkPipelineExecutionEnvironment {
     }
   }
 
+  /**
+   * Prevents ThreadGroup destruction while Flink cleanup threads are still running.
+   *
+   * <p>This is a no-op unless the {@code java.version} system property starts with {@code "1.8"}
+   * and {@code executionEnv} is a {@link LocalStreamEnvironment} or a {@link LocalEnvironment}.
+   * When both hold, the calling thread's {@link ThreadGroup} is added to {@code
+   * protectedThreadGroups} and a daemon thread named {@code FlinkCleanupReleaser} is started. That
+   * thread removes the group from the set after sleeping 2 seconds, or earlier if it is
+   * interrupted.
+   *
+   * <p>The method returns right after starting the releaser thread; it does not wait for it.
+   *
+   * @param executionEnv the Flink execution environment that was just used to run the job
+   */
+  private void ensureFlinkCleanupComplete(Object executionEnv) {
+    // Java 8 runtimes report java.version as "1.8.x"; later releases dropped the "1." prefix,
+    // so this check restricts the workaround to Java 8.
+    String javaVersion = System.getProperty("java.version");
+    if (javaVersion == null || !javaVersion.startsWith("1.8")) {
+      return;
+    }
+
+    // Only the two local environment types are handled; any other environment is left alone.
+    if (!(executionEnv instanceof LocalStreamEnvironment
+        || executionEnv instanceof LocalEnvironment)) {
+      return;
+    }
+
+    // Defensive: getThreadGroup() is documented to return null only for a terminated thread.
+    ThreadGroup currentThreadGroup = Thread.currentThread().getThreadGroup();
+    if (currentThreadGroup == null) {
+      return;
+    }
+
+    protectedThreadGroups.add(currentThreadGroup);
+
+    // NOTE(review): new Thread(Runnable, String) normally places the new thread in the creating
+    // thread's ThreadGroup. Presumably that live thread, rather than the set entry, is what keeps
+    // the group from being destroyed; protectedThreadGroups is not read anywhere in this hunk.
+    // TODO confirm.
+    Thread cleanupReleaser =
+        new Thread(
+            () -> {
+              try {
+                Thread.sleep(2000); // 2 seconds should be enough for Flink cleanup
+              } catch (InterruptedException e) {
+                // Restore the interrupt flag; the finally block still releases the group.
+                Thread.currentThread().interrupt();
+              } finally {
+                // Runs after a normal wake-up and after an interrupt, so the entry is removed
+                // in both cases.
+                protectedThreadGroups.remove(currentThreadGroup);
+              }
+            },
+            "FlinkCleanupReleaser");
+    // Daemon thread: it must not keep the JVM alive on its own.
+    cleanupReleaser.setDaemon(true);
+    cleanupReleaser.start();
+  }
+
   private FlinkDetachedRunnerResult createDetachedPipelineResult(
       JobClient jobClient, FlinkPipelineOptions options) {
     LOG.info("Pipeline submitted in detached mode");

Reply via email to