shangeyao commented on code in PR #11939:
URL: https://github.com/apache/dolphinscheduler/pull/11939#discussion_r973806527
##########
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java:
##########
@@ -55,37 +67,110 @@ public void init() {
flinkParameters =
JSONUtils.parseObject(taskExecutionContext.getTaskParams(),
FlinkStreamParameters.class);
if (flinkParameters == null || !flinkParameters.checkParameters()) {
- throw new RuntimeException("flink task params is not valid");
+ throw new RuntimeException("flink stream task params is not
valid");
}
flinkParameters.setQueue(taskExecutionContext.getQueue());
setMainJarName();
+ }
- FileUtils.generateScriptFile(taskExecutionContext, flinkParameters);
+ @Override
+ public List<String> getApplicationIds() throws TaskException {
+ return Collections.emptyList();
}
- /**
- * create command
- *
- * @return command
- */
@Override
- protected String buildCommand() {
- // flink run/run-application [OPTIONS] <jar-file> <arguments>
- List<String> args =
FlinkArgsUtils.buildRunCommandLine(taskExecutionContext, flinkParameters);
+ public void cancelApplication() throws TaskException {
+ String hadoopConfDir = System.getenv(HADOOP_CONF_DIR);
+
+ initResultInfo();
- String command = ParameterUtils
- .convertParameterPlaceholders(String.join(" ", args),
taskExecutionContext.getDefinedParams());
+ ParamsInfo jobParamsInfo = ParamsInfo.builder()
+ .hadoopConfDir(hadoopConfDir)
+ .applicationId(flinkStreamResultInfo.getAppId())
+ .flinkJobId(flinkStreamResultInfo.getJobId())
+ .build();
- logger.info("flink task command : {}", command);
- return command;
+ try {
+ ClusterClient clusterClient = ClusterClient.INSTANCE;
+ ResultInfo jobResult = clusterClient.cancelFlinkJob(jobParamsInfo);
+ setExitStatusCode(EXIT_CODE_KILL);
+ logger.info(
+ String.format(
+ "job cancel result, appId:%s, jobId:%s",
+ jobResult.getAppId(), jobResult.getJobId()));
+ } catch (Exception e) {
+ logger.error("cancel flink stream task failure", e);
+ throw new TaskException("cancel flink stream task error", e);
+ }
}
@Override
- protected void setMainJarName() {
- ResourceInfo mainJar = flinkParameters.getMainJar();
- String resourceName = getResourceNameOfMainJar(mainJar);
- mainJar.setRes(resourceName);
- flinkParameters.setMainJar(mainJar);
+ public void submitApplication() throws TaskException {
+ String flinkHome = System.getenv(FLINK_HOME);
+
+ if (flinkHome.isEmpty()) {
+ logger.error("Please make sure to set the FLINK_HOME environment
variable.");
+ }
+
+ String hadoopConfDir = System.getenv(HADOOP_CONF_DIR);
+
+ if (hadoopConfDir.isEmpty()) {
+ logger.error("Please make sure to set the HADOOP_CONF_DIR
environment variable.");
Review Comment:
Yes, I will correct it.
##########
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/executor/AbstractClusterExecutor.java:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink.executor;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ParamsInfo;
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ResultInfo;
+import
org.apache.dolphinscheduler.plugin.task.flink.factory.YarnClusterDescriptorFactory;
+import org.apache.dolphinscheduler.plugin.task.flink.utils.JobGraphBuildUtil;
+import org.apache.dolphinscheduler.spi.utils.Constants;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ClusterClientProvider;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.yarn.YarnClusterDescriptor;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static
org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOG4J_NAME;
+import static
org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOGBACK_NAME;
+
+public abstract class AbstractClusterExecutor {
+
+ private static final Logger logger =
LoggerFactory.getLogger(AbstractClusterExecutor.class);
+
+ private static final String DEFAULT_TOTAL_PROCESS_MEMORY = "1024m";
+
+ public ParamsInfo jobParamsInfo;
+
+ public AbstractClusterExecutor(ParamsInfo jobParamsInfo) {
+ this.jobParamsInfo = jobParamsInfo;
+ }
+
+ /**
+ * submit job
+ *
+ * @return
+ */
+ public abstract ResultInfo submitJob();
+
+ public YarnClient createYarnClient() throws IOException {
+ YarnClient yarnClient =
+
YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromHadoopConfDir(
+ jobParamsInfo.getHadoopConfDir());
+ logger.info(
+ "yarn client successfully created, hadoop conf dir:{}",
+ jobParamsInfo.getHadoopConfDir());
+ return yarnClient;
+ }
+
+ /**
+ * kill yarn job and clean application files in hdfs.
+ *
+ * @return
+ */
+ public ResultInfo killJob() throws IOException {
+ String applicationId = jobParamsInfo.getApplicationId();
+ if (StringUtils.isEmpty(applicationId)) {
+ throw new NullPointerException("kill yarn job applicationId is
required!");
+ }
+ logger.info("killed applicationId is:{}", applicationId);
+ YarnConfiguration yarnConfiguration =
+ YarnClusterDescriptorFactory.INSTANCE.parseYarnConfFromConfDir(
+ jobParamsInfo.getHadoopConfDir());
+
+ try (
+ YarnClient yarnClient =
+
YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromYarnConf(
+ yarnConfiguration);) {
+
yarnClient.killApplication(ApplicationId.fromString(applicationId));
+ logger.info("killed applicationId {} was unsuccessful.",
applicationId);
+ } catch (YarnException e) {
+ logger.error("killed applicationId {0} was failed.", e);
+ return new ResultInfo("", "");
+ }
+
+ try (FileSystem fs = FileSystem.get(yarnConfiguration)) {
+ Path applicationDir =
+ new Path(
+ checkNotNull(fs.getHomeDirectory()),
+ ".flink/" + checkNotNull(applicationId) + '/');
+ if (!fs.delete(applicationDir, true)) {
+ logger.error(
+ "Deleting yarn application files under {} was
unsuccessful.",
+ applicationDir);
+ } else {
+ logger.error(
Review Comment:
Yes, I will correct it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]