This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 197c7b97adc9a781c66b5e062cda9662c425f0bb Author: Yi Zhang <[email protected]> AuthorDate: Fri Feb 13 10:30:44 2026 +0800 [FLINK-38973][runtime] Make jobId nullable in StreamGraph to maintain compatibility for fixed jobId --- .../client/deployment/executors/PipelineExecutorUtils.java | 11 +++++++++++ .../program/DefaultPackagedProgramRetrieverITCase.java | 4 ++-- .../apache/flink/runtime/webmonitor/WebFrontendITCase.java | 1 + .../apache/flink/runtime/jobgraph/ExecutionPlanUtils.java | 6 +++--- .../flink/streaming/api/graph/AdaptiveGraphManager.java | 2 +- .../org/apache/flink/streaming/api/graph/StreamGraph.java | 11 +++++++---- .../flink/streaming/api/graph/StreamGraphGenerator.java | 3 +-- .../streaming/api/graph/StreamingJobGraphGenerator.java | 2 +- .../org/apache/flink/test/scheduling/JMFailoverITCase.java | 10 ++++++++-- .../flink/test/streaming/api/datastream/WatermarkITCase.java | 12 ++++++++++-- 10 files changed, 45 insertions(+), 17 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java index 21dbef76c8c..509d7d307ed 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java @@ -18,6 +18,7 @@ package org.apache.flink.client.deployment.executors; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.dag.Pipeline; import org.apache.flink.client.cli.ClientOptions; @@ -25,6 +26,7 @@ import org.apache.flink.client.cli.ExecutionConfigAccessor; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.ExecutionOptions; +import org.apache.flink.configuration.PipelineOptionsInternal; import org.apache.flink.core.execution.JobStatusChangedListener; import org.apache.flink.streaming.api.graph.ExecutionPlan; import org.apache.flink.streaming.api.graph.StreamGraph; @@ -87,6 +89,15 @@ public class PipelineExecutorUtils { final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration); + if (streamGraph.getOptionalJobId().isEmpty()) { + JobID jobId = + configuration + .getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID) + .map(JobID::fromHexString) + .orElse(JobID.generate()); + streamGraph.setJobId(jobId); + } + if (configuration.get(DeploymentOptions.ATTACHED) && configuration.get(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) { streamGraph.setInitialClientHeartbeatTimeout( diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java b/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java index 61d144c0060..d8fc7898a3b 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java @@ -217,7 +217,7 @@ class DefaultPackagedProgramRetrieverITCase { assertThat(streamGraph.getSavepointRestoreSettings()) .isEqualTo(SavepointRestoreSettings.none()); assertThat(streamGraph.getMaximumParallelism()).isEqualTo(parallelism); - // we don't check the job id because StreamPlanEnvironment does not support fixed job id + assertThat(streamGraph.getJobID()).isEqualTo(jobId); } @Test @@ -281,7 +281,7 @@ class DefaultPackagedProgramRetrieverITCase { final StreamGraph streamGraph = retrieveStreamGraph(retrieverUnderTest, configuration); assertThat(streamGraph.getSavepointRestoreSettings()).isEqualTo(savepointRestoreSettings); - // we don't check the job id because StreamPlanEnvironment does not support fixed job id + assertThat(streamGraph.getJobID()).isEqualTo(jobId); } @Test diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java index 5fd80337473..bd51f730a01 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java @@ -554,6 +554,7 @@ class WebFrontendITCase { .setParallelism(2) .addSink(new SinkFunction<>() {}); StreamGraph streamGraph = env.getStreamGraph(); + streamGraph.setJobId(JobID.generate()); final JobID jid = streamGraph.getJobID(); clusterClient.submitJob(streamGraph).get(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ExecutionPlanUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ExecutionPlanUtils.java index 71d5207c400..d9f04aab43e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ExecutionPlanUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ExecutionPlanUtils.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.jobgraph; -import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; @@ -32,6 +31,7 @@ import java.io.IOException; import java.nio.file.Files; import java.util.HashMap; import java.util.Map; +import java.util.UUID; /** Utilities for generating {@link org.apache.flink.streaming.api.graph.ExecutionPlan}. */ public enum ExecutionPlanUtils { @@ -40,13 +40,13 @@ public enum ExecutionPlanUtils { private static final Logger LOG = LoggerFactory.getLogger(ExecutionPlanUtils.class); public static Map<String, DistributedCache.DistributedCacheEntry> prepareUserArtifactEntries( - Map<String, DistributedCache.DistributedCacheEntry> userArtifacts, JobID jobId) { + Map<String, DistributedCache.DistributedCacheEntry> userArtifacts) { final Map<String, DistributedCache.DistributedCacheEntry> result = new HashMap<>(); if (userArtifacts != null && !userArtifacts.isEmpty()) { try { java.nio.file.Path tmpDir = - Files.createTempDirectory("flink-distributed-cache-" + jobId); + Files.createTempDirectory("flink-distributed-cache-" + UUID.randomUUID()); for (Map.Entry<String, DistributedCache.DistributedCacheEntry> originalEntry : userArtifacts.entrySet()) { Path filePath = new Path(originalEntry.getValue().filePath); diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java index 05d430ff8de..b6dc6a95e46 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java @@ -191,7 +191,7 @@ public class AdaptiveGraphManager this.jobGraph = createAndInitializeJobGraph( streamGraph, - streamGraph.getJobID(), + streamGraph.getOptionalJobId().orElse(null), streamGraph.getApplicationId().orElse(null)); this.defaultSlotSharingGroup = new SlotSharingGroup(); diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index db80a3bca41..f03912bc857 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -134,7 +134,7 @@ public class StreamGraph implements Pipeline, ExecutionPlan { private String jobName; - private JobID jobId; + @Nullable private JobID jobId; /** ID of the application this job belongs to. */ @Nullable private ApplicationID applicationId; @@ -213,7 +213,6 @@ public class StreamGraph implements Pipeline, ExecutionPlan { this.executionConfig = checkNotNull(executionConfig); this.checkpointConfig = checkNotNull(checkpointConfig); this.savepointRestoreSettings = checkNotNull(savepointRestoreSettings); - this.jobId = new JobID(); this.jobName = "(unnamed job)"; // create an empty new stream graph. @@ -1281,12 +1280,16 @@ public class StreamGraph implements Pipeline, ExecutionPlan { } public void setJobId(JobID jobId) { - this.jobId = jobId; + this.jobId = checkNotNull(jobId); + } + + public Optional<JobID> getOptionalJobId() { + return Optional.ofNullable(jobId); } @Override public JobID getJobID() { - return jobId; + return checkNotNull(jobId); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index f35da5d5c02..e65ec1246ea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -284,8 +284,7 @@ public class StreamGraphGenerator { .map(DistributedCache::parseCachedFilesFromString) .orElse(new ArrayList<>()) .stream() - .collect(Collectors.toMap(e -> e.f0, e -> e.f1)), - streamGraph.getJobID()); + .collect(Collectors.toMap(e -> e.f0, e -> e.f1))); for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry : distributedCacheEntries.entrySet()) { diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index aa503e3b20b..66776c03fb5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -135,7 +135,7 @@ public class StreamingJobGraphGenerator { return new StreamingJobGraphGenerator( Thread.currentThread().getContextClassLoader(), streamGraph, - streamGraph.getJobID(), + streamGraph.getOptionalJobId().orElse(null), streamGraph.getApplicationId().orElse(null), Runnable::run) .createJobGraph(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/JMFailoverITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/JMFailoverITCase.java index 2c2aa501813..d5d30eb7180 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/JMFailoverITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/JMFailoverITCase.java @@ -467,7 +467,7 @@ class JMFailoverITCase { .transform("Sink", TypeInformation.of(Void.class), new StubRecordSink()) .slotSharingGroup("group4"); - StreamGraph streamGraph = env.getStreamGraph(); + StreamGraph streamGraph = getStreamGraph(env); streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING); streamGraph.setJobType(JobType.BATCH); streamGraph.setJobName(jobName); @@ -499,7 +499,7 @@ class JMFailoverITCase { .transform("Sink", TypeInformation.of(Void.class), new StubRecordSink()) .slotSharingGroup("group4"); - StreamGraph streamGraph = env.getStreamGraph(); + StreamGraph streamGraph = getStreamGraph(env); streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING); streamGraph.setJobType(JobType.BATCH); streamGraph.setJobName(jobName); @@ -511,6 +511,12 @@ class JMFailoverITCase { indices.forEach(index -> subtaskBlocked.put(index, block)); } + private StreamGraph getStreamGraph(StreamExecutionEnvironment env) { + StreamGraph streamGraph = env.getStreamGraph(); + streamGraph.setJobId(JobID.generate()); + return streamGraph; + } + /** * A stub which helps to: * diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkITCase.java index 9d77d87c89b..13f22b075e1 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkITCase.java @@ -769,7 +769,8 @@ class WatermarkITCase { .process(new Operator4ProcessFunction()) .withName("Operator4") .withParallelism(DEFAULT_PARALLELISM); - return env.getStreamGraph(); + + return getStreamGraph(env); } public StreamGraph getStreamGraphForTestSource( @@ -795,7 +796,8 @@ class WatermarkITCase { .process(new Operator2ProcessFunction(null, null)) .withName("Operator2") .withParallelism(DEFAULT_PARALLELISM); - return env.getStreamGraph(); + + return getStreamGraph(env); } public StreamGraph getStreamGraphForAlignedWatermark( @@ -1043,4 +1045,10 @@ class WatermarkITCase { } return true; } + + private StreamGraph getStreamGraph(ExecutionEnvironmentImpl env) { + StreamGraph streamGraph = env.getStreamGraph(); + streamGraph.setJobId(JobID.generate()); + return streamGraph; + } }
