zouchangzhen opened a new issue, #4297: URL: https://github.com/apache/streampark/issues/4297
### Search before asking

- [x] I had searched in the [issues](https://github.com/apache/streampark/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### Java Version

java 8.0

### Scala Version

2.11.x

### StreamPark Version

2.0.0

### Flink Version

flink 1.16.1

### Deploy mode

yarn-application

### What happened

When Flink jobs are submitted in batches, the memory used by the submission is not reclaimed afterwards. The version I am running dates from around 2021, but I checked the latest version and the relevant code is much the same:

```scala
override def doSubmit(submitRequest: SubmitRequest): SubmitResponse = {
  // Get the effective command-line arguments, assembled from the parameters configured
  // on the page and the dynamic options (dynamicOption), e.g.:
  // -t yarn-application -Dtaskmanager.memory.process.size=1024mb -Dparallelism.default=1 -Dtaskmanager.numberOfTaskSlots=1 -Djobmanager.memory.process.size=1024mb -Dclassloader.resolve-order=parent-first
  val commandLine = getEffectiveCommandLine(
    submitRequest,
    "-t" -> YarnDeploymentTarget.APPLICATION.getName
  )
  val activeCommandLine = validateAndGetActiveCommandLine(submitRequest.customCommandLines, commandLine)
  val uri = PackagedProgramUtils.resolveURI(submitRequest.flinkUserJar)
  // Assemble the Flink configuration submitted to YARN from the user-configured
  // command-line arguments and the Flink defaults
  val flinkConfig = getEffectiveConfiguration(submitRequest, activeCommandLine, commandLine, Collections.singletonList(uri.toString))
  SecurityUtils.install(new SecurityConfiguration(flinkConfig))
  SecurityUtils.getInstalledContext.runSecured(new Callable[SubmitResponse] {
    override def call(): SubmitResponse = {
      val clusterClientServiceLoader = new DefaultClusterClientServiceLoader
      // Get the YARN cluster client factory
      val clientFactory = clusterClientServiceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
      var clusterDescriptor: ClusterDescriptor[ApplicationId] = null
      try {
        clusterDescriptor = clientFactory.createClusterDescriptor(flinkConfig)
        val clusterSpecification = clientFactory.getClusterSpecification(flinkConfig)
        logInfo(
          s"""
             |------------------------<<specification>>-------------------------
             |$clusterSpecification
             |------------------------------------------------------------------
             |""".stripMargin)
        // Set the user jar arguments and main class
        val applicationConfiguration = ApplicationConfiguration.fromConfiguration(flinkConfig)
        var applicationId: ApplicationId = null
        // Submit the job to the cluster
        var clusterClient: ClusterClient[ApplicationId] = null
        try {
          clusterClient = clusterDescriptor.deployApplicationCluster(clusterSpecification, applicationConfiguration).getClusterClient
          // Get the applicationId
          applicationId = clusterClient.getClusterId
        } finally {
          if (clusterClient != null) {
            Try(clusterClient.close()).recover {
              case e => logInfo("Failed to close cluster client", e)
            }
          }
        }
        logInfo(
          s"""
             ||-------------------------<<applicationId>>------------------------|
             ||Flink Job Started: applicationId: $applicationId|
             ||__________________________________________________________________|
             |""".stripMargin)
        // Return the appId and the configuration submitted to YARN
        service.SubmitResponse(applicationId.toString, flinkConfig, null, null)
      } finally {
        if (clusterDescriptor != null) {
          Try(clusterDescriptor.close()).recover {
            case e => logInfo("Failed to close clusterDescriptor", e)
          }
        }
      }
    }
  })
}
```

<img width="1510" height="956" alt="Image" src="https://github.com/user-attachments/assets/7472a007-dc8d-4f08-b4a9-d9003a52d021" />
<img width="1872" height="987" alt="Image" src="https://github.com/user-attachments/assets/89673bda-f8c3-4ac4-8ab8-67338e02dca8" />
<img width="1914" height="1012" alt="Image" src="https://github.com/user-attachments/assets/1a4613a4-2065-433f-828d-2398fff36f22" />
<img width="1862" height="875" alt="Image" src="https://github.com/user-attachments/assets/459e57d6-feee-4d1b-af5a-bcd518f6efb7" />
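For reference, a minimal sketch (not StreamPark code; `SubmitLeakProbe` and `submitOnce` are placeholders for whatever drives `doSubmit` in the test setup) of how the symptom can be quantified: log the post-GC heap usage after each submission, and if that number keeps climbing, something from the submit path is being retained.

```scala
import java.lang.management.ManagementFactory

// Minimal probe (not part of StreamPark): drive the submit path repeatedly and
// log the post-GC heap usage, to see whether each submission retains memory.
// `submitOnce` is a placeholder for whatever triggers doSubmit in your setup.
object SubmitLeakProbe {

  private val memoryBean = ManagementFactory.getMemoryMXBean

  private def usedHeapMb(): Long = {
    // System.gc() is only a hint; call it a few times before sampling.
    (1 to 3).foreach(_ => System.gc())
    memoryBean.getHeapMemoryUsage.getUsed / (1024 * 1024)
  }

  def probe(iterations: Int)(submitOnce: () => Unit): Unit = {
    println(s"baseline heap: ${usedHeapMb()} MB")
    (1 to iterations).foreach { i =>
      submitOnce()
      println(s"after submission #$i: ${usedHeapMb()} MB")
    }
  }
}
```

If the post-GC usage keeps growing per submission, a heap dump taken after the last iteration (for example with `jmap -dump:live,format=b,file=heap.hprof <pid>`) shows which objects are still referenced.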
### Error Exception

```log

```

### Screenshots

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR! (Are you willing to contribute this PR?)

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
