This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a70ca9c029b [FLINK-37711] Remove unused getJobGraph from PipelineExecutorUtils
a70ca9c029b is described below
commit a70ca9c029bae9f1f4f11b989d257d8d1dc9408d
Author: Jiaan Geng <[email protected]>
AuthorDate: Thu Sep 25 04:46:22 2025 +0800
[FLINK-37711] Remove unused getJobGraph from PipelineExecutorUtils
---
.../executors/PipelineExecutorUtils.java | 47 -------
.../DefaultPackagedProgramRetrieverITCase.java | 148 ++++++++++-----------
2 files changed, 72 insertions(+), 123 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
index ae8706a593d..bdfde6f7cc0 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
@@ -21,7 +21,6 @@ package org.apache.flink.client.deployment.executors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.dag.Pipeline;
-import org.apache.flink.client.FlinkPipelineTranslationUtil;
import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.client.cli.ExecutionConfigAccessor;
import org.apache.flink.configuration.Configuration;
@@ -29,7 +28,6 @@ import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.core.execution.JobStatusChangedListener;
-import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.graph.ExecutionPlan;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.runtime.execution.DefaultJobCreatedEvent;
@@ -39,7 +37,6 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
-import java.net.MalformedURLException;
import java.util.List;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -49,50 +46,6 @@ import static org.apache.flink.util.Preconditions.checkState;
public class PipelineExecutorUtils {
private static final Logger LOG =
LoggerFactory.getLogger(PipelineExecutorUtils.class);
- /**
- * Creates the {@link JobGraph} corresponding to the provided {@link Pipeline}.
- *
- * @param pipeline the pipeline whose job graph we are computing.
- * @param configuration the configuration with the necessary information
such as jars and
- * classpaths to be included, the parallelism of the job and potential
savepoint settings
- * used to bootstrap its state.
- * @param userClassloader the classloader which can load user classes.
- * @return the corresponding {@link JobGraph}.
- */
- public static JobGraph getJobGraph(
- @Nonnull final Pipeline pipeline,
- @Nonnull final Configuration configuration,
- @Nonnull ClassLoader userClassloader)
- throws MalformedURLException {
- checkNotNull(pipeline);
- checkNotNull(configuration);
-
- final ExecutionConfigAccessor executionConfigAccessor =
- ExecutionConfigAccessor.fromConfiguration(configuration);
- final JobGraph jobGraph =
- FlinkPipelineTranslationUtil.getJobGraph(
- userClassloader,
- pipeline,
- configuration,
- executionConfigAccessor.getParallelism());
-
- configuration
- .getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)
- .ifPresent(strJobID -> jobGraph.setJobID(JobID.fromHexString(strJobID)));
-
- if (configuration.get(DeploymentOptions.ATTACHED)
- && configuration.get(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
- jobGraph.setInitialClientHeartbeatTimeout(
- configuration.get(ClientOptions.CLIENT_HEARTBEAT_TIMEOUT).toMillis());
- }
-
- jobGraph.addJars(executionConfigAccessor.getJars());
- jobGraph.setClasspaths(executionConfigAccessor.getClasspaths());
- jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
-
- return jobGraph;
- }
-
/**
* Notify the {@link DefaultJobCreatedEvent} to job status changed
listeners.
*
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java b/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java
index 60786e2e41f..d8fc7898a3b 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java
@@ -28,8 +28,8 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
-import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.util.ChildFirstClassLoader;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkException;
@@ -191,7 +191,7 @@ class DefaultPackagedProgramRetrieverITCase {
}
@Test
- void testJobGraphRetrieval() throws IOException, FlinkException,
ProgramInvocationException {
+ void testJobGraphRetrieval() throws Exception {
final int parallelism = 42;
final JobID jobId = new JobID();
@@ -207,22 +207,21 @@ class DefaultPackagedProgramRetrieverITCase {
ClasspathProviderExtension.parametersForTestJob(expectedSuffix),
new Configuration());
- final JobGraph jobGraph = retrieveJobGraph(retriever, configuration);
+ final StreamGraph streamGraph = retrieveStreamGraph(retriever,
configuration);
- assertThat(jobGraph.getName())
+ assertThat(streamGraph.getName())
.isEqualTo(
testJobEntryClassClasspathProvider.getJobClassName()
+ "-"
+ expectedSuffix);
- assertThat(jobGraph.getSavepointRestoreSettings())
+ assertThat(streamGraph.getSavepointRestoreSettings())
.isEqualTo(SavepointRestoreSettings.none());
- assertThat(jobGraph.getMaximumParallelism()).isEqualTo(parallelism);
- assertThat(jobGraph.getJobID()).isEqualTo(jobId);
+ assertThat(streamGraph.getMaximumParallelism()).isEqualTo(parallelism);
+ assertThat(streamGraph.getJobID()).isEqualTo(jobId);
}
@Test
- void testJobGraphRetrievalFromJar()
- throws IOException, FlinkException, ProgramInvocationException {
+ void testJobGraphRetrievalFromJar() throws Exception {
final String expectedSuffix = "suffix";
final PackagedProgramRetriever retrieverUnderTest =
DefaultPackagedProgramRetriever.create(
@@ -232,9 +231,10 @@ class DefaultPackagedProgramRetrieverITCase {
ClasspathProviderExtension.parametersForTestJob(expectedSuffix),
new Configuration());
- final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new
Configuration());
+ final StreamGraph streamGraph =
+ retrieveStreamGraph(retrieverUnderTest, new Configuration());
- assertThat(jobGraph.getName())
+ assertThat(streamGraph.getName())
.isEqualTo(
testJobEntryClassClasspathProvider.getJobClassName()
+ "-"
@@ -242,8 +242,7 @@ class DefaultPackagedProgramRetrieverITCase {
}
@Test
- void testParameterConsiderationForMultipleJobsOnSystemClasspath()
- throws IOException, FlinkException, ProgramInvocationException {
+ void testParameterConsiderationForMultipleJobsOnSystemClasspath() throws
Exception {
final String expectedSuffix = "suffix";
final PackagedProgramRetriever retrieverUnderTest =
// Both a class name is specified and a JAR "is" on the class
path
@@ -254,15 +253,15 @@ class DefaultPackagedProgramRetrieverITCase {
ClasspathProviderExtension.parametersForTestJob(expectedSuffix),
new Configuration());
- final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new
Configuration());
+ final StreamGraph streamGraph =
+ retrieveStreamGraph(retrieverUnderTest, new Configuration());
- assertThat(jobGraph.getName())
+ assertThat(streamGraph.getName())
.isEqualTo(testJobEntryClassClasspathProvider.getJobClassName() + "-suffix");
}
@Test
- void testSavepointRestoreSettings()
- throws FlinkException, IOException, ProgramInvocationException {
+ void testSavepointRestoreSettings() throws Exception {
final Configuration configuration = new Configuration();
final SavepointRestoreSettings savepointRestoreSettings =
SavepointRestoreSettings.forPath("foobar", true);
@@ -279,10 +278,10 @@ class DefaultPackagedProgramRetrieverITCase {
ClasspathProviderExtension.parametersForTestJob(expectedSuffix),
new Configuration());
- final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest,
configuration);
+ final StreamGraph streamGraph =
retrieveStreamGraph(retrieverUnderTest, configuration);
-
assertThat(jobGraph.getSavepointRestoreSettings()).isEqualTo(savepointRestoreSettings);
- assertThat(jobGraph.getJobID()).isEqualTo(jobId);
+
assertThat(streamGraph.getSavepointRestoreSettings()).isEqualTo(savepointRestoreSettings);
+ assertThat(streamGraph.getJobID()).isEqualTo(jobId);
}
@Test
@@ -394,17 +393,16 @@ class DefaultPackagedProgramRetrieverITCase {
}
@Test
- void testRetrieveCorrectUserClasspathsWithoutSpecifiedEntryClass()
- throws IOException, FlinkException, ProgramInvocationException {
+ void testRetrieveCorrectUserClasspathsWithoutSpecifiedEntryClass() throws
Exception {
final PackagedProgramRetriever retrieverUnderTest =
DefaultPackagedProgramRetriever.create(
singleEntryClassClasspathProvider.getDirectory(),
null,
ClasspathProviderExtension.parametersForTestJob("suffix"),
new Configuration());
- final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new
Configuration());
- final List<String> actualClasspath =
-
jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList());
+ final StreamGraph streamGraph =
+ retrieveStreamGraph(retrieverUnderTest, new Configuration());
+ final List<String> actualClasspath = getClasspaths(streamGraph);
final List<String> expectedClasspath =
extractRelativizedURLsForJarsFromDirectory(
@@ -414,17 +412,16 @@ class DefaultPackagedProgramRetrieverITCase {
}
@Test
- void testRetrieveCorrectUserClasspathsWithSpecifiedEntryClass()
- throws IOException, FlinkException, ProgramInvocationException {
+ void testRetrieveCorrectUserClasspathsWithSpecifiedEntryClass() throws
Exception {
final PackagedProgramRetriever retrieverUnderTest =
DefaultPackagedProgramRetriever.create(
singleEntryClassClasspathProvider.getDirectory(),
singleEntryClassClasspathProvider.getJobClassName(),
ClasspathProviderExtension.parametersForTestJob("suffix"),
new Configuration());
- final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new
Configuration());
- final List<String> actualClasspath =
-
jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList());
+ final StreamGraph streamGraph =
+ retrieveStreamGraph(retrieverUnderTest, new Configuration());
+ final List<String> actualClasspath = getClasspaths(streamGraph);
final List<String> expectedClasspath =
extractRelativizedURLsForJarsFromDirectory(
@@ -450,13 +447,13 @@ class DefaultPackagedProgramRetrieverITCase {
singleEntryClassClasspathProvider.getJobClassName(),
ClasspathProviderExtension.parametersForTestJob("suffix"),
configuration);
- final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new
Configuration());
- assertThat(jobGraph.getClasspaths()).isEqualTo(expectedMergedURLs);
+ final StreamGraph streamGraph =
+ retrieveStreamGraph(retrieverUnderTest, new Configuration());
+ assertThat(streamGraph.getClasspaths()).isEqualTo(expectedMergedURLs);
}
@Test
- void testRetrieveFromJarFileWithoutUserLib()
- throws IOException, FlinkException, ProgramInvocationException {
+ void testRetrieveFromJarFileWithoutUserLib() throws Exception {
final PackagedProgramRetriever retrieverUnderTest =
DefaultPackagedProgramRetriever.create(
null,
@@ -464,18 +461,18 @@ class DefaultPackagedProgramRetrieverITCase {
null,
ClasspathProviderExtension.parametersForTestJob("suffix"),
new Configuration());
- final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new
Configuration());
+ final StreamGraph streamGraph =
+ retrieveStreamGraph(retrieverUnderTest, new Configuration());
- assertThat(jobGraph.getUserJars())
+ assertThat(streamGraph.getUserJars())
.contains(
new org.apache.flink.core.fs.Path(
testJobEntryClassClasspathProvider.getJobJar().toURI()));
- assertThat(jobGraph.getClasspaths()).isEmpty();
+ assertThat(streamGraph.getClasspaths()).isEmpty();
}
@Test
- void testRetrieveFromJarFileWithUserLib()
- throws IOException, FlinkException, ProgramInvocationException {
+ void testRetrieveFromJarFileWithUserLib() throws Exception {
final PackagedProgramRetriever retrieverUnderTest =
DefaultPackagedProgramRetriever.create(
singleEntryClassClasspathProvider.getDirectory(),
@@ -484,14 +481,14 @@ class DefaultPackagedProgramRetrieverITCase {
null,
ClasspathProviderExtension.parametersForTestJob("suffix"),
new Configuration());
- final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new
Configuration());
+ final StreamGraph streamGraph =
+ retrieveStreamGraph(retrieverUnderTest, new Configuration());
- assertThat(jobGraph.getUserJars())
+ assertThat(streamGraph.getUserJars())
.contains(
new org.apache.flink.core.fs.Path(
testJobEntryClassClasspathProvider.getJobJar().toURI()));
- final List<String> actualClasspath =
-
jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList());
+ final List<String> actualClasspath = getClasspaths(streamGraph);
final List<String> expectedClasspath =
extractRelativizedURLsForJarsFromDirectory(
singleEntryClassClasspathProvider.getDirectory());
@@ -500,8 +497,7 @@ class DefaultPackagedProgramRetrieverITCase {
}
@Test
- void testRetrieveFromJarFileWithNonRootUserLib()
- throws IOException, FlinkException, ProgramInvocationException {
+ void testRetrieveFromJarFileWithNonRootUserLib() throws Exception {
final PackagedProgramRetriever retrieverUnderTest =
DefaultPackagedProgramRetriever.create(
singleEntryClassClasspathProvider.getDirectory().getParentFile(),
@@ -511,14 +507,14 @@ class DefaultPackagedProgramRetrieverITCase {
null,
ClasspathProviderExtension.parametersForTestJob("suffix"),
new Configuration());
- final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new
Configuration());
+ final StreamGraph streamGraph =
+ retrieveStreamGraph(retrieverUnderTest, new Configuration());
- assertThat(jobGraph.getUserJars())
+ assertThat(streamGraph.getUserJars())
.contains(
new org.apache.flink.core.fs.Path(
testJobEntryClassClasspathProvider.getJobJar().toURI()));
- final List<String> actualClasspath =
-
jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList());
+ final List<String> actualClasspath = getClasspaths(streamGraph);
final List<String> expectedClasspath =
extractRelativizedURLsForJarsFromDirectory(
singleEntryClassClasspathProvider.getDirectory());
@@ -527,8 +523,7 @@ class DefaultPackagedProgramRetrieverITCase {
}
@Test
- void testRetrieveFromJarFileWithSymlinkUserLib()
- throws IOException, FlinkException, ProgramInvocationException {
+ void testRetrieveFromJarFileWithSymlinkUserLib() throws Exception {
final File actualUsrLib = new
File(symlinkClasspathProvider.getDirectory(), "usrlib");
final PackagedProgramRetriever retrieverUnderTest =
DefaultPackagedProgramRetriever.create(
@@ -539,14 +534,14 @@ class DefaultPackagedProgramRetrieverITCase {
null,
ClasspathProviderExtension.parametersForTestJob("suffix"),
new Configuration());
- final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new
Configuration());
+ final StreamGraph streamGraph =
+ retrieveStreamGraph(retrieverUnderTest, new Configuration());
- assertThat(jobGraph.getUserJars())
+ assertThat(streamGraph.getUserJars())
.contains(
new org.apache.flink.core.fs.Path(
testJobEntryClassClasspathProvider.getJobJar().toURI()));
- final List<String> actualClasspath =
-
jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList());
+ final List<String> actualClasspath = getClasspaths(streamGraph);
final List<String> expectedClasspath =
extractRelativizedURLsForJarsFromDirectory(actualUsrLib);
@@ -555,8 +550,7 @@ class DefaultPackagedProgramRetrieverITCase {
}
@Test
- void testRetrieveFromJarFileWithArtifacts()
- throws IOException, FlinkException, ProgramInvocationException {
+ void testRetrieveFromJarFileWithArtifacts() throws Exception {
final PackagedProgramRetriever retrieverUnderTest =
DefaultPackagedProgramRetriever.create(
null,
@@ -567,14 +561,14 @@ class DefaultPackagedProgramRetrieverITCase {
null,
ClasspathProviderExtension.parametersForTestJob("suffix"),
new Configuration());
- final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new
Configuration());
+ final StreamGraph streamGraph =
+ retrieveStreamGraph(retrieverUnderTest, new Configuration());
- assertThat(jobGraph.getUserJars())
+ assertThat(streamGraph.getUserJars())
.contains(
new org.apache.flink.core.fs.Path(
testJobEntryClassClasspathProvider.getJobJar().toURI()));
- final List<String> actualClasspath =
-
jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList());
+ final List<String> actualClasspath = getClasspaths(streamGraph);
final List<String> expectedClasspath =
extractRelativizedURLsForJarsFromDirectory(
additionalArtifactClasspathProvider.getDirectory());
@@ -583,8 +577,7 @@ class DefaultPackagedProgramRetrieverITCase {
}
@Test
- void testRetrieveFromJarFileWithUserAndArtifactLib()
- throws IOException, FlinkException, ProgramInvocationException {
+ void testRetrieveFromJarFileWithUserAndArtifactLib() throws Exception {
final PackagedProgramRetriever retrieverUnderTest =
DefaultPackagedProgramRetriever.create(
singleEntryClassClasspathProvider.getDirectory(),
@@ -595,14 +588,14 @@ class DefaultPackagedProgramRetrieverITCase {
null,
ClasspathProviderExtension.parametersForTestJob("suffix"),
new Configuration());
- final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new
Configuration());
+ final StreamGraph streamGraph =
+ retrieveStreamGraph(retrieverUnderTest, new Configuration());
- assertThat(jobGraph.getUserJars())
+ assertThat(streamGraph.getUserJars())
.contains(
new org.apache.flink.core.fs.Path(
testJobEntryClassClasspathProvider.getJobJar().toURI()));
- final List<String> actualClasspath =
-
jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList());
+ final List<String> actualClasspath = getClasspaths(streamGraph);
final List<String> expectedClasspath = new ArrayList<>();
expectedClasspath.addAll(
extractRelativizedURLsForJarsFromDirectory(
@@ -615,8 +608,7 @@ class DefaultPackagedProgramRetrieverITCase {
}
@Test
- void testRetrieveFromArtifactLibWithoutJarFile()
- throws IOException, FlinkException, ProgramInvocationException {
+ void testRetrieveFromArtifactLibWithoutJarFile() throws Exception {
final PackagedProgramRetriever retrieverUnderTest =
DefaultPackagedProgramRetriever.create(
null,
@@ -626,10 +618,10 @@ class DefaultPackagedProgramRetrieverITCase {
multipleEntryClassesClasspathProvider.getJobClassName(),
ClasspathProviderExtension.parametersForTestJob("suffix"),
new Configuration());
- final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new
Configuration());
+ final StreamGraph streamGraph =
+ retrieveStreamGraph(retrieverUnderTest, new Configuration());
- final List<String> actualClasspath =
-
jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList());
+ final List<String> actualClasspath = getClasspaths(streamGraph);
final List<String> expectedClasspath =
extractRelativizedURLsForJarsFromDirectory(
multipleEntryClassesClasspathProvider.getDirectory());
@@ -682,9 +674,9 @@ class DefaultPackagedProgramRetrieverITCase {
.isInstanceOf(FlinkUserCodeClassLoaders.ParentFirstClassLoader.class);
}
- private JobGraph retrieveJobGraph(
+ private StreamGraph retrieveStreamGraph(
PackagedProgramRetriever retrieverUnderTest, Configuration
configuration)
- throws FlinkException, ProgramInvocationException,
MalformedURLException {
+ throws Exception {
final PackagedProgram packagedProgram =
retrieverUnderTest.getPackagedProgram();
final int defaultParallelism =
configuration.get(CoreOptions.DEFAULT_PARALLELISM);
@@ -702,8 +694,8 @@ class DefaultPackagedProgramRetrieverITCase {
final Pipeline pipeline =
PackagedProgramUtils.getPipelineFromProgram(
packagedProgram, configuration, defaultParallelism,
false);
- return PipelineExecutorUtils.getJobGraph(
- pipeline, configuration,
packagedProgram.getUserCodeClassLoader());
+
+ return PipelineExecutorUtils.getStreamGraph(pipeline, configuration);
}
private static List<String>
extractRelativizedURLsForJarsFromDirectory(File directory)
@@ -732,4 +724,8 @@ class DefaultPackagedProgramRetrieverITCase {
return relativizedURLs;
}
+
+ private List<String> getClasspaths(StreamGraph streamGraph) {
+ return streamGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList());
+ }
}