shangeyao commented on code in PR #11939: URL: https://github.com/apache/dolphinscheduler/pull/11939#discussion_r973806577
########## dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/executor/AbstractClusterExecutor.java: ########## @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.plugin.task.flink.executor; + +import org.apache.commons.lang3.StringUtils; +import org.apache.dolphinscheduler.plugin.task.flink.entity.ParamsInfo; +import org.apache.dolphinscheduler.plugin.task.flink.entity.ResultInfo; +import org.apache.dolphinscheduler.plugin.task.flink.factory.YarnClusterDescriptorFactory; +import org.apache.dolphinscheduler.plugin.task.flink.utils.JobGraphBuildUtil; +import org.apache.dolphinscheduler.spi.utils.Constants; +import org.apache.dolphinscheduler.spi.utils.PropertyUtils; +import org.apache.flink.api.common.JobID; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.ClusterClientProvider; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.SecurityOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.execution.SavepointFormatType; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.util.FlinkException; +import org.apache.flink.yarn.YarnClusterDescriptor; +import org.apache.flink.yarn.configuration.YarnConfigOptions; +import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Objects; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOG4J_NAME; +import static 
org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOGBACK_NAME; + +public abstract class AbstractClusterExecutor { + + private static final Logger logger = LoggerFactory.getLogger(AbstractClusterExecutor.class); + + private static final String DEFAULT_TOTAL_PROCESS_MEMORY = "1024m"; + + public ParamsInfo jobParamsInfo; + + public AbstractClusterExecutor(ParamsInfo jobParamsInfo) { + this.jobParamsInfo = jobParamsInfo; + } + + /** + * submit job + * + * @return + */ + public abstract ResultInfo submitJob(); + + public YarnClient createYarnClient() throws IOException { + YarnClient yarnClient = + YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromHadoopConfDir( + jobParamsInfo.getHadoopConfDir()); + logger.info( + "yarn client successfully created, hadoop conf dir:{}", + jobParamsInfo.getHadoopConfDir()); + return yarnClient; + } + + /** + * kill yarn job and clean application files in hdfs. + * + * @return + */ + public ResultInfo killJob() throws IOException { + String applicationId = jobParamsInfo.getApplicationId(); + if (StringUtils.isEmpty(applicationId)) { + throw new NullPointerException("kill yarn job applicationId is required!"); + } + logger.info("killed applicationId is:{}", applicationId); + YarnConfiguration yarnConfiguration = + YarnClusterDescriptorFactory.INSTANCE.parseYarnConfFromConfDir( + jobParamsInfo.getHadoopConfDir()); + + try ( + YarnClient yarnClient = + YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromYarnConf( + yarnConfiguration);) { + yarnClient.killApplication(ApplicationId.fromString(applicationId)); + logger.info("killed applicationId {} was unsuccessful.", applicationId); + } catch (YarnException e) { + logger.error("killed applicationId {0} was failed.", e); + return new ResultInfo("", ""); + } + + try (FileSystem fs = FileSystem.get(yarnConfiguration)) { + Path applicationDir = + new Path( + checkNotNull(fs.getHomeDirectory()), + ".flink/" + checkNotNull(applicationId) + '/'); + if 
(!fs.delete(applicationDir, true)) { + logger.error( + "Deleting yarn application files under {} was unsuccessful.", + applicationDir); + } else { + logger.error( + "Deleting yarn application files under {} was successful.", applicationDir); + } + } catch (Exception e) { + logger.error("Deleting yarn application files was failed!", e); + } + return new ResultInfo("", ""); + } + + public ResultInfo cancelJob(boolean doSavepoint) { + String appId = jobParamsInfo.getApplicationId(); + String jobId = jobParamsInfo.getFlinkJobId(); + + logger.info("cancel Job appId:{}, jobId:{}", appId, jobId); + + ApplicationId applicationId = ApplicationId.fromString(appId); + JobID flinkJobId = new JobID(org.apache.flink.util.StringUtils.hexStringToByte(jobId)); + + Configuration flinkConfig = getFlinkConfigFromParamsInfo(); + try ( + YarnClusterDescriptor clusterDescriptor = + (YarnClusterDescriptor) YarnClusterDescriptorFactory.INSTANCE.createClusterDescriptor( + jobParamsInfo.getHadoopConfDir(), flinkConfig)) { + + ClusterClientProvider<ApplicationId> retrieve = + clusterDescriptor.retrieve(applicationId); + try (ClusterClient<ApplicationId> clusterClient = retrieve.getClusterClient()) { + if (doSavepoint) { + CompletableFuture<String> savepointFuture = + clusterClient.cancelWithSavepoint(flinkJobId, null, SavepointFormatType.DEFAULT); + Object result = savepointFuture.get(2, TimeUnit.MINUTES); + logger.info("flink job savepoint path: {}", result.toString()); + } else { + CompletableFuture<Acknowledge> cancelFuture = clusterClient.cancel(flinkJobId); + Object result = cancelFuture.get(2, TimeUnit.MINUTES); + logger.info("flink job cancel result: {}", result.toString()); + } + } catch (Exception e) { + try { + logger.error("cancel job error, will kill job:", e); + clusterDescriptor.killCluster(applicationId); + } catch (FlinkException e1) { + logger.error("yarn cluster Descriptor kill cluster error:", e); + return new ResultInfo("", ""); + } + } + + } catch (Exception e) { + 
logger.error("cancel job failed,appId:{}, jobId:{}, exception:{}", appId, jobId, e); + return new ResultInfo(appId, jobId); + } + + return new ResultInfo(appId, jobId); + } + + public ResultInfo savePoint() { + String appId = jobParamsInfo.getApplicationId(); + String jobId = jobParamsInfo.getFlinkJobId(); + + logger.info("cancel Job appId:{}, jobId:{}", appId, jobId); + + ApplicationId applicationId = ApplicationId.fromString(appId); + JobID flinkJobId = new JobID(org.apache.flink.util.StringUtils.hexStringToByte(jobId)); + + Configuration flinkConfig = getFlinkConfigFromParamsInfo(); + try ( + YarnClusterDescriptor clusterDescriptor = + (YarnClusterDescriptor) YarnClusterDescriptorFactory.INSTANCE.createClusterDescriptor( + jobParamsInfo.getHadoopConfDir(), flinkConfig)) { + + ClusterClientProvider<ApplicationId> retrieve = + clusterDescriptor.retrieve(applicationId); + try (ClusterClient<ApplicationId> clusterClient = retrieve.getClusterClient()) { + CompletableFuture<String> savepointFuture = + clusterClient.triggerSavepoint(flinkJobId, null, SavepointFormatType.DEFAULT); + Object result = savepointFuture.get(2, TimeUnit.MINUTES); + logger.info("flink job savepoint path: {}", result.toString()); + } catch (Exception e) { + logger.error("flink job savepoint error", e); + } + + } catch (Exception e) { + logger.error("flink job savepoint failed, appId:{}, jobId:{}, exception:{}", appId, jobId, e); + return new ResultInfo(appId, jobId); Review Comment: Yeah, I'll correct it. ########## dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/executor/AbstractClusterExecutor.java: ########## @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.flink.executor; + +import org.apache.commons.lang3.StringUtils; +import org.apache.dolphinscheduler.plugin.task.flink.entity.ParamsInfo; +import org.apache.dolphinscheduler.plugin.task.flink.entity.ResultInfo; +import org.apache.dolphinscheduler.plugin.task.flink.factory.YarnClusterDescriptorFactory; +import org.apache.dolphinscheduler.plugin.task.flink.utils.JobGraphBuildUtil; +import org.apache.dolphinscheduler.spi.utils.Constants; +import org.apache.dolphinscheduler.spi.utils.PropertyUtils; +import org.apache.flink.api.common.JobID; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.ClusterClientProvider; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.SecurityOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.execution.SavepointFormatType; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.util.FlinkException; +import org.apache.flink.yarn.YarnClusterDescriptor; +import org.apache.flink.yarn.configuration.YarnConfigOptions; +import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import 
org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Objects; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOG4J_NAME; +import static org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOGBACK_NAME; + +public abstract class AbstractClusterExecutor { + + private static final Logger logger = LoggerFactory.getLogger(AbstractClusterExecutor.class); + + private static final String DEFAULT_TOTAL_PROCESS_MEMORY = "1024m"; + + public ParamsInfo jobParamsInfo; + + public AbstractClusterExecutor(ParamsInfo jobParamsInfo) { + this.jobParamsInfo = jobParamsInfo; + } + + /** + * submit job + * + * @return + */ + public abstract ResultInfo submitJob(); + + public YarnClient createYarnClient() throws IOException { + YarnClient yarnClient = + YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromHadoopConfDir( + jobParamsInfo.getHadoopConfDir()); + logger.info( + "yarn client successfully created, hadoop conf dir:{}", + jobParamsInfo.getHadoopConfDir()); + return yarnClient; + } + + /** + * kill yarn job and clean application files in hdfs. 
+ * + * @return + */ + public ResultInfo killJob() throws IOException { + String applicationId = jobParamsInfo.getApplicationId(); + if (StringUtils.isEmpty(applicationId)) { + throw new NullPointerException("kill yarn job applicationId is required!"); + } + logger.info("killed applicationId is:{}", applicationId); + YarnConfiguration yarnConfiguration = + YarnClusterDescriptorFactory.INSTANCE.parseYarnConfFromConfDir( + jobParamsInfo.getHadoopConfDir()); + + try ( + YarnClient yarnClient = + YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromYarnConf( + yarnConfiguration);) { + yarnClient.killApplication(ApplicationId.fromString(applicationId)); + logger.info("killed applicationId {} was unsuccessful.", applicationId); + } catch (YarnException e) { + logger.error("killed applicationId {0} was failed.", e); + return new ResultInfo("", ""); + } + + try (FileSystem fs = FileSystem.get(yarnConfiguration)) { + Path applicationDir = + new Path( + checkNotNull(fs.getHomeDirectory()), + ".flink/" + checkNotNull(applicationId) + '/'); + if (!fs.delete(applicationDir, true)) { + logger.error( + "Deleting yarn application files under {} was unsuccessful.", + applicationDir); + } else { + logger.error( + "Deleting yarn application files under {} was successful.", applicationDir); + } + } catch (Exception e) { + logger.error("Deleting yarn application files was failed!", e); + } + return new ResultInfo("", ""); + } + + public ResultInfo cancelJob(boolean doSavepoint) { + String appId = jobParamsInfo.getApplicationId(); + String jobId = jobParamsInfo.getFlinkJobId(); + + logger.info("cancel Job appId:{}, jobId:{}", appId, jobId); + + ApplicationId applicationId = ApplicationId.fromString(appId); + JobID flinkJobId = new JobID(org.apache.flink.util.StringUtils.hexStringToByte(jobId)); + + Configuration flinkConfig = getFlinkConfigFromParamsInfo(); + try ( + YarnClusterDescriptor clusterDescriptor = + (YarnClusterDescriptor) 
YarnClusterDescriptorFactory.INSTANCE.createClusterDescriptor( + jobParamsInfo.getHadoopConfDir(), flinkConfig)) { + + ClusterClientProvider<ApplicationId> retrieve = + clusterDescriptor.retrieve(applicationId); + try (ClusterClient<ApplicationId> clusterClient = retrieve.getClusterClient()) { + if (doSavepoint) { + CompletableFuture<String> savepointFuture = + clusterClient.cancelWithSavepoint(flinkJobId, null, SavepointFormatType.DEFAULT); + Object result = savepointFuture.get(2, TimeUnit.MINUTES); + logger.info("flink job savepoint path: {}", result.toString()); + } else { + CompletableFuture<Acknowledge> cancelFuture = clusterClient.cancel(flinkJobId); + Object result = cancelFuture.get(2, TimeUnit.MINUTES); + logger.info("flink job cancel result: {}", result.toString()); + } + } catch (Exception e) { + try { + logger.error("cancel job error, will kill job:", e); + clusterDescriptor.killCluster(applicationId); + } catch (FlinkException e1) { + logger.error("yarn cluster Descriptor kill cluster error:", e); + return new ResultInfo("", ""); + } + } + + } catch (Exception e) { + logger.error("cancel job failed,appId:{}, jobId:{}, exception:{}", appId, jobId, e); + return new ResultInfo(appId, jobId); + } + + return new ResultInfo(appId, jobId); + } + + public ResultInfo savePoint() { + String appId = jobParamsInfo.getApplicationId(); + String jobId = jobParamsInfo.getFlinkJobId(); + + logger.info("cancel Job appId:{}, jobId:{}", appId, jobId); + + ApplicationId applicationId = ApplicationId.fromString(appId); + JobID flinkJobId = new JobID(org.apache.flink.util.StringUtils.hexStringToByte(jobId)); + + Configuration flinkConfig = getFlinkConfigFromParamsInfo(); + try ( + YarnClusterDescriptor clusterDescriptor = + (YarnClusterDescriptor) YarnClusterDescriptorFactory.INSTANCE.createClusterDescriptor( + jobParamsInfo.getHadoopConfDir(), flinkConfig)) { + + ClusterClientProvider<ApplicationId> retrieve = + clusterDescriptor.retrieve(applicationId); + try 
(ClusterClient<ApplicationId> clusterClient = retrieve.getClusterClient()) { + CompletableFuture<String> savepointFuture = + clusterClient.triggerSavepoint(flinkJobId, null, SavepointFormatType.DEFAULT); + Object result = savepointFuture.get(2, TimeUnit.MINUTES); + logger.info("flink job savepoint path: {}", result.toString()); + } catch (Exception e) { + logger.error("flink job savepoint error", e); Review Comment: Yeah, I'll correct it. ########## dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/factory/YarnClusterDescriptorFactory.java: ########## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.plugin.task.flink.factory; + +import org.apache.commons.lang.StringUtils; +import org.apache.flink.client.deployment.ClusterDescriptor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.function.FunctionUtils; +import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever; +import org.apache.flink.yarn.YarnClusterDescriptor; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import java.io.File; +import java.io.IOException; + +public enum YarnClusterDescriptorFactory implements AbstractClusterDescriptorFactory { + + INSTANCE; + + private static final String XML_FILE_EXTENSION = "xml"; + + @Override + public ClusterDescriptor createClusterDescriptor( + String hadoopConfDir, Configuration flinkConfig) { + if (StringUtils.isNotBlank(hadoopConfDir)) { + try { + YarnConfiguration yarnConf = parseYarnConfFromConfDir(hadoopConfDir); + YarnClient yarnClient = createYarnClientFromYarnConf(yarnConf); + + YarnClusterDescriptor clusterDescriptor = + new YarnClusterDescriptor( + flinkConfig, + yarnConf, + yarnClient, + YarnClientYarnClusterInformationRetriever.create(yarnClient), + false); + return clusterDescriptor; + } catch (Exception e) { + throw new RuntimeException(e); + } + } else { + throw new RuntimeException("yarn mode must set param of 'hadoopConfDir'"); + } + } Review Comment: Yeah, I'll correct it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
