This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5dbadfd7f481e9ecdc63ec298af4e53f69786cdf Author: Yi Zhang <[email protected]> AuthorDate: Tue Jan 27 11:06:57 2026 +0800 [FLINK-38973][runtime] Job ID assignment and matching based on name --- .../5b9eed8a-5fb6-4373-98ac-3be2a71941b8 | 2 +- .../java/org/apache/flink/client/ClientUtils.java | 11 +- ...ApplicationDispatcherGatewayServiceFactory.java | 25 +- .../application/PackagedProgramApplication.java | 44 ++- .../application/executors/EmbeddedExecutor.java | 12 +- .../executors/PipelineExecutorUtils.java | 6 - .../apache/flink/client/program/JobIdManager.java | 45 +++ .../client/program/JobNameBasedJobIdManager.java | 254 +++++++++++++++++ .../client/program/StreamContextEnvironment.java | 36 ++- .../PackagedProgramApplicationTest.java | 7 - .../DefaultPackagedProgramRetrieverITCase.java | 4 +- .../program/JobNameBasedJobIdManagerTest.java | 301 +++++++++++++++++++++ .../handlers/JarRunApplicationHandler.java | 10 +- .../apache/flink/runtime/jobmaster/JobResult.java | 17 ++ .../rest/messages/json/JobResultDeserializer.java | 6 + .../rest/messages/json/JobResultSerializer.java | 5 + .../runtime/testutils/TestingJobResultStore.java | 9 +- .../service/application/ScriptExecutorITCase.java | 3 +- 18 files changed, 750 insertions(+), 47 deletions(-) diff --git a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8 b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8 index 256be5208ff..ccf1f8ba490 100644 --- a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8 +++ b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8 @@ -21,7 +21,7 @@ org.apache.flink.cep.pattern.conditions.IterativeCondition.filter(java.lang.Obje org.apache.flink.cep.pattern.conditions.SimpleCondition.filter(java.lang.Object, 
org.apache.flink.cep.pattern.conditions.IterativeCondition$Context): Argument leaf type org.apache.flink.cep.pattern.conditions.IterativeCondition$Context does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated org.apache.flink.client.program.StreamContextEnvironment.execute(org.apache.flink.streaming.api.graph.StreamGraph): Argument leaf type org.apache.flink.streaming.api.graph.StreamGraph does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated org.apache.flink.client.program.StreamContextEnvironment.executeAsync(org.apache.flink.streaming.api.graph.StreamGraph): Argument leaf type org.apache.flink.streaming.api.graph.StreamGraph does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated -org.apache.flink.client.program.StreamContextEnvironment.setAsContext(org.apache.flink.core.execution.PipelineExecutorServiceLoader, org.apache.flink.configuration.Configuration, java.lang.ClassLoader, boolean, boolean, org.apache.flink.api.common.ApplicationID): Argument leaf type org.apache.flink.core.execution.PipelineExecutorServiceLoader does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated wit [...] 
+org.apache.flink.client.program.StreamContextEnvironment.setAsContext(org.apache.flink.core.execution.PipelineExecutorServiceLoader, org.apache.flink.configuration.Configuration, java.lang.ClassLoader, boolean, boolean, org.apache.flink.api.common.ApplicationID, java.util.Collection): Argument leaf type org.apache.flink.core.execution.PipelineExecutorServiceLoader does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @P [...] org.apache.flink.client.program.StreamPlanEnvironment.executeAsync(org.apache.flink.streaming.api.graph.StreamGraph): Argument leaf type org.apache.flink.streaming.api.graph.StreamGraph does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated org.apache.flink.client.program.StreamPlanEnvironment.getPipeline(): Returned leaf type org.apache.flink.api.dag.Pipeline does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated org.apache.flink.configuration.ClusterOptions.getSchedulerType(org.apache.flink.configuration.Configuration): Returned leaf type org.apache.flink.configuration.JobManagerOptions$SchedulerType does not satisfy: reside outside of package 'org.apache.flink..' 
or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java index dd680883bc5..4604b109d28 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java @@ -20,6 +20,7 @@ package org.apache.flink.client; import org.apache.flink.api.common.ApplicationID; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobInfo; import org.apache.flink.api.common.JobStatus; import org.apache.flink.client.cli.ClientOptions; import org.apache.flink.client.program.PackagedProgram; @@ -49,6 +50,7 @@ import java.net.URL; import java.net.URLClassLoader; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.concurrent.Executors; @@ -89,7 +91,8 @@ public enum ClientUtils { program, enforceSingleJobExecution, suppressSysout, - null); + null, + Collections.emptyList()); } public static void executeProgram( @@ -98,7 +101,8 @@ public enum ClientUtils { PackagedProgram program, boolean enforceSingleJobExecution, boolean suppressSysout, - @Nullable ApplicationID applicationId) + @Nullable ApplicationID applicationId, + Collection<JobInfo> allRecoveredJobInfos) throws ProgramInvocationException { checkNotNull(executorServiceLoader); final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader(); @@ -116,7 +120,8 @@ public enum ClientUtils { userCodeClassLoader, enforceSingleJobExecution, suppressSysout, - applicationId); + applicationId, + allRecoveredJobInfos); // For DataStream v2. 
ExecutionContextEnvironment.setAsContext( diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java index 6c443c4b649..fa7f597863b 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java @@ -20,7 +20,8 @@ package org.apache.flink.client.deployment.application; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ApplicationID; -import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobInfo; +import org.apache.flink.api.common.JobInfoImpl; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.configuration.ApplicationOptionsInternal; import org.apache.flink.configuration.Configuration; @@ -92,7 +93,9 @@ public class ApplicationDispatcherGatewayServiceFactory ExecutionPlanWriter executionPlanWriter, JobResultStore jobResultStore) { - final List<JobID> recoveredJobIds = getRecoveredJobIds(recoveredJobs); + final List<JobInfo> recoveredJobInfos = getRecoveredJobInfos(recoveredJobs); + final List<JobInfo> recoveredTerminalJobInfos = + getRecoveredTerminalJobInfos(recoveredDirtyJobResults); final boolean allowExecuteMultipleJobs = ApplicationJobUtils.allowExecuteMultipleJobs(configuration); @@ -107,7 +110,8 @@ public class ApplicationDispatcherGatewayServiceFactory new PackagedProgramApplication( applicationId, program, - recoveredJobIds, + recoveredJobInfos, + recoveredTerminalJobInfos, configuration, true, !allowExecuteMultipleJobs, @@ -137,7 +141,18 @@ public class ApplicationDispatcherGatewayServiceFactory return DefaultDispatcherGatewayService.from(dispatcher); } - private List<JobID> 
getRecoveredJobIds(final Collection<ExecutionPlan> recoveredJobs) { - return recoveredJobs.stream().map(ExecutionPlan::getJobID).collect(Collectors.toList()); + private List<JobInfo> getRecoveredJobInfos(final Collection<ExecutionPlan> recoveredJobs) { + return recoveredJobs.stream() + .map( + executionPlan -> + new JobInfoImpl(executionPlan.getJobID(), executionPlan.getName())) + .collect(Collectors.toList()); + } + + private List<JobInfo> getRecoveredTerminalJobInfos( + final Collection<JobResult> recoveredDirtyJobResults) { + return recoveredDirtyJobResults.stream() + .map(jobResult -> new JobInfoImpl(jobResult.getJobId(), jobResult.getJobName())) + .collect(Collectors.toList()); } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java index 95f54b69de1..a06d1a13854 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ApplicationID; import org.apache.flink.api.common.ApplicationState; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobInfo; import org.apache.flink.api.common.JobStatus; import org.apache.flink.client.ClientUtils; import org.apache.flink.client.cli.ClientOptions; @@ -48,7 +49,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -64,6 +64,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; +import 
java.util.stream.Stream; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -79,7 +80,9 @@ public class PackagedProgramApplication extends AbstractApplication { private final PackagedProgramDescriptor programDescriptor; - private final Collection<JobID> recoveredJobIds; + private final Collection<JobInfo> recoveredJobInfos; + + private final Collection<JobInfo> recoveredTerminalJobInfos; private final Configuration configuration; @@ -104,7 +107,28 @@ public class PackagedProgramApplication extends AbstractApplication { public PackagedProgramApplication( final ApplicationID applicationId, final PackagedProgram program, - final Collection<JobID> recoveredJobIds, + final Configuration configuration, + final boolean handleFatalError, + final boolean enforceSingleJobExecution, + final boolean submitFailedJobOnApplicationError, + final boolean shutDownOnFinish) { + this( + applicationId, + program, + Collections.emptyList(), + Collections.emptyList(), + configuration, + handleFatalError, + enforceSingleJobExecution, + submitFailedJobOnApplicationError, + shutDownOnFinish); + } + + public PackagedProgramApplication( + final ApplicationID applicationId, + final PackagedProgram program, + final Collection<JobInfo> recoveredJobInfos, + final Collection<JobInfo> recoveredTerminalJobInfos, final Configuration configuration, final boolean handleFatalError, final boolean enforceSingleJobExecution, @@ -112,7 +136,8 @@ public class PackagedProgramApplication extends AbstractApplication { final boolean shutDownOnFinish) { super(applicationId); this.program = checkNotNull(program); - this.recoveredJobIds = checkNotNull(recoveredJobIds); + this.recoveredJobInfos = checkNotNull(recoveredJobInfos); + this.recoveredTerminalJobInfos = checkNotNull(recoveredTerminalJobInfos); this.configuration = checkNotNull(configuration); this.handleFatalError = handleFatalError; this.enforceSingleJobExecution = enforceSingleJobExecution; @@ -527,7 +552,8 @@ public class 
PackagedProgramApplication extends AbstractApplication { .key()))); return; } - final List<JobID> applicationJobIds = new ArrayList<>(recoveredJobIds); + final List<JobID> applicationJobIds = + recoveredJobInfos.stream().map(JobInfo::getJobId).collect(Collectors.toList()); try { if (program == null) { LOG.info("Reconstructing program from descriptor {}", programDescriptor); @@ -544,7 +570,8 @@ public class PackagedProgramApplication extends AbstractApplication { program, enforceSingleJobExecution, true /* suppress sysout */, - getApplicationId()); + getApplicationId(), + getAllRecoveredJobInfos()); if (applicationJobIds.isEmpty()) { jobIdsFuture.completeExceptionally( @@ -581,6 +608,11 @@ public class PackagedProgramApplication extends AbstractApplication { } } + private Collection<JobInfo> getAllRecoveredJobInfos() { + return Stream.concat(recoveredJobInfos.stream(), recoveredTerminalJobInfos.stream()) + .collect(Collectors.toList()); + } + private CompletableFuture<Void> waitForJobResults( final DispatcherGateway dispatcherGateway, final Collection<JobID> applicationJobIds, diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java index baed45dd649..b2a1db2ad3c 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java @@ -51,6 +51,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; /** * A base class for {@link PipelineExecutor executors} that invoke directly methods of the {@link @@ -110,14 +111,21 @@ public class EmbeddedExecutor implements PipelineExecutor { 
throws Exception { checkNotNull(pipeline); checkNotNull(configuration); + checkState(pipeline instanceof StreamGraph); + + StreamGraph streamGraph = (StreamGraph) pipeline; final Optional<JobID> optJobId = configuration .getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID) .map(JobID::fromHexString); - if (optJobId.isPresent() && submittedJobIds.contains(optJobId.get())) { - return getJobClientFuture(optJobId.get(), userCodeClassloader); + // Skip resubmission if the job is recovered via HA. + // When optJobId is present, the streamGraph's ID is deterministically derived from it. In + // this case, if the streamGraph's ID is in submittedJobIds, it means the job was submitted + // in a previous run and should not be resubmitted. + if (optJobId.isPresent() && submittedJobIds.contains(streamGraph.getJobID())) { + return getJobClientFuture(streamGraph.getJobID(), userCodeClassloader); } return submitAndGetJobClientFuture(pipeline, configuration, userCodeClassloader); diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java index bdfde6f7cc0..21dbef76c8c 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java @@ -18,7 +18,6 @@ package org.apache.flink.client.deployment.executors; -import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.dag.Pipeline; import org.apache.flink.client.cli.ClientOptions; @@ -26,7 +25,6 @@ import org.apache.flink.client.cli.ExecutionConfigAccessor; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.ExecutionOptions; -import 
org.apache.flink.configuration.PipelineOptionsInternal; import org.apache.flink.core.execution.JobStatusChangedListener; import org.apache.flink.streaming.api.graph.ExecutionPlan; import org.apache.flink.streaming.api.graph.StreamGraph; @@ -89,10 +87,6 @@ public class PipelineExecutorUtils { final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration); - configuration - .getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID) - .ifPresent(strJobID -> streamGraph.setJobId(JobID.fromHexString(strJobID))); - if (configuration.get(DeploymentOptions.ATTACHED) && configuration.get(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) { streamGraph.setInitialClientHeartbeatTimeout( diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/JobIdManager.java b/flink-clients/src/main/java/org/apache/flink/client/program/JobIdManager.java new file mode 100644 index 00000000000..0f472913610 --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/program/JobIdManager.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client.program; + +import org.apache.flink.streaming.api.graph.StreamGraph; + +/** + * Interface for managing the job IDs. + * + * <p>This interface allows custom logic to update the job ID of a {@link StreamGraph} before + * submitting the job. + */ +public interface JobIdManager { + + /** + * Updates the job ID of the given {@link StreamGraph}. + * + * @param streamGraph The {@link StreamGraph} to update. + */ + void updateJobId(StreamGraph streamGraph); + + JobIdManager NO_OP = + new JobIdManager() { + @Override + public void updateJobId(StreamGraph streamGraph) { + // no-op + } + }; +} diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/JobNameBasedJobIdManager.java b/flink-clients/src/main/java/org/apache/flink/client/program/JobNameBasedJobIdManager.java new file mode 100644 index 00000000000..4259844a059 --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/program/JobNameBasedJobIdManager.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client.program; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobInfo; +import org.apache.flink.streaming.api.graph.StreamGraph; + +import javax.annotation.Nullable; + +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A {@link JobIdManager} implementation that assigns job IDs based on job names, supporting + * recovery from previously submitted jobs and deterministic ID assignment. + * + * <p>This manager ensures that: + * + * <ul> + * <li><b>Compatibility</b>: The first submitted job retains the original fixed {@code baseJobId}, + * ensuring full backward compatibility with existing behavior. + * <li><b>Uniqueness</b>: Guarantees globally unique job IDs across all submitted jobs. + * <li><b>Correct Association</b>: Matches recovered jobs strictly by job name to prevent + * cross-name misassociation; for jobs with the same name, associates them in strict + * submission order. + * </ul> + */ +public class JobNameBasedJobIdManager implements JobIdManager { + + private final JobID baseJobId; + + private final Map<String, ArrayDeque<JobID>> recoveredJobNameToJobIds = new HashMap<>(); + + private final Set<JobID> occupiedJobIds = new HashSet<>(); + + private @Nullable String firstJobName; + + /** + * Creates a new job ID manager with the given base job ID and recovered job information. 
+ * + * @param baseJobId the base job ID used as the origin for deriving all job IDs + * @param allRecoveredJobInfos a collection of recovered job information; if non-empty, it + * <b>must contain</b> a job with ID equal to {@code baseJobId} + */ + public JobNameBasedJobIdManager(JobID baseJobId, Collection<JobInfo> allRecoveredJobInfos) { + this.baseJobId = checkNotNull(baseJobId); + checkNotNull(allRecoveredJobInfos); + + if (!allRecoveredJobInfos.isEmpty()) { + // step 1: find the first job name + Optional<JobInfo> optionalBaseJobInfo = + allRecoveredJobInfos.stream() + .filter(jobInfo -> jobInfo.getJobId().equals(baseJobId)) + .findFirst(); + + if (optionalBaseJobInfo.isEmpty()) { + throw new IllegalArgumentException( + String.format( + "Base job ID %s is not found in the recovered jobs.", baseJobId)); + } + + this.firstJobName = optionalBaseJobInfo.get().getJobName(); + + // step 2: group jobs by job name and sort them according to the job index + Map<String, List<JobID>> grouped = + allRecoveredJobInfos.stream() + .collect( + Collectors.groupingBy( + JobInfo::getJobName, + Collectors.mapping( + JobInfo::getJobId, Collectors.toList()))); + + for (Map.Entry<String, List<JobID>> entry : grouped.entrySet()) { + String jobName = entry.getKey(); + List<JobID> jobIds = entry.getValue(); + + int initialIndex = getInitialIndex(jobName); + + jobIds.sort(createJobIdComparator(initialIndex, baseJobId)); + + this.recoveredJobNameToJobIds.put(jobName, new ArrayDeque<>(jobIds)); + } + + // step 3: initialize occupied job ids + allRecoveredJobInfos.forEach(jobInfo -> this.occupiedJobIds.add(jobInfo.getJobId())); + } + } + + /** + * Assigns a job ID to the given {@link StreamGraph} based on its job name and the current state + * of recovered and occupied job IDs. + * + * <p>Assignment follows the logic: + * + * <ul> + * <li>First, attempts to reuse a recovered job ID for the same job name (in assignment + * order). 
+ * <li>If no recovered job is available, assigns a new job ID by probing from the name-derived + * starting index until an unused ID is found. + * </ul> + * + * @param streamGraph the stream graph to assign a job ID to + */ + @Override + public void updateJobId(StreamGraph streamGraph) { + String jobName = checkNotNull(streamGraph.getJobName()); + + // case 1: try to find a job id from the recovered jobs + ArrayDeque<JobID> recoveredJobIds = recoveredJobNameToJobIds.get(jobName); + if (recoveredJobIds != null) { + JobID jobId = recoveredJobIds.pollFirst(); + if (recoveredJobIds.isEmpty()) { + recoveredJobNameToJobIds.remove(jobName); + } + streamGraph.setJobId(jobId); + return; + } + + // case 2: generate a new job id + final JobID assignedJobId; + if (firstJobName == null) { + firstJobName = jobName; + assignedJobId = baseJobId; + } else { + int initialIndex = getInitialIndex(jobName); + int step = 0; + while (true) { + // the calculated index may overflow and wrap around + int candidateIndex = initialIndex + step; + JobID candidateId = fromJobIndex(candidateIndex, baseJobId); + if (!occupiedJobIds.contains(candidateId)) { + assignedJobId = candidateId; + break; + } + + // the step may overflow and wrap around + step++; + if (step == 0) { + throw new IllegalStateException( + String.format("Exhausted all job IDs for job name: %s", jobName)); + } + } + } + streamGraph.setJobId(assignedJobId); + occupiedJobIds.add(assignedJobId); + } + + @VisibleForTesting + String getFirstJobName() { + return firstJobName; + } + + @VisibleForTesting + int getInitialIndex(String jobName) { + checkNotNull(firstJobName); + + return jobName.equals(firstJobName) ? 0 : jobName.hashCode(); + } + + /** + * Creates a new job ID by adding the given index to both parts of the base job ID. + * + * <p>Note: Both the lower part and upper part may overflow and wrap around. 
+ * + * @param index the index to add to the base job ID parts + * @param baseId the base job ID to derive from + * @return a new job ID with index applied to both parts + */ + @VisibleForTesting + static JobID fromJobIndex(int index, JobID baseId) { + return new JobID(baseId.getLowerPart() + index, baseId.getUpperPart() + index); + } + + /** + * Computes the index of the given job ID relative to the base job ID. + * + * <p>This method correctly calculates the index even when the lower and upper parts of the job + * ID have wrapped around due to overflow. + * + * @param jobId the job ID to derive the index from + * @param baseId the base job ID + * @return the index of the given job ID relative to the base job ID + */ + @VisibleForTesting + static int getJobIndex(JobID jobId, JobID baseId) { + long lowerDiff = jobId.getLowerPart() - baseId.getLowerPart(); + long upperDiff = jobId.getUpperPart() - baseId.getUpperPart(); + + if (lowerDiff != upperDiff + || lowerDiff < Integer.MIN_VALUE + || lowerDiff > Integer.MAX_VALUE) { + throw new IllegalArgumentException( + String.format( + "Job ID %s is not derived from the base job ID %s.", jobId, baseId)); + } + + return (int) lowerDiff; + } + + /** + * Creates a comparator for sorting job IDs based on their logical position relative to an + * initial index, supporting circular sequence number semantics. + * + * <p>This comparator handles the case where job indices may have wrapped around due to overflow + * by treating the difference as an unsigned 32-bit value. This allows proper ordering of job + * IDs even when their indices span across the boundary where signed integers would overflow. 
+ * + * @param initialIndex the starting index used as reference point for calculating logical + * offsets + * @return a comparator that sorts job IDs by their calculated unsigned offset from the initial + * index + */ + @VisibleForTesting + static Comparator<JobID> createJobIdComparator(int initialIndex, JobID baseId) { + return (id1, id2) -> { + int idx1 = getJobIndex(id1, baseId); + int idx2 = getJobIndex(id2, baseId); + + // calculate logical offset from initialIndex using unsigned 32-bit subtraction + // to support circular sequence numbers. + long steps1 = (idx1 - initialIndex) & 0xFFFFFFFFL; + long steps2 = (idx2 - initialIndex) & 0xFFFFFFFFL; + return Long.compare(steps1, steps2); + }; + } +} diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java index 2799ab63cb5..d45a49d52c3 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java @@ -21,10 +21,13 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ApplicationID; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobInfo; import org.apache.flink.client.ClientUtils; import org.apache.flink.client.cli.ClientOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.configuration.PipelineOptionsInternal; import org.apache.flink.core.execution.DetachedJobExecutionResult; import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.execution.JobListener; @@ -49,6 +52,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import 
java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -77,6 +81,8 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment { @Nullable private final ApplicationID applicationId; + private final JobIdManager jobIdManager; + public StreamContextEnvironment( final PipelineExecutorServiceLoader executorServiceLoader, final Configuration configuration, @@ -112,7 +118,8 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment { suppressSysout, programConfigEnabled, programConfigWildcards, - null); + null, + Collections.emptyList()); } @Internal @@ -125,7 +132,8 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment { final boolean suppressSysout, final boolean programConfigEnabled, final Collection<String> programConfigWildcards, - @Nullable final ApplicationID applicationId) { + @Nullable final ApplicationID applicationId, + final Collection<JobInfo> allRecoveredJobInfos) { super(executorServiceLoader, configuration, userCodeClassLoader); this.suppressSysout = suppressSysout; this.enforceSingleJobExecution = enforceSingleJobExecution; @@ -134,6 +142,23 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment { this.programConfigEnabled = programConfigEnabled; this.programConfigWildcards = programConfigWildcards; this.applicationId = applicationId; + this.jobIdManager = createJobIdManager(configuration, allRecoveredJobInfos); + } + + private JobIdManager createJobIdManager( + Configuration configuration, Collection<JobInfo> allRecoveredJobInfos) { + Optional<String> optionalBaseJobId = + configuration.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID); + // If optionalBaseJobId is empty, it means the user does not explicitly specify a job ID + // and HA mode is not enabled. 
In this case, deterministic job IDs are unnecessary since + // the user is not expecting a predefined job ID and no job matching is required for HA + // recovery. So we can use the NO_OP JobIdManager and keep the randomly generated job IDs. + if (optionalBaseJobId.isEmpty()) { + return JobIdManager.NO_OP; + } + + JobID baseJobId = JobID.fromHexString(optionalBaseJobId.get()); + return new JobNameBasedJobIdManager(baseJobId, allRecoveredJobInfos); } @Override @@ -216,6 +241,7 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment { if (applicationId != null) { streamGraph.setApplicationId(applicationId); } + jobIdManager.updateJobId(streamGraph); final JobClient jobClient = super.executeAsync(streamGraph); if (!suppressSysout) { @@ -241,7 +267,8 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment { final ClassLoader userCodeClassLoader, final boolean enforceSingleJobExecution, final boolean suppressSysout, - @Nullable final ApplicationID applicationId) { + @Nullable final ApplicationID applicationId, + Collection<JobInfo> allRecoveredJobInfos) { final StreamExecutionEnvironmentFactory factory = envInitConfig -> { final boolean programConfigEnabled = @@ -260,7 +287,8 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment { suppressSysout, programConfigEnabled, programConfigWildcards, - applicationId); + applicationId, + allRecoveredJobInfos); }; initializeContextEnvironment(factory); } diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationTest.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationTest.java index 12affab4747..2ae9882b475 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationTest.java @@ -57,7 +57,6 @@ 
import org.junit.jupiter.params.provider.EnumSource; import javax.annotation.Nullable; -import java.util.Collections; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedDeque; @@ -176,7 +175,6 @@ public class PackagedProgramApplicationTest { new PackagedProgramApplication( new ApplicationID(), getProgram(2), - Collections.emptyList(), getConfiguration(), true, false, @@ -269,7 +267,6 @@ public class PackagedProgramApplicationTest { new PackagedProgramApplication( new ApplicationID(), getProgram(2), - Collections.emptyList(), getConfiguration(), true, false, @@ -481,7 +478,6 @@ public class PackagedProgramApplicationTest { new PackagedProgramApplication( new ApplicationID(), getProgram(1), - Collections.emptyList(), getConfiguration(), true, false, @@ -1081,7 +1077,6 @@ public class PackagedProgramApplicationTest { new PackagedProgramApplication( new ApplicationID(), FailingJob.getProgram(), - Collections.emptyList(), configuration, true, true /* enforceSingleJobExecution */, @@ -1100,7 +1095,6 @@ public class PackagedProgramApplicationTest { new PackagedProgramApplication( new ApplicationID(), FailingJob.getProgram(), - Collections.emptyList(), getConfiguration(), true, false /* enforceSingleJobExecution */, @@ -1234,7 +1228,6 @@ public class PackagedProgramApplicationTest { new PackagedProgramApplication( new ApplicationID(), program, - Collections.emptyList(), configuration, handleFatalError, enforceSingleJobExecution, diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java b/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java index d8fc7898a3b..61d144c0060 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java @@ -217,7 
+217,7 @@ class DefaultPackagedProgramRetrieverITCase { assertThat(streamGraph.getSavepointRestoreSettings()) .isEqualTo(SavepointRestoreSettings.none()); assertThat(streamGraph.getMaximumParallelism()).isEqualTo(parallelism); - assertThat(streamGraph.getJobID()).isEqualTo(jobId); + // we don't check the job id because StreamPlanEnvironment does not support fixed job id } @Test @@ -281,7 +281,7 @@ class DefaultPackagedProgramRetrieverITCase { final StreamGraph streamGraph = retrieveStreamGraph(retrieverUnderTest, configuration); assertThat(streamGraph.getSavepointRestoreSettings()).isEqualTo(savepointRestoreSettings); - assertThat(streamGraph.getJobID()).isEqualTo(jobId); + // we don't check the job id because StreamPlanEnvironment does not support fixed job id } @Test diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/JobNameBasedJobIdManagerTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/JobNameBasedJobIdManagerTest.java new file mode 100644 index 00000000000..ff6ff29cd2e --- /dev/null +++ b/flink-clients/src/test/java/org/apache/flink/client/program/JobNameBasedJobIdManagerTest.java @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client.program; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobInfo; +import org.apache.flink.api.common.JobInfoImpl; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.graph.StreamGraph; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link JobNameBasedJobIdManager}. 
*/ +class JobNameBasedJobIdManagerTest { + + static Stream<Arguments> provideJobIndexTestCases() { + return Stream.of( + // simple cases + Arguments.of(0, new JobID(100L, 200L), new JobID(100L, 200L)), + Arguments.of(1, new JobID(100L, 200L), new JobID(101L, 201L)), + Arguments.of(-1, new JobID(100L, 200L), new JobID(99L, 199L)), + // overflow + Arguments.of( + 1, + new JobID(Long.MAX_VALUE, Long.MAX_VALUE), + new JobID(Long.MIN_VALUE, Long.MIN_VALUE)), + Arguments.of( + -1, + new JobID(Long.MIN_VALUE, Long.MIN_VALUE), + new JobID(Long.MAX_VALUE, Long.MAX_VALUE)), + Arguments.of( + Integer.MAX_VALUE, + new JobID(Long.MAX_VALUE, Long.MAX_VALUE), + new JobID( + Long.MIN_VALUE + Integer.MAX_VALUE - 1, + Long.MIN_VALUE + Integer.MAX_VALUE - 1)), + Arguments.of( + Integer.MIN_VALUE, + new JobID(Long.MIN_VALUE, Long.MIN_VALUE), + new JobID( + Long.MAX_VALUE + Integer.MIN_VALUE + 1, + Long.MAX_VALUE + Integer.MIN_VALUE + 1))); + } + + static Stream<Arguments> provideInvalidJobIndexTestCases() { + return Stream.of( + // differences don't match between lower and upper parts + Arguments.of(new JobID(100L, 200L), new JobID(101L, 202L)), + // differences out of integer range + Arguments.of( + new JobID(100L, 200L), + new JobID(100L + Long.MAX_VALUE, 200L + Long.MAX_VALUE)), + Arguments.of( + new JobID(-100L, -200L), + new JobID(-100L + Long.MIN_VALUE, -200L + Long.MIN_VALUE))); + } + + static Stream<Arguments> provideJobIdComparatorTestCases() { + return Stream.of( + Arguments.of( + 0, + new JobID(100L, 200L), + List.of(1, 5, 3, 2, 4, 0), + List.of(0, 1, 2, 3, 4, 5)), + Arguments.of( + 0, + new JobID(100L, 200L), + List.of(1, 5, -3, -2, -4, 0), + List.of(0, 1, 5, -4, -3, -2)), + Arguments.of( + Integer.MAX_VALUE, + new JobID(100L, 200L), + List.of(0, Integer.MAX_VALUE, -2, 1), + List.of(Integer.MAX_VALUE, -2, 0, 1)), + Arguments.of( + Integer.MIN_VALUE, + new JobID(100L, 200L), + List.of(0, Integer.MAX_VALUE, 2, -1), + List.of(-1, 0, 2, Integer.MAX_VALUE))); + } + + 
@ParameterizedTest + @MethodSource("provideJobIndexTestCases") + void testGetJobIndex(int index, JobID baseId, JobID expectedId) { + JobID jobId = JobNameBasedJobIdManager.fromJobIndex(index, baseId); + + assertThat(jobId).isEqualTo(expectedId); + + assertThat(JobNameBasedJobIdManager.getJobIndex(jobId, baseId)).isEqualTo(index); + } + + @ParameterizedTest + @MethodSource("provideInvalidJobIndexTestCases") + void testGetInvalidJobIndexThrows(JobID baseId, JobID wrongId) { + assertThatThrownBy(() -> JobNameBasedJobIdManager.getJobIndex(wrongId, baseId)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("is not derived from the base job ID"); + } + + @ParameterizedTest + @MethodSource("provideJobIdComparatorTestCases") + void testJobIdComparator( + int initialIndex, + JobID baseId, + List<Integer> indices, + List<Integer> expectedSortedIndices) { + List<JobID> jobIds = + indices.stream() + .map(index -> JobNameBasedJobIdManager.fromJobIndex(index, baseId)) + .sorted( + JobNameBasedJobIdManager.createJobIdComparator( + initialIndex, baseId)) + .collect(Collectors.toList()); + + List<Integer> sortedIndices = + jobIds.stream() + .map(jobId -> JobNameBasedJobIdManager.getJobIndex(jobId, baseId)) + .collect(Collectors.toList()); + + assertThat(sortedIndices).isEqualTo(expectedSortedIndices); + } + + @Test + void testFirstJob() { + String jobName = "job1"; + JobID baseJobId = new JobID(1L, 2L); + JobNameBasedJobIdManager manager = + new JobNameBasedJobIdManager(baseJobId, Collections.emptyList()); + + StreamGraph graph = createStreamGraph(jobName); + manager.updateJobId(graph); + + assertThat(graph.getJobID()).isEqualTo(baseJobId); + assertThat(manager.getFirstJobName()).isEqualTo(jobName); + assertThat(manager.getInitialIndex(jobName)).isEqualTo(0); + assertThat(manager.getInitialIndex("some-job")).isEqualTo(Objects.hashCode("some-job")); + } + + @Test + void testUpdateJobIdWithSameJobName() { + String jobName = "job1"; + JobID baseJobId = new 
JobID(1L, 2L); + JobID jobId2 = new JobID(2L, 3L); + JobID jobId3 = new JobID(3L, 4L); + JobNameBasedJobIdManager manager = + new JobNameBasedJobIdManager(baseJobId, Collections.emptyList()); + + StreamGraph graph1 = createStreamGraph(jobName); + manager.updateJobId(graph1); + + StreamGraph graph2 = createStreamGraph(jobName); + manager.updateJobId(graph2); + + StreamGraph graph3 = createStreamGraph(jobName); + manager.updateJobId(graph3); + + assertThat(graph1.getJobID()).isEqualTo(baseJobId); + assertThat(graph2.getJobID()).isEqualTo(jobId2); + assertThat(graph3.getJobID()).isEqualTo(jobId3); + } + + @Test + void testUpdateJobIdWithRecoveredSameJobName() { + String jobName = "job1"; + JobID baseJobId = new JobID(1L, 2L); + JobID jobId2 = new JobID(2L, 3L); + JobID jobId3 = new JobID(3L, 4L); + JobNameBasedJobIdManager manager = + new JobNameBasedJobIdManager( + baseJobId, + List.of( + new JobInfoImpl(jobId2, jobName), + new JobInfoImpl(jobId3, jobName), + new JobInfoImpl(baseJobId, jobName))); + + StreamGraph graph1 = createStreamGraph(jobName); + manager.updateJobId(graph1); + + StreamGraph graph2 = createStreamGraph(jobName); + manager.updateJobId(graph2); + + StreamGraph graph3 = createStreamGraph(jobName); + manager.updateJobId(graph3); + + assertThat(graph1.getJobID()).isEqualTo(baseJobId); + assertThat(graph2.getJobID()).isEqualTo(jobId2); + assertThat(graph3.getJobID()).isEqualTo(jobId3); + } + + @Test + void testUpdateJobIdWithDifferentJobNames() { + String jobName = "job1"; + String jobName2 = "job2"; + String jobName3 = "job3"; + JobID baseJobId = new JobID(1L, 2L); + JobID jobId2 = new JobID(1L + jobName2.hashCode(), 2L + jobName2.hashCode()); + JobID jobId3 = new JobID(1L + jobName3.hashCode(), 2L + jobName3.hashCode()); + JobNameBasedJobIdManager manager = + new JobNameBasedJobIdManager(baseJobId, Collections.emptyList()); + + StreamGraph graph1 = createStreamGraph(jobName); + manager.updateJobId(graph1); + + StreamGraph graph3 = 
createStreamGraph(jobName3); + manager.updateJobId(graph3); + + StreamGraph graph2 = createStreamGraph(jobName2); + manager.updateJobId(graph2); + + assertThat(graph1.getJobID()).isEqualTo(baseJobId); + assertThat(graph2.getJobID()).isEqualTo(jobId2); + assertThat(graph3.getJobID()).isEqualTo(jobId3); + } + + @Test + void testUpdateJobIdWithRecoveredDifferentJobNames() { + String jobName = "job1"; + String jobName2 = "job2"; + String jobName3 = "job3"; + JobID baseJobId = new JobID(1L, 2L); + JobID jobId2 = new JobID(1L + jobName2.hashCode(), 2L + jobName2.hashCode()); + JobID jobId3 = new JobID(1L + jobName3.hashCode(), 2L + jobName3.hashCode()); + JobNameBasedJobIdManager manager = + new JobNameBasedJobIdManager( + baseJobId, + List.of( + new JobInfoImpl(jobId2, jobName2), + new JobInfoImpl(jobId3, jobName3), + new JobInfoImpl(baseJobId, jobName))); + + StreamGraph graph3 = createStreamGraph(jobName3); + manager.updateJobId(graph3); + + StreamGraph graph1 = createStreamGraph(jobName); + manager.updateJobId(graph1); + + StreamGraph graph2 = createStreamGraph(jobName2); + manager.updateJobId(graph2); + + assertThat(graph1.getJobID()).isEqualTo(baseJobId); + assertThat(graph2.getJobID()).isEqualTo(jobId2); + assertThat(graph3.getJobID()).isEqualTo(jobId3); + } + + @Test + void testBaseJobIdMissingInRecoveredJobsThrows() { + JobID baseJobId = new JobID(100L, 200L); + JobID otherJobId = new JobID(300L, 400L); + + Collection<JobInfo> recoveredJobs = + Collections.singletonList(new JobInfoImpl(otherJobId, "some-job")); + + assertThatThrownBy(() -> new JobNameBasedJobIdManager(baseJobId, recoveredJobs)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "Base job ID " + baseJobId + " is not found in the recovered jobs."); + } + + private StreamGraph createStreamGraph(String jobName) { + StreamGraph graph = + new StreamGraph( + new Configuration(), + new ExecutionConfig(), + new CheckpointConfig(), + SavepointRestoreSettings.none()); + 
graph.setJobName(jobName); + return graph; + } +} diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHandler.java index ec8389f4e71..41b0defc530 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHandler.java @@ -42,7 +42,6 @@ import javax.annotation.Nonnull; import java.nio.file.Path; import java.time.Duration; -import java.util.Collections; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -102,14 +101,7 @@ public class JarRunApplicationHandler ApplicationID applicationId = context.getApplicationId().orElse(ApplicationID.generate()); PackagedProgramApplication application = new PackagedProgramApplication( - applicationId, - program, - Collections.emptyList(), - effectiveConfiguration, - false, - true, - false, - false); + applicationId, program, effectiveConfiguration, false, true, false, false); return gateway.submitApplication(application, timeout) .handle( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java index b385b28da39..3db82f5d5c3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java @@ -56,6 +56,8 @@ public class JobResult implements Serializable { private final JobID jobId; + private final String jobName; + /** Stores the job status, null if unknown. 
*/ @Nullable private final JobStatus jobStatus; @@ -68,6 +70,7 @@ public class JobResult implements Serializable { private JobResult( final JobID jobId, + final String jobName, @Nullable final JobStatus jobStatus, final Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorResults, final long netRuntime, @@ -79,6 +82,7 @@ public class JobResult implements Serializable { "jobStatus must be globally terminal or unknow(null)"); this.jobId = requireNonNull(jobId); + this.jobName = requireNonNull(jobName); this.jobStatus = jobStatus; this.accumulatorResults = requireNonNull(accumulatorResults); this.netRuntime = netRuntime; @@ -95,6 +99,10 @@ public class JobResult implements Serializable { return jobId; } + public String getJobName() { + return jobName; + } + public Optional<JobStatus> getJobStatus() { return Optional.ofNullable(jobStatus); } @@ -165,6 +173,8 @@ public class JobResult implements Serializable { private JobID jobId; + private String jobName = "unknown"; + private JobStatus jobStatus; private Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorResults; @@ -178,6 +188,11 @@ public class JobResult implements Serializable { return this; } + public Builder jobName(final String jobName) { + this.jobName = jobName; + return this; + } + public Builder jobStatus(final JobStatus jobStatus) { this.jobStatus = jobStatus; return this; @@ -202,6 +217,7 @@ public class JobResult implements Serializable { public JobResult build() { return new JobResult( jobId, + jobName, jobStatus, accumulatorResults == null ? Collections.emptyMap() : accumulatorResults, netRuntime, @@ -233,6 +249,7 @@ public class JobResult implements Serializable { final JobResult.Builder builder = new JobResult.Builder(); builder.jobId(jobId); + builder.jobName(accessExecutionGraph.getJobName()); builder.jobStatus(jobStatus.isGloballyTerminalState() ? 
jobStatus : null); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java index 3ebd2aa6590..9a577235d7d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java @@ -71,6 +71,7 @@ public class JobResultDeserializer extends StdDeserializer<JobResult> { public JobResult deserialize(final JsonParser p, final DeserializationContext ctxt) throws IOException { JobID jobId = null; + String jobName = "unknown"; JobStatus jobStatus = null; long netRuntime = -1; SerializedThrowable serializedThrowable = null; @@ -89,6 +90,10 @@ public class JobResultDeserializer extends StdDeserializer<JobResult> { assertNextToken(p, JsonToken.VALUE_STRING); jobId = jobIdDeserializer.deserialize(p, ctxt); break; + case JobResultSerializer.FIELD_NAME_JOB_NAME: + assertNextToken(p, JsonToken.VALUE_STRING); + jobName = p.getValueAsString(); + break; case JobResultSerializer.FIELD_NAME_APPLICATION_STATUS: assertNextToken(p, JsonToken.VALUE_STRING); try { @@ -119,6 +124,7 @@ public class JobResultDeserializer extends StdDeserializer<JobResult> { try { return new JobResult.Builder() .jobId(jobId) + .jobName(jobName) .jobStatus(jobStatus) .netRuntime(netRuntime) .accumulatorResults(accumulatorResults) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java index 562e340c249..181394a4eb8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java @@ -45,6 +45,8 @@ public class 
JobResultSerializer extends StdSerializer<JobResult> { static final String FIELD_NAME_JOB_ID = "id"; + static final String FIELD_NAME_JOB_NAME = "name"; + static final String FIELD_NAME_APPLICATION_STATUS = "application-status"; static final String FIELD_NAME_NET_RUNTIME = "net-runtime"; @@ -79,6 +81,9 @@ public class JobResultSerializer extends StdSerializer<JobResult> { gen.writeFieldName(FIELD_NAME_JOB_ID); jobIdSerializer.serialize(result.getJobId(), gen, provider); + gen.writeFieldName(FIELD_NAME_JOB_NAME); + gen.writeString(result.getJobName()); + // use application status to maintain backward compatibility gen.writeFieldName(FIELD_NAME_APPLICATION_STATUS); gen.writeString(ApplicationStatus.fromJobStatus(result.getJobStatus().orElse(null)).name()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobResultStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobResultStore.java index 63606bab2bd..cfe4712efba 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobResultStore.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobResultStore.java @@ -34,6 +34,8 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.Function; +import static org.apache.flink.streaming.api.graph.StreamGraphGenerator.DEFAULT_STREAMING_JOB_NAME; + /** * {@code TestingJobResultStore} is a {@link JobResultStore} implementation that can be used in * tests. 
@@ -47,7 +49,12 @@ public class TestingJobResultStore implements JobResultStore { } public static JobResult createJobResult(JobID jobId, @Nullable JobStatus jobStatus) { - return new JobResult.Builder().jobId(jobId).jobStatus(jobStatus).netRuntime(1).build(); + return new JobResult.Builder() + .jobId(jobId) + .jobName(DEFAULT_STREAMING_JOB_NAME) + .jobStatus(jobStatus) + .netRuntime(1) + .build(); } private final Function<JobResultEntry, CompletableFuture<Void>> createDirtyResultConsumer; diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptExecutorITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptExecutorITCase.java index 4fe639c6c0d..066831127e0 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptExecutorITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptExecutorITCase.java @@ -85,7 +85,8 @@ public class ScriptExecutorITCase extends AbstractSqlGatewayStatementITCaseBase ScriptExecutor.class.getClassLoader(), false, false, - null); + null, + Collections.emptyList()); executor = new TestScriptExecutor(
