This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5dbadfd7f481e9ecdc63ec298af4e53f69786cdf
Author: Yi Zhang <[email protected]>
AuthorDate: Tue Jan 27 11:06:57 2026 +0800

    [FLINK-38973][runtime] Job ID assignment and matching based on name
---
 .../5b9eed8a-5fb6-4373-98ac-3be2a71941b8           |   2 +-
 .../java/org/apache/flink/client/ClientUtils.java  |  11 +-
 ...ApplicationDispatcherGatewayServiceFactory.java |  25 +-
 .../application/PackagedProgramApplication.java    |  44 ++-
 .../application/executors/EmbeddedExecutor.java    |  12 +-
 .../executors/PipelineExecutorUtils.java           |   6 -
 .../apache/flink/client/program/JobIdManager.java  |  45 +++
 .../client/program/JobNameBasedJobIdManager.java   | 254 +++++++++++++++++
 .../client/program/StreamContextEnvironment.java   |  36 ++-
 .../PackagedProgramApplicationTest.java            |   7 -
 .../DefaultPackagedProgramRetrieverITCase.java     |   4 +-
 .../program/JobNameBasedJobIdManagerTest.java      | 301 +++++++++++++++++++++
 .../handlers/JarRunApplicationHandler.java         |  10 +-
 .../apache/flink/runtime/jobmaster/JobResult.java  |  17 ++
 .../rest/messages/json/JobResultDeserializer.java  |   6 +
 .../rest/messages/json/JobResultSerializer.java    |   5 +
 .../runtime/testutils/TestingJobResultStore.java   |   9 +-
 .../service/application/ScriptExecutorITCase.java  |   3 +-
 18 files changed, 750 insertions(+), 47 deletions(-)

