[ 
https://issues.apache.org/jira/browse/FLINK-18803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17171368#comment-17171368
 ] 

Andrey Zagrebin commented on FLINK-18803:
-----------------------------------------

[~lbqin] Could you try the latest Flink versions (1.10/1.11)?
1.7 is quite old; I do not think the community will release any fixes for 1.7.

If this is not a problem with the latest Flink versions (1.10/1.11), I suggest 
closing this issue.

> JobGraph cannot be GC when submit via RemoteStreamEnvironment in attach mode 
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-18803
>                 URL: https://issues.apache.org/jira/browse/FLINK-18803
>             Project: Flink
>          Issue Type: Bug
>          Components: Client / Job Submission
>    Affects Versions: 1.7.2
>            Reporter: Libin Qin
>            Priority: Minor
>         Attachments: image-2020-08-03-18-01-56-062.png, 
> image-2020-08-03-18-02-22-519.png, image-2020-08-03-18-03-29-748.png, 
> image-2020-08-03-18-08-50-811.png, image-2020-08-03-18-10-08-353.png, 
> image-2020-08-03-18-12-29-467.png
>
>
> When a job is submitted using RemoteStreamEnvironment in attached mode, the 
> client submission thread blocks on "jobResultFuture.get()" in the "submitJob" 
> method of RestClusterClient.java while still holding the local variable 
> jobGraph. If the job is complex, with many vertices and edges, or the client 
> submits a large number of jobs, the retained JobGraph objects become large 
> and the client may run out of memory (OOM). I think there is no need for the 
> client to keep holding the JobGraph after submission.
> The largest objects in the client heap are shown below. The number of tasks 
> in this job is 408.
> !image-2020-08-03-18-03-29-748.png!
>  
> !image-2020-08-03-18-08-50-811.png!
>  
>  
> Perhaps we can null it out after the submission succeeds:
>  
> {code:java}
> // Code placeholder
> public JobSubmissionResult run(FlinkPlan compiledPlan,
>       List<URL> libraries, List<URL> classpaths, ClassLoader classLoader,
>       SavepointRestoreSettings savepointSettings)
>       throws ProgramInvocationException {
>    return submitJob(
>       () -> getJobGraph(flinkConfig, compiledPlan, libraries, classpaths, savepointSettings),
>       classLoader);
> }
>
> public JobSubmissionResult submitJob(Supplier<JobGraph> jobGraphSupplier,
>       ClassLoader classLoader) throws ProgramInvocationException {
>    JobGraph jobGraph = jobGraphSupplier.get();
>    JobID jobID = jobGraph.getJobID();
>    log.info("Submitting job {} (detached: {}).", jobID, isDetached());
>    final CompletableFuture<JobSubmissionResult> jobSubmissionFuture =
>       submitJob(jobGraph);
>    JobSubmissionResult result;
>    try {
>       result = jobSubmissionFuture.get();
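>       // Help GC: drop the local reference so the JobGraph can be collected
>       // while this thread waits for the job result below.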
>       jobGraph = null;
>    } catch (Exception e) {
>       throw new ProgramInvocationException("Could not submit job",
>          jobID, ExceptionUtils.stripExecutionException(e));
>    }
>    if (isDetached()) {
>       return result;
>    } else {
>       final CompletableFuture<JobResult> jobResultFuture =
>          requestJobResult(jobID);
>       final JobResult jobResult;
>       try {
>          jobResult = jobResultFuture.get();
>       } catch (Exception e) {
>          throw new ProgramInvocationException(
>             "Could not retrieve the execution result.",
>             jobID, ExceptionUtils.stripExecutionException(e));
>       }
>       try {
>          this.lastJobExecutionResult =
>             jobResult.toJobExecutionResult(classLoader);
>          return lastJobExecutionResult;
>       } catch (JobExecutionException e) {
>          throw new ProgramInvocationException("Job failed.", jobID, e);
>       } catch (IOException | ClassNotFoundException e) {
>          throw new ProgramInvocationException("Job failed.", jobID, e);
>       }
>    }
> }
> {code}
>  
> With this change, we can see that the job graph has been garbage collected:
> !image-2020-08-03-18-10-08-353.png!
>  
>  
>  
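
The proposal above nulls out the local variable before the blocking wait. An alternative with the same intent, sketched here under stated assumptions, is to confine the graph to a helper method so that the waiting frame never holds a reference to it. `FakeJobGraph`, `submit`, and `submitAndWait` are hypothetical stand-ins and not Flink APIs, and the futures are pre-completed so that the sketch runs without a cluster.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class ScopedSubmitSketch {

    /** Hypothetical stand-in for a large JobGraph; not a Flink class. */
    static final class FakeJobGraph {
        final String jobId;
        final byte[] payload = new byte[1024 * 1024]; // simulates the graph's size

        FakeJobGraph(String jobId) {
            this.jobId = jobId;
        }
    }

    /**
     * Builds and "submits" the graph. The graph reference lives only in this
     * frame, so the caller never holds it while waiting for the job result.
     */
    static String submit(Supplier<FakeJobGraph> graphSupplier) {
        FakeJobGraph graph = graphSupplier.get();
        // Assumption: a real client would send the graph to the cluster here.
        CompletableFuture<String> ack = CompletableFuture.completedFuture(graph.jobId);
        return ack.join();
    }

    /** Attached-mode wait: this frame only ever sees the job ID. */
    static String submitAndWait(Supplier<FakeJobGraph> graphSupplier,
                                CompletableFuture<String> jobResult) {
        String jobId = submit(graphSupplier);
        // The potentially long blocking wait holds no reference to the graph.
        return jobId + ":" + jobResult.join();
    }

    public static void main(String[] args) {
        String outcome = submitAndWait(
                () -> new FakeJobGraph("job-1"),
                CompletableFuture.completedFuture("FINISHED"));
        System.out.println(outcome);
    }
}
```

Either variant only helps if nothing else, such as a field or a captured lambda, still references the graph during the wait.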



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
