[
https://issues.apache.org/jira/browse/FLINK-18803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17171388#comment-17171388
]
Libin Qin commented on FLINK-18803:
-----------------------------------
[~azagrebin] I tried Flink 1.10 and it works well, thanks!
> JobGraph cannot be GC when submit via RemoteStreamEnvironment in attach mode
> -----------------------------------------------------------------------------
>
> Key: FLINK-18803
> URL: https://issues.apache.org/jira/browse/FLINK-18803
> Project: Flink
> Issue Type: Bug
> Components: Client / Job Submission
> Affects Versions: 1.7.2
> Reporter: Libin Qin
> Priority: Minor
> Attachments: image-2020-08-03-18-01-56-062.png,
> image-2020-08-03-18-02-22-519.png, image-2020-08-03-18-03-29-748.png,
> image-2020-08-03-18-08-50-811.png, image-2020-08-03-18-10-08-353.png,
> image-2020-08-03-18-12-29-467.png
>
>
> When a job is submitted using RemoteStreamEnvironment in attached mode, the
> client submission thread blocks on "jobResultFuture.get()" in the "submitJob"
> method of RestClusterClient.java while still holding the local variable
> jobGraph. If the job is complex, with many vertices and edges, or the client
> submits a large number of jobs, the retained JobGraph objects become large
> and the client may run out of memory (OOM). I think there is no need for the
> client to keep holding it.
> The largest objects in the client heap are shown below. The number of tasks
> in this job is 408.
> !image-2020-08-03-18-03-29-748.png!
>
> !image-2020-08-03-18-08-50-811.png!
>
>
> Perhaps we can null it out after the submission succeeds:
>
> {code:java}
> public JobSubmissionResult run(FlinkPlan compiledPlan,
>         List<URL> libraries, List<URL> classpaths, ClassLoader classLoader,
>         SavepointRestoreSettings savepointSettings)
>         throws ProgramInvocationException {
>     return submitJob(() -> getJobGraph(flinkConfig, compiledPlan, libraries,
>             classpaths, savepointSettings), classLoader);
> }
>
> public JobSubmissionResult submitJob(Supplier<JobGraph> jobGraphSupplier,
>         ClassLoader classLoader) throws ProgramInvocationException {
>     JobGraph jobGraph = jobGraphSupplier.get();
>     JobID jobID = jobGraph.getJobID();
>     log.info("Submitting job {} (detached: {}).", jobID, isDetached());
>     final CompletableFuture<JobSubmissionResult> jobSubmissionFuture =
>             submitJob(jobGraph);
>     JobSubmissionResult result;
>     try {
>         result = jobSubmissionFuture.get();
>         // help GC
>         jobGraph = null;
>     } catch (Exception e) {
>         throw new ProgramInvocationException("Could not submit job",
>                 jobID, ExceptionUtils.stripExecutionException(e));
>     }
>     if (isDetached()) {
>         return result;
>     } else {
>         final CompletableFuture<JobResult> jobResultFuture =
>                 requestJobResult(jobID);
>         final JobResult jobResult;
>         try {
>             jobResult = jobResultFuture.get();
>         } catch (Exception e) {
>             throw new ProgramInvocationException(
>                     "Could not retrieve the execution result.",
>                     jobID, ExceptionUtils.stripExecutionException(e));
>         }
>         try {
>             this.lastJobExecutionResult =
>                     jobResult.toJobExecutionResult(classLoader);
>             return lastJobExecutionResult;
>         } catch (JobExecutionException e) {
>             throw new ProgramInvocationException("Job failed.", jobID, e);
>         } catch (IOException | ClassNotFoundException e) {
>             throw new ProgramInvocationException("Job failed.", jobID, e);
>         }
>     }
> }
> {code}
>
> We can see that the job graph has been garbage-collected:
> !image-2020-08-03-18-10-08-353.png!
>
>
>
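As a possible alternative to nulling out the local variable, the graph could be confined to a helper method so that the frame that blocks on the job result never holds a reference to it. The sketch below illustrates the idea only: FakeJobGraph, buildAndSubmit and runAttached are hypothetical stand-ins, not Flink classes or methods, and the submission is simulated with an already-completed future. Whether the real JobGraph becomes collectable would also depend on what else still references it (for example the submission future or the supplier).

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class ScopedSubmission {

    /** Hypothetical stand-in for a large job graph; this is NOT Flink's JobGraph. */
    static final class FakeJobGraph {
        final String jobId;
        final byte[] payload; // simulates the memory held by vertices and edges

        FakeJobGraph(String jobId, int payloadBytes) {
            this.jobId = jobId;
            this.payload = new byte[payloadBytes];
        }
    }

    /** Simulated submission (assumption): completes immediately with the job ID. */
    static CompletableFuture<String> submit(FakeJobGraph graph) {
        return CompletableFuture.completedFuture(graph.jobId);
    }

    /**
     * Builds and submits the graph. The graph is referenced only by a local
     * variable of this method, so that reference disappears when it returns.
     */
    static String buildAndSubmit(Supplier<FakeJobGraph> graphSupplier) {
        FakeJobGraph graph = graphSupplier.get();
        return submit(graph).join();
    }

    /** Attached-mode flow: submit, then block on the result using only the job ID. */
    static String runAttached(Supplier<FakeJobGraph> graphSupplier,
                              CompletableFuture<String> jobResultFuture) {
        String jobId = buildAndSubmit(graphSupplier);
        // From here on, this frame holds the job ID but no reference to the graph.
        return jobId + " -> " + jobResultFuture.join();
    }

    public static void main(String[] args) {
        String outcome = runAttached(
                () -> new FakeJobGraph("job-1", 1 << 20),
                CompletableFuture.completedFuture("FINISHED"));
        System.out.println(outcome);
    }
}
```

Compared with assigning null, this keeps the variable effectively final and makes the lifetime of the graph visible in the method structure, at the cost of one extra method.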
--
This message was sent by Atlassian Jira
(v8.3.4#803005)