diff --git 
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8
 
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8
index 256be5208ff..ccf1f8ba490 100644
--- 
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8
+++ 
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8
@@ -21,7 +21,7 @@ 
org.apache.flink.cep.pattern.conditions.IterativeCondition.filter(java.lang.Obje
 
org.apache.flink.cep.pattern.conditions.SimpleCondition.filter(java.lang.Object,
 org.apache.flink.cep.pattern.conditions.IterativeCondition$Context): Argument 
leaf type org.apache.flink.cep.pattern.conditions.IterativeCondition$Context 
does not satisfy: reside outside of package 'org.apache.flink..' or reside in 
any package ['..shaded..'] or annotated with @Public or annotated with 
@PublicEvolving or annotated with @Deprecated
 
org.apache.flink.client.program.StreamContextEnvironment.execute(org.apache.flink.streaming.api.graph.StreamGraph):
 Argument leaf type org.apache.flink.streaming.api.graph.StreamGraph does not 
satisfy: reside outside of package 'org.apache.flink..' or reside in any 
package ['..shaded..'] or annotated with @Public or annotated with 
@PublicEvolving or annotated with @Deprecated
 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(org.apache.flink.streaming.api.graph.StreamGraph):
 Argument leaf type org.apache.flink.streaming.api.graph.StreamGraph does not 
satisfy: reside outside of package 'org.apache.flink..' or reside in any 
package ['..shaded..'] or annotated with @Public or annotated with 
@PublicEvolving or annotated with @Deprecated
-org.apache.flink.client.program.StreamContextEnvironment.setAsContext(org.apache.flink.core.execution.PipelineExecutorServiceLoader,
 org.apache.flink.configuration.Configuration, java.lang.ClassLoader, boolean, 
boolean, org.apache.flink.api.common.ApplicationID): Argument leaf type 
org.apache.flink.core.execution.PipelineExecutorServiceLoader does not satisfy: 
reside outside of package 'org.apache.flink..' or reside in any package 
['..shaded..'] or annotated with @Public or annotated wit [...]
+org.apache.flink.client.program.StreamContextEnvironment.setAsContext(org.apache.flink.core.execution.PipelineExecutorServiceLoader,
 org.apache.flink.configuration.Configuration, java.lang.ClassLoader, boolean, 
boolean, org.apache.flink.api.common.ApplicationID, java.util.Collection): 
Argument leaf type 
org.apache.flink.core.execution.PipelineExecutorServiceLoader does not satisfy: 
reside outside of package 'org.apache.flink..' or reside in any package 
['..shaded..'] or annotated with @P [...]
 
org.apache.flink.client.program.StreamPlanEnvironment.executeAsync(org.apache.flink.streaming.api.graph.StreamGraph):
 Argument leaf type org.apache.flink.streaming.api.graph.StreamGraph does not 
satisfy: reside outside of package 'org.apache.flink..' or reside in any 
package ['..shaded..'] or annotated with @Public or annotated with 
@PublicEvolving or annotated with @Deprecated
 org.apache.flink.client.program.StreamPlanEnvironment.getPipeline(): Returned 
leaf type org.apache.flink.api.dag.Pipeline does not satisfy: reside outside of 
package 'org.apache.flink..' or reside in any package ['..shaded..'] or 
annotated with @Public or annotated with @PublicEvolving or annotated with 
@Deprecated
 
org.apache.flink.configuration.ClusterOptions.getSchedulerType(org.apache.flink.configuration.Configuration):
 Returned leaf type 
org.apache.flink.configuration.JobManagerOptions$SchedulerType does not 
satisfy: reside outside of package 'org.apache.flink..' or reside in any 
package ['..shaded..'] or annotated with @Public or annotated with 
@PublicEvolving or annotated with @Deprecated
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java 
b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index dd680883bc5..4604b109d28 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.client;
 
 import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobInfo;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.client.cli.ClientOptions;
 import org.apache.flink.client.program.PackagedProgram;
@@ -49,6 +50,7 @@ import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.Executors;
@@ -89,7 +91,8 @@ public enum ClientUtils {
                 program,
                 enforceSingleJobExecution,
                 suppressSysout,
-                null);
+                null,
+                Collections.emptyList());
     }
 
     public static void executeProgram(
@@ -98,7 +101,8 @@ public enum ClientUtils {
             PackagedProgram program,
             boolean enforceSingleJobExecution,
             boolean suppressSysout,
-            @Nullable ApplicationID applicationId)
+            @Nullable ApplicationID applicationId,
+            Collection<JobInfo> allRecoveredJobInfos)
             throws ProgramInvocationException {
         checkNotNull(executorServiceLoader);
         final ClassLoader userCodeClassLoader = 
program.getUserCodeClassLoader();
@@ -116,7 +120,8 @@ public enum ClientUtils {
                     userCodeClassLoader,
                     enforceSingleJobExecution,
                     suppressSysout,
-                    applicationId);
+                    applicationId,
+                    allRecoveredJobInfos);
 
             // For DataStream v2.
             ExecutionContextEnvironment.setAsContext(
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java
index 6c443c4b649..fa7f597863b 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java
@@ -20,7 +20,8 @@ package org.apache.flink.client.deployment.application;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ApplicationID;
-import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobInfo;
+import org.apache.flink.api.common.JobInfoImpl;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.ApplicationOptionsInternal;
 import org.apache.flink.configuration.Configuration;
@@ -92,7 +93,9 @@ public class ApplicationDispatcherGatewayServiceFactory
             ExecutionPlanWriter executionPlanWriter,
             JobResultStore jobResultStore) {
 
-        final List<JobID> recoveredJobIds = getRecoveredJobIds(recoveredJobs);
+        final List<JobInfo> recoveredJobInfos = 
getRecoveredJobInfos(recoveredJobs);
+        final List<JobInfo> recoveredTerminalJobInfos =
+                getRecoveredTerminalJobInfos(recoveredDirtyJobResults);
 
         final boolean allowExecuteMultipleJobs =
                 ApplicationJobUtils.allowExecuteMultipleJobs(configuration);
@@ -107,7 +110,8 @@ public class ApplicationDispatcherGatewayServiceFactory
                 new PackagedProgramApplication(
                         applicationId,
                         program,
-                        recoveredJobIds,
+                        recoveredJobInfos,
+                        recoveredTerminalJobInfos,
                         configuration,
                         true,
                         !allowExecuteMultipleJobs,
@@ -137,7 +141,18 @@ public class ApplicationDispatcherGatewayServiceFactory
         return DefaultDispatcherGatewayService.from(dispatcher);
     }
 
-    private List<JobID> getRecoveredJobIds(final Collection<ExecutionPlan> 
recoveredJobs) {
-        return 
recoveredJobs.stream().map(ExecutionPlan::getJobID).collect(Collectors.toList());
+    private List<JobInfo> getRecoveredJobInfos(final Collection<ExecutionPlan> 
recoveredJobs) {
+        return recoveredJobs.stream()
+                .map(
+                        executionPlan ->
+                                new JobInfoImpl(executionPlan.getJobID(), 
executionPlan.getName()))
+                .collect(Collectors.toList());
+    }
+
+    private List<JobInfo> getRecoveredTerminalJobInfos(
+            final Collection<JobResult> recoveredDirtyJobResults) {
+        return recoveredDirtyJobResults.stream()
+                .map(jobResult -> new JobInfoImpl(jobResult.getJobId(), 
jobResult.getJobName()))
+                .collect(Collectors.toList());
     }
 }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java
index 95f54b69de1..a06d1a13854 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.api.common.ApplicationState;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobInfo;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.cli.ClientOptions;
@@ -48,7 +49,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -64,6 +64,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -79,7 +80,9 @@ public class PackagedProgramApplication extends 
AbstractApplication {
 
     private final PackagedProgramDescriptor programDescriptor;
 
-    private final Collection<JobID> recoveredJobIds;
+    private final Collection<JobInfo> recoveredJobInfos;
+
+    private final Collection<JobInfo> recoveredTerminalJobInfos;
 
     private final Configuration configuration;
 
@@ -104,7 +107,28 @@ public class PackagedProgramApplication extends 
AbstractApplication {
     public PackagedProgramApplication(
             final ApplicationID applicationId,
             final PackagedProgram program,
-            final Collection<JobID> recoveredJobIds,
+            final Configuration configuration,
+            final boolean handleFatalError,
+            final boolean enforceSingleJobExecution,
+            final boolean submitFailedJobOnApplicationError,
+            final boolean shutDownOnFinish) {
+        this(
+                applicationId,
+                program,
+                Collections.emptyList(),
+                Collections.emptyList(),
+                configuration,
+                handleFatalError,
+                enforceSingleJobExecution,
+                submitFailedJobOnApplicationError,
+                shutDownOnFinish);
+    }
+
+    public PackagedProgramApplication(
+            final ApplicationID applicationId,
+            final PackagedProgram program,
+            final Collection<JobInfo> recoveredJobInfos,
+            final Collection<JobInfo> recoveredTerminalJobInfos,
             final Configuration configuration,
             final boolean handleFatalError,
             final boolean enforceSingleJobExecution,
@@ -112,7 +136,8 @@ public class PackagedProgramApplication extends 
AbstractApplication {
             final boolean shutDownOnFinish) {
         super(applicationId);
         this.program = checkNotNull(program);
-        this.recoveredJobIds = checkNotNull(recoveredJobIds);
+        this.recoveredJobInfos = checkNotNull(recoveredJobInfos);
+        this.recoveredTerminalJobInfos = 
checkNotNull(recoveredTerminalJobInfos);
         this.configuration = checkNotNull(configuration);
         this.handleFatalError = handleFatalError;
         this.enforceSingleJobExecution = enforceSingleJobExecution;
@@ -527,7 +552,8 @@ public class PackagedProgramApplication extends 
AbstractApplication {
                                             .key())));
             return;
         }
-        final List<JobID> applicationJobIds = new ArrayList<>(recoveredJobIds);
+        final List<JobID> applicationJobIds =
+                
recoveredJobInfos.stream().map(JobInfo::getJobId).collect(Collectors.toList());
         try {
             if (program == null) {
                 LOG.info("Reconstructing program from descriptor {}", 
programDescriptor);
@@ -544,7 +570,8 @@ public class PackagedProgramApplication extends 
AbstractApplication {
                     program,
                     enforceSingleJobExecution,
                     true /* suppress sysout */,
-                    getApplicationId());
+                    getApplicationId(),
+                    getAllRecoveredJobInfos());
 
             if (applicationJobIds.isEmpty()) {
                 jobIdsFuture.completeExceptionally(
@@ -581,6 +608,11 @@ public class PackagedProgramApplication extends 
AbstractApplication {
         }
     }
 
+    private Collection<JobInfo> getAllRecoveredJobInfos() {
+        return Stream.concat(recoveredJobInfos.stream(), 
recoveredTerminalJobInfos.stream())
+                .collect(Collectors.toList());
+    }
+
     private CompletableFuture<Void> waitForJobResults(
             final DispatcherGateway dispatcherGateway,
             final Collection<JobID> applicationJobIds,
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java
index baed45dd649..b2a1db2ad3c 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java
@@ -51,6 +51,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * A base class for {@link PipelineExecutor executors} that invoke directly 
methods of the {@link
@@ -110,14 +111,21 @@ public class EmbeddedExecutor implements PipelineExecutor 
{
             throws Exception {
         checkNotNull(pipeline);
         checkNotNull(configuration);
+        checkState(pipeline instanceof StreamGraph);
+
+        StreamGraph streamGraph = (StreamGraph) pipeline;
 
         final Optional<JobID> optJobId =
                 configuration
                         
.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)
                         .map(JobID::fromHexString);
 
-        if (optJobId.isPresent() && submittedJobIds.contains(optJobId.get())) {
-            return getJobClientFuture(optJobId.get(), userCodeClassloader);
+        // Skip resubmission if the job is recovered via HA.
+        // When optJobId is present, the streamGraph's ID is deterministically 
derived from it. In
+        // this case, if the streamGraph's ID is in submittedJobIds, it means 
the job was submitted
+        // in a previous run and should not be resubmitted.
+        if (optJobId.isPresent() && 
submittedJobIds.contains(streamGraph.getJobID())) {
+            return getJobClientFuture(streamGraph.getJobID(), 
userCodeClassloader);
         }
 
         return submitAndGetJobClientFuture(pipeline, configuration, 
userCodeClassloader);
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
index bdfde6f7cc0..21dbef76c8c 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.client.deployment.executors;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.client.cli.ClientOptions;
@@ -26,7 +25,6 @@ import org.apache.flink.client.cli.ExecutionConfigAccessor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.ExecutionOptions;
-import org.apache.flink.configuration.PipelineOptionsInternal;
 import org.apache.flink.core.execution.JobStatusChangedListener;
 import org.apache.flink.streaming.api.graph.ExecutionPlan;
 import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -89,10 +87,6 @@ public class PipelineExecutorUtils {
         final ExecutionConfigAccessor executionConfigAccessor =
                 ExecutionConfigAccessor.fromConfiguration(configuration);
 
-        configuration
-                .getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)
-                .ifPresent(strJobID -> 
streamGraph.setJobId(JobID.fromHexString(strJobID)));
-
         if (configuration.get(DeploymentOptions.ATTACHED)
                 && configuration.get(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
             streamGraph.setInitialClientHeartbeatTimeout(
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/JobIdManager.java 
b/flink-clients/src/main/java/org/apache/flink/client/program/JobIdManager.java
new file mode 100644
index 00000000000..0f472913610
--- /dev/null
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/JobIdManager.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.streaming.api.graph.StreamGraph;
+
+/**
+ * Interface for managing the job IDs.
+ *
+ * <p>This interface allows custom logic to update the job ID of a {@link 
StreamGraph} before
+ * submitting the job.
+ */
+public interface JobIdManager {
+
+    /**
+     * Updates the job ID of the given {@link StreamGraph}.
+     *
+     * @param streamGraph The {@link StreamGraph} to update.
+     */
+    void updateJobId(StreamGraph streamGraph);
+
+    JobIdManager NO_OP =
+            new JobIdManager() {
+                @Override
+                public void updateJobId(StreamGraph streamGraph) {
+                    // no-op
+                }
+            };
+}
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/JobNameBasedJobIdManager.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/JobNameBasedJobIdManager.java
new file mode 100644
index 00000000000..4259844a059
--- /dev/null
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/JobNameBasedJobIdManager.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobInfo;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link JobIdManager} implementation that assigns job IDs based on job 
names, supporting
+ * recovery from previously submitted jobs and deterministic ID assignment.
+ *
+ * <p>This manager ensures that:
+ *
+ * <ul>
+ *   <li><b>Compatibility</b>: The first submitted job retains the original 
fixed {@code baseJobId},
+ *       ensuring full backward compatibility with existing behavior.
+ *   <li><b>Uniqueness</b>: Guarantees globally unique job IDs across all 
submitted jobs.
+ *   <li><b>Correct Association</b>: Matches recovered jobs strictly by job 
name to prevent
+ *       cross-name misassociation; for jobs with the same name, associates 
them in strict
+ *       submission order.
+ * </ul>
+ */
+public class JobNameBasedJobIdManager implements JobIdManager {
+
+    private final JobID baseJobId;
+
+    private final Map<String, ArrayDeque<JobID>> recoveredJobNameToJobIds = 
new HashMap<>();
+
+    private final Set<JobID> occupiedJobIds = new HashSet<>();
+
+    private @Nullable String firstJobName;
+
+    /**
+     * Creates a new job ID manager with the given base job ID and recovered 
job information.
+     *
+     * @param baseJobId the base job ID used as the origin for deriving all 
job IDs
+     * @param allRecoveredJobInfos a collection of recovered job information; 
if non-empty, it
+     *     <b>must contain</b> a job with ID equal to {@code baseJobId}
+     */
+    public JobNameBasedJobIdManager(JobID baseJobId, Collection<JobInfo> 
allRecoveredJobInfos) {
+        this.baseJobId = checkNotNull(baseJobId);
+        checkNotNull(allRecoveredJobInfos);
+
+        if (!allRecoveredJobInfos.isEmpty()) {
+            // step 1: find the first job name
+            Optional<JobInfo> optionalBaseJobInfo =
+                    allRecoveredJobInfos.stream()
+                            .filter(jobInfo -> 
jobInfo.getJobId().equals(baseJobId))
+                            .findFirst();
+
+            if (optionalBaseJobInfo.isEmpty()) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Base job ID %s is not found in the recovered 
jobs.", baseJobId));
+            }
+
+            this.firstJobName = optionalBaseJobInfo.get().getJobName();
+
+            // step 2: group jobs by job name and sort them according to the 
job index
+            Map<String, List<JobID>> grouped =
+                    allRecoveredJobInfos.stream()
+                            .collect(
+                                    Collectors.groupingBy(
+                                            JobInfo::getJobName,
+                                            Collectors.mapping(
+                                                    JobInfo::getJobId, 
Collectors.toList())));
+
+            for (Map.Entry<String, List<JobID>> entry : grouped.entrySet()) {
+                String jobName = entry.getKey();
+                List<JobID> jobIds = entry.getValue();
+
+                int initialIndex = getInitialIndex(jobName);
+
+                jobIds.sort(createJobIdComparator(initialIndex, baseJobId));
+
+                this.recoveredJobNameToJobIds.put(jobName, new 
ArrayDeque<>(jobIds));
+            }
+
+            // step 3: initialize occupied job ids
+            allRecoveredJobInfos.forEach(jobInfo -> 
this.occupiedJobIds.add(jobInfo.getJobId()));
+        }
+    }
+
+    /**
+     * Assigns a job ID to the given {@link StreamGraph} based on its job name 
and the current state
+     * of recovered and occupied job IDs.
+     *
+     * <p>Assignment follows the logic:
+     *
+     * <ul>
+     *   <li>First, attempts to reuse a recovered job ID for the same job name 
(in assignment
+     *       order).
+     *   <li>If no recovered job is available, assigns a new job ID by probing 
from the name-derived
+     *       starting index until an unused ID is found.
+     * </ul>
+     *
+     * @param streamGraph the stream graph to assign a job ID to
+     */
+    @Override
+    public void updateJobId(StreamGraph streamGraph) {
+        String jobName = checkNotNull(streamGraph.getJobName());
+
+        // case 1: try to find a job id from the recovered jobs
+        ArrayDeque<JobID> recoveredJobIds = 
recoveredJobNameToJobIds.get(jobName);
+        if (recoveredJobIds != null) {
+            JobID jobId = recoveredJobIds.pollFirst();
+            if (recoveredJobIds.isEmpty()) {
+                recoveredJobNameToJobIds.remove(jobName);
+            }
+            streamGraph.setJobId(jobId);
+            return;
+        }
+
+        // case 2: generate a new job id
+        final JobID assignedJobId;
+        if (firstJobName == null) {
+            firstJobName = jobName;
+            assignedJobId = baseJobId;
+        } else {
+            int initialIndex = getInitialIndex(jobName);
+            int step = 0;
+            while (true) {
+                // the calculated index may overflow and wrap around
+                int candidateIndex = initialIndex + step;
+                JobID candidateId = fromJobIndex(candidateIndex, baseJobId);
+                if (!occupiedJobIds.contains(candidateId)) {
+                    assignedJobId = candidateId;
+                    break;
+                }
+
+                // the step may overflow and wrap around
+                step++;
+                if (step == 0) {
+                    throw new IllegalStateException(
+                            String.format("Exhausted all job IDs for job name: 
%s", jobName));
+                }
+            }
+        }
+        streamGraph.setJobId(assignedJobId);
+        occupiedJobIds.add(assignedJobId);
+    }
+
+    @VisibleForTesting
+    String getFirstJobName() {
+        return firstJobName;
+    }
+
+    @VisibleForTesting
+    int getInitialIndex(String jobName) {
+        checkNotNull(firstJobName);
+
+        return jobName.equals(firstJobName) ? 0 : jobName.hashCode();
+    }
+
+    /**
+     * Creates a new job ID by adding the given index to both parts of the 
base job ID.
+     *
+     * <p>Note: Both the lower part and upper part may overflow and wrap 
around.
+     *
+     * @param index the index to add to the base job ID parts
+     * @param baseId the base job ID to derive from
+     * @return a new job ID with index applied to both parts
+     */
+    @VisibleForTesting
+    static JobID fromJobIndex(int index, JobID baseId) {
+        return new JobID(baseId.getLowerPart() + index, baseId.getUpperPart() 
+ index);
+    }
+
+    /**
+     * Computes the index of the given job ID relative to the base job ID.
+     *
+     * <p>This method correctly calculates the index even when the lower and 
upper parts of the job
+     * ID have wrapped around due to overflow.
+     *
+     * @param jobId the job ID to derive the index from
+     * @param baseId the base job ID
+     * @return the index of the given job ID relative to the base job ID
+     */
+    @VisibleForTesting
+    static int getJobIndex(JobID jobId, JobID baseId) {
+        long lowerDiff = jobId.getLowerPart() - baseId.getLowerPart();
+        long upperDiff = jobId.getUpperPart() - baseId.getUpperPart();
+
+        if (lowerDiff != upperDiff
+                || lowerDiff < Integer.MIN_VALUE
+                || lowerDiff > Integer.MAX_VALUE) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Job ID %s is not derived from the base job ID 
%s.", jobId, baseId));
+        }
+
+        return (int) lowerDiff;
+    }
+
+    /**
+     * Creates a comparator for sorting job IDs based on their logical 
position relative to an
+     * initial index, supporting circular sequence number semantics.
+     *
+     * <p>This comparator handles the case where job indices may have wrapped 
around due to overflow
+     * by treating the difference as an unsigned 32-bit value. This allows 
proper ordering of job
+     * IDs even when their indices span across the boundary where signed 
integers would overflow.
+     *
+     * @param initialIndex the starting index used as reference point for 
calculating logical
+     *     offsets
+     * @return a comparator that sorts job IDs by their calculated unsigned 
offset from the initial
+     *     index
+     */
+    @VisibleForTesting
+    static Comparator<JobID> createJobIdComparator(int initialIndex, JobID 
baseId) {
+        return (id1, id2) -> {
+            int idx1 = getJobIndex(id1, baseId);
+            int idx2 = getJobIndex(id2, baseId);
+
+            // calculate logical offset from initialIndex using unsigned 
32-bit subtraction
+            // to support circular sequence numbers.
+            long steps1 = (idx1 - initialIndex) & 0xFFFFFFFFL;
+            long steps2 = (idx2 - initialIndex) & 0xFFFFFFFFL;
+            return Long.compare(steps1, steps2);
+        };
+    }
+}
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
index 2799ab63cb5..d45a49d52c3 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
@@ -21,10 +21,13 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobInfo;
 import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.cli.ClientOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.PipelineOptionsInternal;
 import org.apache.flink.core.execution.DetachedJobExecutionResult;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.JobListener;
@@ -49,6 +52,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -77,6 +81,8 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
 
     @Nullable private final ApplicationID applicationId;
 
+    private final JobIdManager jobIdManager;
+
     public StreamContextEnvironment(
             final PipelineExecutorServiceLoader executorServiceLoader,
             final Configuration configuration,
@@ -112,7 +118,8 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
                 suppressSysout,
                 programConfigEnabled,
                 programConfigWildcards,
-                null);
+                null,
+                Collections.emptyList());
     }
 
     @Internal
@@ -125,7 +132,8 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
             final boolean suppressSysout,
             final boolean programConfigEnabled,
             final Collection<String> programConfigWildcards,
-            @Nullable final ApplicationID applicationId) {
+            @Nullable final ApplicationID applicationId,
+            final Collection<JobInfo> allRecoveredJobInfos) {
         super(executorServiceLoader, configuration, userCodeClassLoader);
         this.suppressSysout = suppressSysout;
         this.enforceSingleJobExecution = enforceSingleJobExecution;
@@ -134,6 +142,23 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
         this.programConfigEnabled = programConfigEnabled;
         this.programConfigWildcards = programConfigWildcards;
         this.applicationId = applicationId;
+        this.jobIdManager = createJobIdManager(configuration, 
allRecoveredJobInfos);
+    }
+
+    private JobIdManager createJobIdManager(
+            Configuration configuration, Collection<JobInfo> 
allRecoveredJobInfos) {
+        Optional<String> optionalBaseJobId =
+                
configuration.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID);
+        // If optionalBaseJobId is empty, it means the user does not 
explicitly specify a job ID
+        // and HA mode is not enabled. In this case, deterministic job IDs are 
unnecessary since
+        // the user is not expecting a predefined job ID and no job matching 
is required for HA
+        // recovery. So we can use the NO_OP JobIdManager and keep the 
randomly generated job IDs.
+        if (optionalBaseJobId.isEmpty()) {
+            return JobIdManager.NO_OP;
+        }
+
+        JobID baseJobId = JobID.fromHexString(optionalBaseJobId.get());
+        return new JobNameBasedJobIdManager(baseJobId, allRecoveredJobInfos);
     }
 
     @Override
@@ -216,6 +241,7 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
         if (applicationId != null) {
             streamGraph.setApplicationId(applicationId);
         }
+        jobIdManager.updateJobId(streamGraph);
         final JobClient jobClient = super.executeAsync(streamGraph);
 
         if (!suppressSysout) {
@@ -241,7 +267,8 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
             final ClassLoader userCodeClassLoader,
             final boolean enforceSingleJobExecution,
             final boolean suppressSysout,
-            @Nullable final ApplicationID applicationId) {
+            @Nullable final ApplicationID applicationId,
+            Collection<JobInfo> allRecoveredJobInfos) {
         final StreamExecutionEnvironmentFactory factory =
                 envInitConfig -> {
                     final boolean programConfigEnabled =
@@ -260,7 +287,8 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
                             suppressSysout,
                             programConfigEnabled,
                             programConfigWildcards,
-                            applicationId);
+                            applicationId,
+                            allRecoveredJobInfos);
                 };
         initializeContextEnvironment(factory);
     }
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationTest.java
index 12affab4747..2ae9882b475 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationTest.java
@@ -57,7 +57,6 @@ import org.junit.jupiter.params.provider.EnumSource;
 
 import javax.annotation.Nullable;
 
-import java.util.Collections;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedDeque;
@@ -176,7 +175,6 @@ public class PackagedProgramApplicationTest {
                 new PackagedProgramApplication(
                         new ApplicationID(),
                         getProgram(2),
-                        Collections.emptyList(),
                         getConfiguration(),
                         true,
                         false,
@@ -269,7 +267,6 @@ public class PackagedProgramApplicationTest {
                 new PackagedProgramApplication(
                         new ApplicationID(),
                         getProgram(2),
-                        Collections.emptyList(),
                         getConfiguration(),
                         true,
                         false,
@@ -481,7 +478,6 @@ public class PackagedProgramApplicationTest {
                 new PackagedProgramApplication(
                         new ApplicationID(),
                         getProgram(1),
-                        Collections.emptyList(),
                         getConfiguration(),
                         true,
                         false,
@@ -1081,7 +1077,6 @@ public class PackagedProgramApplicationTest {
                 new PackagedProgramApplication(
                         new ApplicationID(),
                         FailingJob.getProgram(),
-                        Collections.emptyList(),
                         configuration,
                         true,
                         true /* enforceSingleJobExecution */,
@@ -1100,7 +1095,6 @@ public class PackagedProgramApplicationTest {
                 new PackagedProgramApplication(
                         new ApplicationID(),
                         FailingJob.getProgram(),
-                        Collections.emptyList(),
                         getConfiguration(),
                         true,
                         false /* enforceSingleJobExecution */,
@@ -1234,7 +1228,6 @@ public class PackagedProgramApplicationTest {
                 new PackagedProgramApplication(
                         new ApplicationID(),
                         program,
-                        Collections.emptyList(),
                         configuration,
                         handleFatalError,
                         enforceSingleJobExecution,
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java
index d8fc7898a3b..61d144c0060 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java
@@ -217,7 +217,7 @@ class DefaultPackagedProgramRetrieverITCase {
         assertThat(streamGraph.getSavepointRestoreSettings())
                 .isEqualTo(SavepointRestoreSettings.none());
         assertThat(streamGraph.getMaximumParallelism()).isEqualTo(parallelism);
-        assertThat(streamGraph.getJobID()).isEqualTo(jobId);
+        // we don't check the job id because StreamPlanEnvironment does not 
support fixed job id
     }
 
     @Test
@@ -281,7 +281,7 @@ class DefaultPackagedProgramRetrieverITCase {
         final StreamGraph streamGraph = 
retrieveStreamGraph(retrieverUnderTest, configuration);
 
         
assertThat(streamGraph.getSavepointRestoreSettings()).isEqualTo(savepointRestoreSettings);
-        assertThat(streamGraph.getJobID()).isEqualTo(jobId);
+        // we don't check the job id because StreamPlanEnvironment does not 
support fixed job id
     }
 
     @Test
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/JobNameBasedJobIdManagerTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/JobNameBasedJobIdManagerTest.java
new file mode 100644
index 00000000000..ff6ff29cd2e
--- /dev/null
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/JobNameBasedJobIdManagerTest.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobInfo;
+import org.apache.flink.api.common.JobInfoImpl;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
/** Test for {@link JobNameBasedJobIdManager}. */
class JobNameBasedJobIdManagerTest {

    // Tuples of (index, baseId, expected derived jobId) covering simple offsets and the
    // signed-overflow boundaries of both JobID halves.
    static Stream<Arguments> provideJobIndexTestCases() {
        return Stream.of(
                // simple cases
                Arguments.of(0, new JobID(100L, 200L), new JobID(100L, 200L)),
                Arguments.of(1, new JobID(100L, 200L), new JobID(101L, 201L)),
                Arguments.of(-1, new JobID(100L, 200L), new JobID(99L, 199L)),
                // overflow
                Arguments.of(
                        1,
                        new JobID(Long.MAX_VALUE, Long.MAX_VALUE),
                        new JobID(Long.MIN_VALUE, Long.MIN_VALUE)),
                Arguments.of(
                        -1,
                        new JobID(Long.MIN_VALUE, Long.MIN_VALUE),
                        new JobID(Long.MAX_VALUE, Long.MAX_VALUE)),
                Arguments.of(
                        Integer.MAX_VALUE,
                        new JobID(Long.MAX_VALUE, Long.MAX_VALUE),
                        new JobID(
                                Long.MIN_VALUE + Integer.MAX_VALUE - 1,
                                Long.MIN_VALUE + Integer.MAX_VALUE - 1)),
                Arguments.of(
                        Integer.MIN_VALUE,
                        new JobID(Long.MIN_VALUE, Long.MIN_VALUE),
                        new JobID(
                                Long.MAX_VALUE + Integer.MIN_VALUE + 1,
                                Long.MAX_VALUE + Integer.MIN_VALUE + 1)));
    }

    // Pairs of (baseId, wrongId) where wrongId cannot have been derived from baseId.
    static Stream<Arguments> provideInvalidJobIndexTestCases() {
        return Stream.of(
                // differences don't match between lower and upper parts
                Arguments.of(new JobID(100L, 200L), new JobID(101L, 202L)),
                // differences out of integer range
                Arguments.of(
                        new JobID(100L, 200L),
                        new JobID(100L + Long.MAX_VALUE, 200L + Long.MAX_VALUE)),
                Arguments.of(
                        new JobID(-100L, -200L),
                        new JobID(-100L + Long.MIN_VALUE, -200L + Long.MIN_VALUE)));
    }

    // Tuples of (initialIndex, baseId, input indices, expected circular order of those indices).
    static Stream<Arguments> provideJobIdComparatorTestCases() {
        return Stream.of(
                Arguments.of(
                        0,
                        new JobID(100L, 200L),
                        List.of(1, 5, 3, 2, 4, 0),
                        List.of(0, 1, 2, 3, 4, 5)),
                Arguments.of(
                        0,
                        new JobID(100L, 200L),
                        List.of(1, 5, -3, -2, -4, 0),
                        List.of(0, 1, 5, -4, -3, -2)),
                Arguments.of(
                        Integer.MAX_VALUE,
                        new JobID(100L, 200L),
                        List.of(0, Integer.MAX_VALUE, -2, 1),
                        List.of(Integer.MAX_VALUE, -2, 0, 1)),
                Arguments.of(
                        Integer.MIN_VALUE,
                        new JobID(100L, 200L),
                        List.of(0, Integer.MAX_VALUE, 2, -1),
                        List.of(-1, 0, 2, Integer.MAX_VALUE)));
    }

    // fromJobIndex and getJobIndex must be exact inverses of each other.
    @ParameterizedTest
    @MethodSource("provideJobIndexTestCases")
    void testGetJobIndex(int index, JobID baseId, JobID expectedId) {
        JobID jobId = JobNameBasedJobIdManager.fromJobIndex(index, baseId);

        assertThat(jobId).isEqualTo(expectedId);

        assertThat(JobNameBasedJobIdManager.getJobIndex(jobId, baseId)).isEqualTo(index);
    }

    // An ID not derived from the base must be rejected rather than silently mapped to an index.
    @ParameterizedTest
    @MethodSource("provideInvalidJobIndexTestCases")
    void testGetInvalidJobIndexThrows(JobID baseId, JobID wrongId) {
        assertThatThrownBy(() -> JobNameBasedJobIdManager.getJobIndex(wrongId, baseId))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining("is not derived from the base job ID");
    }

    // Sorting with the comparator must yield the circular (unsigned-offset) order, even when
    // indices wrap around Integer.MIN_VALUE/MAX_VALUE.
    @ParameterizedTest
    @MethodSource("provideJobIdComparatorTestCases")
    void testJobIdComparator(
            int initialIndex,
            JobID baseId,
            List<Integer> indices,
            List<Integer> expectedSortedIndices) {
        List<JobID> jobIds =
                indices.stream()
                        .map(index -> JobNameBasedJobIdManager.fromJobIndex(index, baseId))
                        .sorted(
                                JobNameBasedJobIdManager.createJobIdComparator(
                                        initialIndex, baseId))
                        .collect(Collectors.toList());

        List<Integer> sortedIndices =
                jobIds.stream()
                        .map(jobId -> JobNameBasedJobIdManager.getJobIndex(jobId, baseId))
                        .collect(Collectors.toList());

        assertThat(sortedIndices).isEqualTo(expectedSortedIndices);
    }

    // The very first submitted job gets the base job ID itself (index 0); any other job name
    // starts at its hash code.
    @Test
    void testFirstJob() {
        String jobName = "job1";
        JobID baseJobId = new JobID(1L, 2L);
        JobNameBasedJobIdManager manager =
                new JobNameBasedJobIdManager(baseJobId, Collections.emptyList());

        StreamGraph graph = createStreamGraph(jobName);
        manager.updateJobId(graph);

        assertThat(graph.getJobID()).isEqualTo(baseJobId);
        assertThat(manager.getFirstJobName()).isEqualTo(jobName);
        assertThat(manager.getInitialIndex(jobName)).isEqualTo(0);
        assertThat(manager.getInitialIndex("some-job")).isEqualTo(Objects.hashCode("some-job"));
    }

    // Repeated submissions under the same name get consecutive derived IDs.
    @Test
    void testUpdateJobIdWithSameJobName() {
        String jobName = "job1";
        JobID baseJobId = new JobID(1L, 2L);
        JobID jobId2 = new JobID(2L, 3L);
        JobID jobId3 = new JobID(3L, 4L);
        JobNameBasedJobIdManager manager =
                new JobNameBasedJobIdManager(baseJobId, Collections.emptyList());

        StreamGraph graph1 = createStreamGraph(jobName);
        manager.updateJobId(graph1);

        StreamGraph graph2 = createStreamGraph(jobName);
        manager.updateJobId(graph2);

        StreamGraph graph3 = createStreamGraph(jobName);
        manager.updateJobId(graph3);

        assertThat(graph1.getJobID()).isEqualTo(baseJobId);
        assertThat(graph2.getJobID()).isEqualTo(jobId2);
        assertThat(graph3.getJobID()).isEqualTo(jobId3);
    }

    // After recovery, same-named jobs must be re-assigned the same IDs in the same order,
    // regardless of the order the recovered infos are listed in.
    @Test
    void testUpdateJobIdWithRecoveredSameJobName() {
        String jobName = "job1";
        JobID baseJobId = new JobID(1L, 2L);
        JobID jobId2 = new JobID(2L, 3L);
        JobID jobId3 = new JobID(3L, 4L);
        JobNameBasedJobIdManager manager =
                new JobNameBasedJobIdManager(
                        baseJobId,
                        List.of(
                                new JobInfoImpl(jobId2, jobName),
                                new JobInfoImpl(jobId3, jobName),
                                new JobInfoImpl(baseJobId, jobName)));

        StreamGraph graph1 = createStreamGraph(jobName);
        manager.updateJobId(graph1);

        StreamGraph graph2 = createStreamGraph(jobName);
        manager.updateJobId(graph2);

        StreamGraph graph3 = createStreamGraph(jobName);
        manager.updateJobId(graph3);

        assertThat(graph1.getJobID()).isEqualTo(baseJobId);
        assertThat(graph2.getJobID()).isEqualTo(jobId2);
        assertThat(graph3.getJobID()).isEqualTo(jobId3);
    }

    // Distinct job names get IDs offset by the name's hash code, independent of submission order
    // (graph3 is deliberately submitted before graph2).
    @Test
    void testUpdateJobIdWithDifferentJobNames() {
        String jobName = "job1";
        String jobName2 = "job2";
        String jobName3 = "job3";
        JobID baseJobId = new JobID(1L, 2L);
        JobID jobId2 = new JobID(1L + jobName2.hashCode(), 2L + jobName2.hashCode());
        JobID jobId3 = new JobID(1L + jobName3.hashCode(), 2L + jobName3.hashCode());
        JobNameBasedJobIdManager manager =
                new JobNameBasedJobIdManager(baseJobId, Collections.emptyList());

        StreamGraph graph1 = createStreamGraph(jobName);
        manager.updateJobId(graph1);

        StreamGraph graph3 = createStreamGraph(jobName3);
        manager.updateJobId(graph3);

        StreamGraph graph2 = createStreamGraph(jobName2);
        manager.updateJobId(graph2);

        assertThat(graph1.getJobID()).isEqualTo(baseJobId);
        assertThat(graph2.getJobID()).isEqualTo(jobId2);
        assertThat(graph3.getJobID()).isEqualTo(jobId3);
    }

    // Recovery with distinct job names: each name is matched back to its recovered ID,
    // independent of re-submission order.
    @Test
    void testUpdateJobIdWithRecoveredDifferentJobNames() {
        String jobName = "job1";
        String jobName2 = "job2";
        String jobName3 = "job3";
        JobID baseJobId = new JobID(1L, 2L);
        JobID jobId2 = new JobID(1L + jobName2.hashCode(), 2L + jobName2.hashCode());
        JobID jobId3 = new JobID(1L + jobName3.hashCode(), 2L + jobName3.hashCode());
        JobNameBasedJobIdManager manager =
                new JobNameBasedJobIdManager(
                        baseJobId,
                        List.of(
                                new JobInfoImpl(jobId2, jobName2),
                                new JobInfoImpl(jobId3, jobName3),
                                new JobInfoImpl(baseJobId, jobName)));

        StreamGraph graph3 = createStreamGraph(jobName3);
        manager.updateJobId(graph3);

        StreamGraph graph1 = createStreamGraph(jobName);
        manager.updateJobId(graph1);

        StreamGraph graph2 = createStreamGraph(jobName2);
        manager.updateJobId(graph2);

        assertThat(graph1.getJobID()).isEqualTo(baseJobId);
        assertThat(graph2.getJobID()).isEqualTo(jobId2);
        assertThat(graph3.getJobID()).isEqualTo(jobId3);
    }

    // A non-empty recovered-jobs collection that does not contain the base job ID is invalid
    // and must be rejected at construction time.
    @Test
    void testBaseJobIdMissingInRecoveredJobsThrows() {
        JobID baseJobId = new JobID(100L, 200L);
        JobID otherJobId = new JobID(300L, 400L);

        Collection<JobInfo> recoveredJobs =
                Collections.singletonList(new JobInfoImpl(otherJobId, "some-job"));

        assertThatThrownBy(() -> new JobNameBasedJobIdManager(baseJobId, recoveredJobs))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining(
                        "Base job ID " + baseJobId + " is not found in the recovered jobs.");
    }

    // Builds a minimal StreamGraph carrying only the given job name.
    private StreamGraph createStreamGraph(String jobName) {
        StreamGraph graph =
                new StreamGraph(
                        new Configuration(),
                        new ExecutionConfig(),
                        new CheckpointConfig(),
                        SavepointRestoreSettings.none());
        graph.setJobName(jobName);
        return graph;
    }
}
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHandler.java
index ec8389f4e71..41b0defc530 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHandler.java
@@ -42,7 +42,6 @@ import javax.annotation.Nonnull;
 
 import java.nio.file.Path;
 import java.time.Duration;
-import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -102,14 +101,7 @@ public class JarRunApplicationHandler
         ApplicationID applicationId = 
context.getApplicationId().orElse(ApplicationID.generate());
         PackagedProgramApplication application =
                 new PackagedProgramApplication(
-                        applicationId,
-                        program,
-                        Collections.emptyList(),
-                        effectiveConfiguration,
-                        false,
-                        true,
-                        false,
-                        false);
+                        applicationId, program, effectiveConfiguration, false, 
true, false, false);
 
         return gateway.submitApplication(application, timeout)
                 .handle(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
index b385b28da39..3db82f5d5c3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
@@ -56,6 +56,8 @@ public class JobResult implements Serializable {
 
     private final JobID jobId;
 
+    private final String jobName;
+
     /** Stores the job status, null if unknown. */
     @Nullable private final JobStatus jobStatus;
 
@@ -68,6 +70,7 @@ public class JobResult implements Serializable {
 
     private JobResult(
             final JobID jobId,
+            final String jobName,
             @Nullable final JobStatus jobStatus,
             final Map<String, SerializedValue<OptionalFailure<Object>>> 
accumulatorResults,
             final long netRuntime,
@@ -79,6 +82,7 @@ public class JobResult implements Serializable {
                 "jobStatus must be globally terminal or unknow(null)");
 
         this.jobId = requireNonNull(jobId);
+        this.jobName = requireNonNull(jobName);
         this.jobStatus = jobStatus;
         this.accumulatorResults = requireNonNull(accumulatorResults);
         this.netRuntime = netRuntime;
@@ -95,6 +99,10 @@ public class JobResult implements Serializable {
         return jobId;
     }
 
+    public String getJobName() {
+        return jobName;
+    }
+
     public Optional<JobStatus> getJobStatus() {
         return Optional.ofNullable(jobStatus);
     }
@@ -165,6 +173,8 @@ public class JobResult implements Serializable {
 
         private JobID jobId;
 
+        private String jobName = "unknown";
+
         private JobStatus jobStatus;
 
         private Map<String, SerializedValue<OptionalFailure<Object>>> 
accumulatorResults;
@@ -178,6 +188,11 @@ public class JobResult implements Serializable {
             return this;
         }
 
+        public Builder jobName(final String jobName) {
+            this.jobName = jobName;
+            return this;
+        }
+
         public Builder jobStatus(final JobStatus jobStatus) {
             this.jobStatus = jobStatus;
             return this;
@@ -202,6 +217,7 @@ public class JobResult implements Serializable {
         public JobResult build() {
             return new JobResult(
                     jobId,
+                    jobName,
                     jobStatus,
                     accumulatorResults == null ? Collections.emptyMap() : 
accumulatorResults,
                     netRuntime,
@@ -233,6 +249,7 @@ public class JobResult implements Serializable {
 
         final JobResult.Builder builder = new JobResult.Builder();
         builder.jobId(jobId);
+        builder.jobName(accessExecutionGraph.getJobName());
 
         builder.jobStatus(jobStatus.isGloballyTerminalState() ? jobStatus : 
null);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java
index 3ebd2aa6590..9a577235d7d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java
@@ -71,6 +71,7 @@ public class JobResultDeserializer extends 
StdDeserializer<JobResult> {
     public JobResult deserialize(final JsonParser p, final 
DeserializationContext ctxt)
             throws IOException {
         JobID jobId = null;
+        String jobName = "unknown";
         JobStatus jobStatus = null;
         long netRuntime = -1;
         SerializedThrowable serializedThrowable = null;
@@ -89,6 +90,10 @@ public class JobResultDeserializer extends 
StdDeserializer<JobResult> {
                     assertNextToken(p, JsonToken.VALUE_STRING);
                     jobId = jobIdDeserializer.deserialize(p, ctxt);
                     break;
+                case JobResultSerializer.FIELD_NAME_JOB_NAME:
+                    assertNextToken(p, JsonToken.VALUE_STRING);
+                    jobName = p.getValueAsString();
+                    break;
                 case JobResultSerializer.FIELD_NAME_APPLICATION_STATUS:
                     assertNextToken(p, JsonToken.VALUE_STRING);
                     try {
@@ -119,6 +124,7 @@ public class JobResultDeserializer extends 
StdDeserializer<JobResult> {
         try {
             return new JobResult.Builder()
                     .jobId(jobId)
+                    .jobName(jobName)
                     .jobStatus(jobStatus)
                     .netRuntime(netRuntime)
                     .accumulatorResults(accumulatorResults)
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java
index 562e340c249..181394a4eb8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java
@@ -45,6 +45,8 @@ public class JobResultSerializer extends 
StdSerializer<JobResult> {
 
     static final String FIELD_NAME_JOB_ID = "id";
 
+    static final String FIELD_NAME_JOB_NAME = "name";
+
     static final String FIELD_NAME_APPLICATION_STATUS = "application-status";
 
     static final String FIELD_NAME_NET_RUNTIME = "net-runtime";
@@ -79,6 +81,9 @@ public class JobResultSerializer extends 
StdSerializer<JobResult> {
         gen.writeFieldName(FIELD_NAME_JOB_ID);
         jobIdSerializer.serialize(result.getJobId(), gen, provider);
 
+        gen.writeFieldName(FIELD_NAME_JOB_NAME);
+        gen.writeString(result.getJobName());
+
         // use application status to maintain backward compatibility
         gen.writeFieldName(FIELD_NAME_APPLICATION_STATUS);
         
gen.writeString(ApplicationStatus.fromJobStatus(result.getJobStatus().orElse(null)).name());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobResultStore.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobResultStore.java
index 63606bab2bd..cfe4712efba 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobResultStore.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobResultStore.java
@@ -34,6 +34,8 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 
+import static 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.DEFAULT_STREAMING_JOB_NAME;
+
 /**
  * {@code TestingJobResultStore} is a {@link JobResultStore} implementation 
that can be used in
  * tests.
@@ -47,7 +49,12 @@ public class TestingJobResultStore implements JobResultStore 
{
     }
 
     public static JobResult createJobResult(JobID jobId, @Nullable JobStatus 
jobStatus) {
-        return new 
JobResult.Builder().jobId(jobId).jobStatus(jobStatus).netRuntime(1).build();
+        return new JobResult.Builder()
+                .jobId(jobId)
+                .jobName(DEFAULT_STREAMING_JOB_NAME)
+                .jobStatus(jobStatus)
+                .netRuntime(1)
+                .build();
     }
 
     private final Function<JobResultEntry, CompletableFuture<Void>> 
createDirtyResultConsumer;
diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptExecutorITCase.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptExecutorITCase.java
index 4fe639c6c0d..066831127e0 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptExecutorITCase.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptExecutorITCase.java
@@ -85,7 +85,8 @@ public class ScriptExecutorITCase extends 
AbstractSqlGatewayStatementITCaseBase
                     ScriptExecutor.class.getClassLoader(),
                     false,
                     false,
-                    null);
+                    null,
+                    Collections.emptyList());
 
             executor =
                     new TestScriptExecutor(

Reply via email to