zouchangzhen opened a new issue, #4297:
URL: https://github.com/apache/streampark/issues/4297

   ### Search before asking
   
   - [x] I had searched in the 
[issues](https://github.com/apache/streampark/issues?q=is%3Aissue+label%3A%22bug%22)
 and found no similar issues.
   
   
   ### Java Version
   
   java 8.0
   
   ### Scala Version
   
   2.11.x
   
   ### StreamPark Version
   
   2.0.0
   
   ### Flink Version
   
   flink 1.16.1
   
   ### Deploy mode
   
   yarn-application
   
   ### What happened
   
   批量提交flink任务出现内存回收不掉的情况,我这个版本大概是2021年的,但是我看了最新版本也是差不多的
   
   override def doSubmit(submitRequest: SubmitRequest): SubmitResponse = {
   
       //获取有效的命令行参数,来自:页面多个参数配置,和动态参数dynamicOption
       //-t yarn-application -Dtaskmanager.memory.process.size=1024mb 
-Dparallelism.default=1 -Dtaskmanager.numberOfTaskSlots=1 
-Djobmanager.memory.process.size=1024mb -Dclassloader.resolve-order=parent-first
       val commandLine = getEffectiveCommandLine(
         submitRequest,
         "-t" -> YarnDeploymentTarget.APPLICATION.getName
       )
       val activeCommandLine = 
validateAndGetActiveCommandLine(submitRequest.customCommandLines, commandLine)
   
       val uri = PackagedProgramUtils.resolveURI(submitRequest.flinkUserJar)
   
       //根据用户配置的命令行参数和flink默认参数来组装提交yarn的flink参数
       val flinkConfig = getEffectiveConfiguration(submitRequest, 
activeCommandLine, commandLine, Collections.singletonList(uri.toString))
   
       SecurityUtils.install(new SecurityConfiguration(flinkConfig))
       SecurityUtils.getInstalledContext.runSecured(new 
Callable[SubmitResponse] {
         override def call(): SubmitResponse = {
           val clusterClientServiceLoader = new 
DefaultClusterClientServiceLoader
           //获取yarn集群客户端工厂
           val clientFactory = 
clusterClientServiceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
           var clusterDescriptor: ClusterDescriptor[ApplicationId] = null
           try {
             clusterDescriptor = 
clientFactory.createClusterDescriptor(flinkConfig)
             val clusterSpecification = 
clientFactory.getClusterSpecification(flinkConfig)
             logInfo(
               s"""
                  
|------------------------<<specification>>-------------------------
                  |$clusterSpecification
                  
|------------------------------------------------------------------
                  |""".stripMargin)
   
             //设置用户jar的参数和主类
             val applicationConfiguration = 
ApplicationConfiguration.fromConfiguration(flinkConfig)
             var applicationId: ApplicationId = null
             //提交任务到集群
             var clusterClient: ClusterClient[ApplicationId] = null
             try {
               clusterClient = 
clusterDescriptor.deployApplicationCluster(clusterSpecification, 
applicationConfiguration).getClusterClient
               //获取applicationId
               applicationId = clusterClient.getClusterId
             }finally {
               if (clusterClient != null) {
                 Try(clusterClient.close()).recover {
                   case e => logInfo("Failed to close cluster client", e)
                 }
               }
             }
   
             logInfo(
               s"""
                  
||-------------------------<<applicationId>>------------------------|
                  ||Flink Job Started: applicationId: $applicationId|
                  
||__________________________________________________________________|
                  |""".stripMargin)
   
             //返回appId和提交yarn的配置参数
             service.SubmitResponse(applicationId.toString, flinkConfig, 
null,null)
           } finally {
             if (clusterDescriptor != null) {
               Try(clusterDescriptor.close()).recover {
                 case e => logInfo("Failed to close clusterDescriptor", e)
               }
             }
           }
         }
       })
     }
   
   <img width="1510" height="956" alt="Image" 
src="https://github.com/user-attachments/assets/7472a007-dc8d-4f08-b4a9-d9003a52d021";
 />
   <img width="1872" height="987" alt="Image" 
src="https://github.com/user-attachments/assets/89673bda-f8c3-4ac4-8ab8-67338e02dca8";
 />
   <img width="1914" height="1012" alt="Image" 
src="https://github.com/user-attachments/assets/1a4613a4-2065-433f-828d-2398fff36f22";
 />
   <img width="1862" height="875" alt="Image" 
src="https://github.com/user-attachments/assets/459e57d6-feee-4d1b-af5a-bcd518f6efb7";
 />
   
   ### Error Exception
   
   ```log
   
   ```
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!(您是否要贡献这个PR?)
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to