This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 197c7b97adc9a781c66b5e062cda9662c425f0bb
Author: Yi Zhang <[email protected]>
AuthorDate: Fri Feb 13 10:30:44 2026 +0800

    [FLINK-38973][runtime] Make jobId nullable in StreamGraph to maintain 
compatibility for fixed jobId
---
 .../client/deployment/executors/PipelineExecutorUtils.java   | 11 +++++++++++
 .../program/DefaultPackagedProgramRetrieverITCase.java       |  4 ++--
 .../apache/flink/runtime/webmonitor/WebFrontendITCase.java   |  1 +
 .../apache/flink/runtime/jobgraph/ExecutionPlanUtils.java    |  6 +++---
 .../flink/streaming/api/graph/AdaptiveGraphManager.java      |  2 +-
 .../org/apache/flink/streaming/api/graph/StreamGraph.java    | 11 +++++++----
 .../flink/streaming/api/graph/StreamGraphGenerator.java      |  3 +--
 .../streaming/api/graph/StreamingJobGraphGenerator.java      |  2 +-
 .../org/apache/flink/test/scheduling/JMFailoverITCase.java   | 10 ++++++++--
 .../flink/test/streaming/api/datastream/WatermarkITCase.java | 12 ++++++++++--
 10 files changed, 45 insertions(+), 17 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
