This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1d33741c035 Fix Flink IllegalThreadStateException on Java 8 (#36730)
1d33741c035 is described below
commit 1d33741c0357a6d2eca341bd269c4a28ba8cd918
Author: Abdelrahman Ibrahim <[email protected]>
AuthorDate: Thu Nov 6 04:32:31 2025 +0200
Fix Flink IllegalThreadStateException on Java 8 (#36730)
---
.../flink/FlinkPipelineExecutionEnvironment.java | 43 ++++++++++++++++++++++
1 file changed, 43 insertions(+)
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index 029eff25a82..973aa6c2429 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -20,6 +20,8 @@ package org.apache.beam.runners.flink;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.runners.core.metrics.MetricsPusher;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
@@ -29,8 +31,10 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.Vi
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.slf4j.Logger;
@@ -52,6 +56,8 @@ class FlinkPipelineExecutionEnvironment {
private static final Logger LOG =
LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class);
+ private static final Set<ThreadGroup> protectedThreadGroups = ConcurrentHashMap.newKeySet();
+
private final FlinkPipelineOptions options;
/**
@@ -143,6 +149,7 @@ class FlinkPipelineExecutionEnvironment {
if (flinkBatchEnv != null) {
if (options.getAttachedMode()) {
JobExecutionResult jobExecutionResult = flinkBatchEnv.execute(jobName);
+ ensureFlinkCleanupComplete(flinkBatchEnv);
return createAttachedPipelineResult(jobExecutionResult);
} else {
JobClient jobClient = flinkBatchEnv.executeAsync(jobName);
@@ -151,6 +158,7 @@ class FlinkPipelineExecutionEnvironment {
} else if (flinkStreamEnv != null) {
if (options.getAttachedMode()) {
JobExecutionResult jobExecutionResult = flinkStreamEnv.execute(jobName);
+ ensureFlinkCleanupComplete(flinkStreamEnv);
return createAttachedPipelineResult(jobExecutionResult);
} else {
JobClient jobClient = flinkStreamEnv.executeAsync(jobName);
@@ -161,6 +169,41 @@ class FlinkPipelineExecutionEnvironment {
}
}
+ /** Prevents ThreadGroup destruction while Flink cleanup threads are still running. */
+ private void ensureFlinkCleanupComplete(Object executionEnv) {
+ String javaVersion = System.getProperty("java.version");
+ if (javaVersion == null || !javaVersion.startsWith("1.8")) {
+ return;
+ }
+
+ if (!(executionEnv instanceof LocalStreamEnvironment
+ || executionEnv instanceof LocalEnvironment)) {
+ return;
+ }
+
+ ThreadGroup currentThreadGroup = Thread.currentThread().getThreadGroup();
+ if (currentThreadGroup == null) {
+ return;
+ }
+
+ protectedThreadGroups.add(currentThreadGroup);
+
+ Thread cleanupReleaser =
+ new Thread(
+ () -> {
+ try {
+ Thread.sleep(2000); // 2 seconds should be enough for Flink cleanup
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ protectedThreadGroups.remove(currentThreadGroup);
+ }
+ },
+ "FlinkCleanupReleaser");
+ cleanupReleaser.setDaemon(true);
+ cleanupReleaser.start();
+ }
+
private FlinkDetachedRunnerResult createDetachedPipelineResult(
JobClient jobClient, FlinkPipelineOptions options) {
LOG.info("Pipeline submitted in detached mode");