index 21dbef76c8c..509d7d307ed 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.client.deployment.executors;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.client.cli.ClientOptions;
@@ -25,6 +26,7 @@ import org.apache.flink.client.cli.ExecutionConfigAccessor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.PipelineOptionsInternal;
 import org.apache.flink.core.execution.JobStatusChangedListener;
 import org.apache.flink.streaming.api.graph.ExecutionPlan;
 import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -87,6 +89,15 @@ public class PipelineExecutorUtils {
         final ExecutionConfigAccessor executionConfigAccessor =
                 ExecutionConfigAccessor.fromConfiguration(configuration);
 
+        if (streamGraph.getOptionalJobId().isEmpty()) {
+            JobID jobId =
+                    configuration
+                            
.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)
+                            .map(JobID::fromHexString)
+                            .orElse(JobID.generate());
+            streamGraph.setJobId(jobId);
+        }
+
         if (configuration.get(DeploymentOptions.ATTACHED)
                 && configuration.get(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
             streamGraph.setInitialClientHeartbeatTimeout(
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java
index 61d144c0060..d8fc7898a3b 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java
@@ -217,7 +217,7 @@ class DefaultPackagedProgramRetrieverITCase {
         assertThat(streamGraph.getSavepointRestoreSettings())
                 .isEqualTo(SavepointRestoreSettings.none());
         assertThat(streamGraph.getMaximumParallelism()).isEqualTo(parallelism);
-        // we don't check the job id because StreamPlanEnvironment does not 
support fixed job id
+        assertThat(streamGraph.getJobID()).isEqualTo(jobId);
     }
 
     @Test
@@ -281,7 +281,7 @@ class DefaultPackagedProgramRetrieverITCase {
         final StreamGraph streamGraph = 
retrieveStreamGraph(retrieverUnderTest, configuration);
 
         
assertThat(streamGraph.getSavepointRestoreSettings()).isEqualTo(savepointRestoreSettings);
-        // we don't check the job id because StreamPlanEnvironment does not 
support fixed job id
+        assertThat(streamGraph.getJobID()).isEqualTo(jobId);
     }
 
     @Test
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index 5fd80337473..bd51f730a01 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -554,6 +554,7 @@ class WebFrontendITCase {
                 .setParallelism(2)
                 .addSink(new SinkFunction<>() {});
         StreamGraph streamGraph = env.getStreamGraph();
+        streamGraph.setJobId(JobID.generate());
         final JobID jid = streamGraph.getJobID();
 
         clusterClient.submitJob(streamGraph).get();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ExecutionPlanUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ExecutionPlanUtils.java
index 71d5207c400..d9f04aab43e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ExecutionPlanUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ExecutionPlanUtils.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.jobgraph;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -32,6 +31,7 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.UUID;
 
 /** Utilities for generating {@link 
org.apache.flink.streaming.api.graph.ExecutionPlan}. */
 public enum ExecutionPlanUtils {
@@ -40,13 +40,13 @@ public enum ExecutionPlanUtils {
     private static final Logger LOG = 
LoggerFactory.getLogger(ExecutionPlanUtils.class);
 
     public static Map<String, DistributedCache.DistributedCacheEntry> 
prepareUserArtifactEntries(
-            Map<String, DistributedCache.DistributedCacheEntry> userArtifacts, 
JobID jobId) {
+            Map<String, DistributedCache.DistributedCacheEntry> userArtifacts) 
{
         final Map<String, DistributedCache.DistributedCacheEntry> result = new 
HashMap<>();
 
         if (userArtifacts != null && !userArtifacts.isEmpty()) {
             try {
                 java.nio.file.Path tmpDir =
-                        Files.createTempDirectory("flink-distributed-cache-" + 
jobId);
+                        Files.createTempDirectory("flink-distributed-cache-" + 
UUID.randomUUID());
                 for (Map.Entry<String, DistributedCache.DistributedCacheEntry> 
originalEntry :
                         userArtifacts.entrySet()) {
                     Path filePath = new 
Path(originalEntry.getValue().filePath);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java
index 05d430ff8de..b6dc6a95e46 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java
@@ -191,7 +191,7 @@ public class AdaptiveGraphManager
         this.jobGraph =
                 createAndInitializeJobGraph(
                         streamGraph,
-                        streamGraph.getJobID(),
+                        streamGraph.getOptionalJobId().orElse(null),
                         streamGraph.getApplicationId().orElse(null));
 
         this.defaultSlotSharingGroup = new SlotSharingGroup();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index db80a3bca41..f03912bc857 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -134,7 +134,7 @@ public class StreamGraph implements Pipeline, ExecutionPlan 
{
 
     private String jobName;
 
-    private JobID jobId;
+    @Nullable private JobID jobId;
 
     /** ID of the application this job belongs to. */
     @Nullable private ApplicationID applicationId;
@@ -213,7 +213,6 @@ public class StreamGraph implements Pipeline, ExecutionPlan 
{
         this.executionConfig = checkNotNull(executionConfig);
         this.checkpointConfig = checkNotNull(checkpointConfig);
         this.savepointRestoreSettings = checkNotNull(savepointRestoreSettings);
-        this.jobId = new JobID();
         this.jobName = "(unnamed job)";
 
         // create an empty new stream graph.
@@ -1281,12 +1280,16 @@ public class StreamGraph implements Pipeline, 
ExecutionPlan {
     }
 
     public void setJobId(JobID jobId) {
-        this.jobId = jobId;
+        this.jobId = checkNotNull(jobId);
+    }
+
+    public Optional<JobID> getOptionalJobId() {
+        return Optional.ofNullable(jobId);
     }
 
     @Override
     public JobID getJobID() {
-        return jobId;
+        return checkNotNull(jobId);
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index f35da5d5c02..e65ec1246ea 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -284,8 +284,7 @@ public class StreamGraphGenerator {
                                 
.map(DistributedCache::parseCachedFilesFromString)
                                 .orElse(new ArrayList<>())
                                 .stream()
-                                .collect(Collectors.toMap(e -> e.f0, e -> 
e.f1)),
-                        streamGraph.getJobID());
+                                .collect(Collectors.toMap(e -> e.f0, e -> 
e.f1)));
 
         for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
                 distributedCacheEntries.entrySet()) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index aa503e3b20b..66776c03fb5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -135,7 +135,7 @@ public class StreamingJobGraphGenerator {
         return new StreamingJobGraphGenerator(
                         Thread.currentThread().getContextClassLoader(),
                         streamGraph,
-                        streamGraph.getJobID(),
+                        streamGraph.getOptionalJobId().orElse(null),
                         streamGraph.getApplicationId().orElse(null),
                         Runnable::run)
                 .createJobGraph();
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/JMFailoverITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/JMFailoverITCase.java
index 2c2aa501813..d5d30eb7180 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/JMFailoverITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/JMFailoverITCase.java
@@ -467,7 +467,7 @@ class JMFailoverITCase {
                 .transform("Sink", TypeInformation.of(Void.class), new 
StubRecordSink())
                 .slotSharingGroup("group4");
 
-        StreamGraph streamGraph = env.getStreamGraph();
+        StreamGraph streamGraph = getStreamGraph(env);
         
streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING);
         streamGraph.setJobType(JobType.BATCH);
         streamGraph.setJobName(jobName);
@@ -499,7 +499,7 @@ class JMFailoverITCase {
                 .transform("Sink", TypeInformation.of(Void.class), new 
StubRecordSink())
                 .slotSharingGroup("group4");
 
-        StreamGraph streamGraph = env.getStreamGraph();
+        StreamGraph streamGraph = getStreamGraph(env);
         
streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING);
         streamGraph.setJobType(JobType.BATCH);
         streamGraph.setJobName(jobName);
@@ -511,6 +511,12 @@ class JMFailoverITCase {
         indices.forEach(index -> subtaskBlocked.put(index, block));
     }
 
+    private StreamGraph getStreamGraph(StreamExecutionEnvironment env) {
+        StreamGraph streamGraph = env.getStreamGraph();
+        streamGraph.setJobId(JobID.generate());
+        return streamGraph;
+    }
+
     /**
      * A stub which helps to:
      *
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkITCase.java
index 9d77d87c89b..13f22b075e1 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkITCase.java
@@ -769,7 +769,8 @@ class WatermarkITCase {
                 .process(new Operator4ProcessFunction())
                 .withName("Operator4")
                 .withParallelism(DEFAULT_PARALLELISM);
-        return env.getStreamGraph();
+
+        return getStreamGraph(env);
     }
 
     public StreamGraph getStreamGraphForTestSource(
@@ -795,7 +796,8 @@ class WatermarkITCase {
                 .process(new Operator2ProcessFunction(null, null))
                 .withName("Operator2")
                 .withParallelism(DEFAULT_PARALLELISM);
-        return env.getStreamGraph();
+
+        return getStreamGraph(env);
     }
 
     public StreamGraph getStreamGraphForAlignedWatermark(
@@ -1043,4 +1045,10 @@ class WatermarkITCase {
         }
         return true;
     }
+
+    private StreamGraph getStreamGraph(ExecutionEnvironmentImpl env) {
+        StreamGraph streamGraph = env.getStreamGraph();
+        streamGraph.setJobId(JobID.generate());
+        return streamGraph;
+    }
 }

Reply via email to