github-code-scanning[bot] commented on code in PR #11939:
URL: https://github.com/apache/dolphinscheduler/pull/11939#discussion_r970856087


##########
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/executor/AbstractClusterExecutor.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink.executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static 
org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOG4J_NAME;
+import static 
org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOGBACK_NAME;
+
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ParamsInfo;
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ResultInfo;
+import 
org.apache.dolphinscheduler.plugin.task.flink.factory.YarnClusterDescriptorFactory;
+import org.apache.dolphinscheduler.plugin.task.flink.utils.JobGraphBuildUtil;
+import org.apache.dolphinscheduler.spi.utils.Constants;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ClusterClientProvider;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.yarn.YarnClusterDescriptor;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractClusterExecutor {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(AbstractClusterExecutor.class);
+
+    private static final String DEFAULT_TOTAL_PROCESS_MEMORY = "1024m";
+
+    public ParamsInfo jobParamsInfo;
+
+    public AbstractClusterExecutor(ParamsInfo jobParamsInfo) {
+        this.jobParamsInfo = jobParamsInfo;
+    }
+
+    /**
+     * submit job
+     *
+     * @return
+     */
+    public abstract ResultInfo submitJob();
+
+    public YarnClient createYarnClient() throws IOException {
+        YarnClient yarnClient =
+                
YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromHadoopConfDir(
+                        jobParamsInfo.getHadoopConfDir());
+        logger.info(
+                "yarn client successfully created, hadoop conf dir:{}",
+                jobParamsInfo.getHadoopConfDir());
+        return yarnClient;
+    }
+
+    /**
+     * kill yarn job and clean application files in hdfs.
+     *
+     * @return
+     */
+    public ResultInfo killJob() throws IOException {
+        String applicationId = jobParamsInfo.getApplicationId();
+        if (StringUtils.isEmpty(applicationId)) {
+            throw new NullPointerException("kill yarn job applicationId is 
required!");
+        }
+        logger.info("killed applicationId is:{}", applicationId);
+        YarnConfiguration yarnConfiguration =
+                YarnClusterDescriptorFactory.INSTANCE.parseYarnConfFromConfDir(
+                        jobParamsInfo.getHadoopConfDir());
+
+        try (
+                YarnClient yarnClient =
+                        
YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromYarnConf(
+                                yarnConfiguration);) {
+            
yarnClient.killApplication(ConverterUtils.toApplicationId(applicationId));
+            logger.info("killed applicationId {} was unsuccessful.", 
applicationId);
+        } catch (YarnException e) {
+            logger.error("killed applicationId {0} was failed.", e);
+            return new ResultInfo("", "");
+        }
+
+        try (FileSystem fs = FileSystem.get(yarnConfiguration)) {
+            Path applicationDir =
+                    new Path(
+                            checkNotNull(fs.getHomeDirectory()),
+                            ".flink/" + checkNotNull(applicationId) + '/');
+            if (!fs.delete(applicationDir, true)) {
+                logger.error(
+                        "Deleting yarn application files under {} was 
unsuccessful.",
+                        applicationDir);
+            } else {
+                logger.error(
+                        "Deleting yarn application files under {} was 
successful.", applicationDir);
+            }
+        } catch (Exception e) {
+            logger.error("Deleting yarn application files was failed!", e);
+        }
+        return new ResultInfo("", "");
+    }
+
+    public ResultInfo cancelJob(boolean doSavepoint) {
+        String appId = jobParamsInfo.getApplicationId();
+        String jobId = jobParamsInfo.getFlinkJobId();
+
+        logger.info("cancel Job appId:{}, jobId:{}", appId, jobId);
+
+        ApplicationId applicationId = ConverterUtils.toApplicationId(appId);
+        JobID flinkJobId = new 
JobID(org.apache.flink.util.StringUtils.hexStringToByte(jobId));
+
+        Configuration flinkConfig = getFlinkConfigFromParamsInfo();
+        try (
+                YarnClusterDescriptor clusterDescriptor =
+                        (YarnClusterDescriptor) 
YarnClusterDescriptorFactory.INSTANCE.createClusterDescriptor(
+                                jobParamsInfo.getHadoopConfDir(), 
flinkConfig)) {
+
+            ClusterClientProvider<ApplicationId> retrieve =
+                    clusterDescriptor.retrieve(applicationId);
+            try (ClusterClient<ApplicationId> clusterClient = 
retrieve.getClusterClient()) {
+                if (doSavepoint) {
+                    CompletableFuture<String> savepointFuture =
+                            clusterClient.cancelWithSavepoint(flinkJobId, 
null, SavepointFormatType.DEFAULT);
+                    Object result = savepointFuture.get(2, TimeUnit.MINUTES);
+                    logger.info("flink job savepoint path: {}", 
result.toString());
+                } else {
+                    CompletableFuture<Acknowledge> cancelFuture = 
clusterClient.cancel(flinkJobId);
+                    Object result = cancelFuture.get(2, TimeUnit.MINUTES);
+                    logger.info("flink job cancel result: {}", 
result.toString());
+                }
+            } catch (Exception e) {
+                try {
+                    logger.error("cancel job error, will kill job:", e);
+                    clusterDescriptor.killCluster(applicationId);
+                } catch (FlinkException e1) {
+                    logger.error("yarn cluster Descriptor kill cluster 
error:", e);
+                    return new ResultInfo("", "");
+                }
+            }
+
+        } catch (Exception e) {
+            logger.error(String.format("cancel job failed,appId:{}, jobId:", 
appId, jobId), e);

Review Comment:
   ## Unused format argument
   
   This format call refers to 0 argument(s) but supplies 2 argument(s).
   
   [Show more 
details](https://github.com/apache/dolphinscheduler/security/code-scanning/1250)



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/executor/AbstractClusterExecutor.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink.executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static 
org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOG4J_NAME;
+import static 
org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOGBACK_NAME;
+
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ParamsInfo;
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ResultInfo;
+import 
org.apache.dolphinscheduler.plugin.task.flink.factory.YarnClusterDescriptorFactory;
+import org.apache.dolphinscheduler.plugin.task.flink.utils.JobGraphBuildUtil;
+import org.apache.dolphinscheduler.spi.utils.Constants;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ClusterClientProvider;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.yarn.YarnClusterDescriptor;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractClusterExecutor {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(AbstractClusterExecutor.class);
+
+    private static final String DEFAULT_TOTAL_PROCESS_MEMORY = "1024m";
+
+    public ParamsInfo jobParamsInfo;
+
+    public AbstractClusterExecutor(ParamsInfo jobParamsInfo) {
+        this.jobParamsInfo = jobParamsInfo;
+    }
+
+    /**
+     * submit job
+     *
+     * @return
+     */
+    public abstract ResultInfo submitJob();
+
+    public YarnClient createYarnClient() throws IOException {
+        YarnClient yarnClient =
+                
YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromHadoopConfDir(
+                        jobParamsInfo.getHadoopConfDir());
+        logger.info(
+                "yarn client successfully created, hadoop conf dir:{}",
+                jobParamsInfo.getHadoopConfDir());
+        return yarnClient;
+    }
+
+    /**
+     * kill yarn job and clean application files in hdfs.
+     *
+     * @return
+     */
+    public ResultInfo killJob() throws IOException {
+        String applicationId = jobParamsInfo.getApplicationId();
+        if (StringUtils.isEmpty(applicationId)) {
+            throw new NullPointerException("kill yarn job applicationId is 
required!");
+        }
+        logger.info("killed applicationId is:{}", applicationId);
+        YarnConfiguration yarnConfiguration =
+                YarnClusterDescriptorFactory.INSTANCE.parseYarnConfFromConfDir(
+                        jobParamsInfo.getHadoopConfDir());
+
+        try (
+                YarnClient yarnClient =
+                        
YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromYarnConf(
+                                yarnConfiguration);) {
+            
yarnClient.killApplication(ConverterUtils.toApplicationId(applicationId));

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [ConverterUtils.toApplicationId](1) should be avoided because it 
has been deprecated.
   
   [Show more 
details](https://github.com/apache/dolphinscheduler/security/code-scanning/1244)



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/executor/AbstractClusterExecutor.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink.executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static 
org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOG4J_NAME;
+import static 
org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOGBACK_NAME;
+
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ParamsInfo;
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ResultInfo;
+import 
org.apache.dolphinscheduler.plugin.task.flink.factory.YarnClusterDescriptorFactory;
+import org.apache.dolphinscheduler.plugin.task.flink.utils.JobGraphBuildUtil;
+import org.apache.dolphinscheduler.spi.utils.Constants;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ClusterClientProvider;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.yarn.YarnClusterDescriptor;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractClusterExecutor {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(AbstractClusterExecutor.class);
+
+    private static final String DEFAULT_TOTAL_PROCESS_MEMORY = "1024m";
+
+    public ParamsInfo jobParamsInfo;
+
+    public AbstractClusterExecutor(ParamsInfo jobParamsInfo) {
+        this.jobParamsInfo = jobParamsInfo;
+    }
+
+    /**
+     * submit job
+     *
+     * @return
+     */
+    public abstract ResultInfo submitJob();
+
+    public YarnClient createYarnClient() throws IOException {
+        YarnClient yarnClient =
+                
YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromHadoopConfDir(
+                        jobParamsInfo.getHadoopConfDir());
+        logger.info(
+                "yarn client successfully created, hadoop conf dir:{}",
+                jobParamsInfo.getHadoopConfDir());
+        return yarnClient;
+    }
+
+    /**
+     * kill yarn job and clean application files in hdfs.
+     *
+     * @return
+     */
+    public ResultInfo killJob() throws IOException {
+        String applicationId = jobParamsInfo.getApplicationId();
+        if (StringUtils.isEmpty(applicationId)) {
+            throw new NullPointerException("kill yarn job applicationId is 
required!");
+        }
+        logger.info("killed applicationId is:{}", applicationId);
+        YarnConfiguration yarnConfiguration =
+                YarnClusterDescriptorFactory.INSTANCE.parseYarnConfFromConfDir(
+                        jobParamsInfo.getHadoopConfDir());
+
+        try (
+                YarnClient yarnClient =
+                        
YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromYarnConf(
+                                yarnConfiguration);) {
+            
yarnClient.killApplication(ConverterUtils.toApplicationId(applicationId));
+            logger.info("killed applicationId {} was unsuccessful.", 
applicationId);
+        } catch (YarnException e) {
+            logger.error("killed applicationId {0} was failed.", e);
+            return new ResultInfo("", "");
+        }
+
+        try (FileSystem fs = FileSystem.get(yarnConfiguration)) {
+            Path applicationDir =
+                    new Path(
+                            checkNotNull(fs.getHomeDirectory()),
+                            ".flink/" + checkNotNull(applicationId) + '/');
+            if (!fs.delete(applicationDir, true)) {
+                logger.error(
+                        "Deleting yarn application files under {} was 
unsuccessful.",
+                        applicationDir);
+            } else {
+                logger.error(
+                        "Deleting yarn application files under {} was 
successful.", applicationDir);
+            }
+        } catch (Exception e) {
+            logger.error("Deleting yarn application files was failed!", e);
+        }
+        return new ResultInfo("", "");
+    }
+
+    public ResultInfo cancelJob(boolean doSavepoint) {
+        String appId = jobParamsInfo.getApplicationId();
+        String jobId = jobParamsInfo.getFlinkJobId();
+
+        logger.info("cancel Job appId:{}, jobId:{}", appId, jobId);
+
+        ApplicationId applicationId = ConverterUtils.toApplicationId(appId);
+        JobID flinkJobId = new 
JobID(org.apache.flink.util.StringUtils.hexStringToByte(jobId));
+
+        Configuration flinkConfig = getFlinkConfigFromParamsInfo();
+        try (
+                YarnClusterDescriptor clusterDescriptor =
+                        (YarnClusterDescriptor) 
YarnClusterDescriptorFactory.INSTANCE.createClusterDescriptor(
+                                jobParamsInfo.getHadoopConfDir(), 
flinkConfig)) {
+
+            ClusterClientProvider<ApplicationId> retrieve =
+                    clusterDescriptor.retrieve(applicationId);
+            try (ClusterClient<ApplicationId> clusterClient = 
retrieve.getClusterClient()) {
+                if (doSavepoint) {
+                    CompletableFuture<String> savepointFuture =
+                            clusterClient.cancelWithSavepoint(flinkJobId, 
null, SavepointFormatType.DEFAULT);
+                    Object result = savepointFuture.get(2, TimeUnit.MINUTES);
+                    logger.info("flink job savepoint path: {}", 
result.toString());
+                } else {
+                    CompletableFuture<Acknowledge> cancelFuture = 
clusterClient.cancel(flinkJobId);
+                    Object result = cancelFuture.get(2, TimeUnit.MINUTES);
+                    logger.info("flink job cancel result: {}", 
result.toString());
+                }
+            } catch (Exception e) {
+                try {
+                    logger.error("cancel job error, will kill job:", e);
+                    clusterDescriptor.killCluster(applicationId);
+                } catch (FlinkException e1) {
+                    logger.error("yarn cluster Descriptor kill cluster 
error:", e);
+                    return new ResultInfo("", "");
+                }
+            }
+
+        } catch (Exception e) {
+            logger.error(String.format("cancel job failed,appId:{}, jobId:", 
appId, jobId), e);
+            return new ResultInfo(appId, jobId);
+        }
+
+        return new ResultInfo(appId, jobId);
+    }
+
+    public ResultInfo savePoint() {
+        String appId = jobParamsInfo.getApplicationId();
+        String jobId = jobParamsInfo.getFlinkJobId();
+
+        logger.info("cancel Job appId:{}, jobId:{}", appId, jobId);
+
+        ApplicationId applicationId = ConverterUtils.toApplicationId(appId);

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [ConverterUtils.toApplicationId](1) should be avoided because it 
has been deprecated.
   
   [Show more 
details](https://github.com/apache/dolphinscheduler/security/code-scanning/1247)



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/executor/AbstractClusterExecutor.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink.executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static 
org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOG4J_NAME;
+import static 
org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOGBACK_NAME;
+
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ParamsInfo;
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ResultInfo;
+import 
org.apache.dolphinscheduler.plugin.task.flink.factory.YarnClusterDescriptorFactory;
+import org.apache.dolphinscheduler.plugin.task.flink.utils.JobGraphBuildUtil;
+import org.apache.dolphinscheduler.spi.utils.Constants;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ClusterClientProvider;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.yarn.YarnClusterDescriptor;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractClusterExecutor {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(AbstractClusterExecutor.class);
+
+    private static final String DEFAULT_TOTAL_PROCESS_MEMORY = "1024m";
+
+    public ParamsInfo jobParamsInfo;
+
+    public AbstractClusterExecutor(ParamsInfo jobParamsInfo) {
+        this.jobParamsInfo = jobParamsInfo;
+    }
+
+    /**
+     * submit job
+     *
+     * @return
+     */
+    public abstract ResultInfo submitJob();
+
+    public YarnClient createYarnClient() throws IOException {
+        YarnClient yarnClient =
+                
YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromHadoopConfDir(
+                        jobParamsInfo.getHadoopConfDir());
+        logger.info(
+                "yarn client successfully created, hadoop conf dir:{}",
+                jobParamsInfo.getHadoopConfDir());
+        return yarnClient;
+    }
+
+    /**
+     * kill yarn job and clean application files in hdfs.
+     *
+     * @return
+     */
+    public ResultInfo killJob() throws IOException {
+        String applicationId = jobParamsInfo.getApplicationId();
+        if (StringUtils.isEmpty(applicationId)) {
+            throw new NullPointerException("kill yarn job applicationId is 
required!");
+        }
+        logger.info("killed applicationId is:{}", applicationId);
+        YarnConfiguration yarnConfiguration =
+                YarnClusterDescriptorFactory.INSTANCE.parseYarnConfFromConfDir(
+                        jobParamsInfo.getHadoopConfDir());
+
+        try (
+                YarnClient yarnClient =
+                        
YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromYarnConf(
+                                yarnConfiguration);) {
+            
yarnClient.killApplication(ConverterUtils.toApplicationId(applicationId));
+            logger.info("killed applicationId {} was unsuccessful.", 
applicationId);
+        } catch (YarnException e) {
+            logger.error("killed applicationId {0} was failed.", e);
+            return new ResultInfo("", "");
+        }
+
+        try (FileSystem fs = FileSystem.get(yarnConfiguration)) {
+            Path applicationDir =
+                    new Path(
+                            checkNotNull(fs.getHomeDirectory()),
+                            ".flink/" + checkNotNull(applicationId) + '/');
+            if (!fs.delete(applicationDir, true)) {
+                logger.error(
+                        "Deleting yarn application files under {} was 
unsuccessful.",
+                        applicationDir);
+            } else {
+                logger.error(
+                        "Deleting yarn application files under {} was 
successful.", applicationDir);
+            }
+        } catch (Exception e) {
+            logger.error("Deleting yarn application files was failed!", e);
+        }
+        return new ResultInfo("", "");
+    }
+
+    public ResultInfo cancelJob(boolean doSavepoint) {
+        String appId = jobParamsInfo.getApplicationId();
+        String jobId = jobParamsInfo.getFlinkJobId();
+
+        logger.info("cancel Job appId:{}, jobId:{}", appId, jobId);
+
+        ApplicationId applicationId = ConverterUtils.toApplicationId(appId);
+        JobID flinkJobId = new 
JobID(org.apache.flink.util.StringUtils.hexStringToByte(jobId));
+
+        Configuration flinkConfig = getFlinkConfigFromParamsInfo();
+        try (
+                YarnClusterDescriptor clusterDescriptor =
+                        (YarnClusterDescriptor) 
YarnClusterDescriptorFactory.INSTANCE.createClusterDescriptor(
+                                jobParamsInfo.getHadoopConfDir(), 
flinkConfig)) {
+
+            ClusterClientProvider<ApplicationId> retrieve =
+                    clusterDescriptor.retrieve(applicationId);
+            try (ClusterClient<ApplicationId> clusterClient = 
retrieve.getClusterClient()) {
+                if (doSavepoint) {
+                    CompletableFuture<String> savepointFuture =
+                            clusterClient.cancelWithSavepoint(flinkJobId, 
null, SavepointFormatType.DEFAULT);
+                    Object result = savepointFuture.get(2, TimeUnit.MINUTES);
+                    logger.info("flink job savepoint path: {}", 
result.toString());
+                } else {
+                    CompletableFuture<Acknowledge> cancelFuture = 
clusterClient.cancel(flinkJobId);
+                    Object result = cancelFuture.get(2, TimeUnit.MINUTES);
+                    logger.info("flink job cancel result: {}", 
result.toString());
+                }
+            } catch (Exception e) {
+                try {
+                    logger.error("cancel job error, will kill job:", e);
+                    clusterDescriptor.killCluster(applicationId);
+                } catch (FlinkException e1) {
+                    logger.error("yarn cluster Descriptor kill cluster 
error:", e);
+                    return new ResultInfo("", "");
+                }
+            }
+
+        } catch (Exception e) {
+            logger.error(String.format("cancel job failed,appId:{}, jobId:", 
appId, jobId), e);
+            return new ResultInfo(appId, jobId);
+        }
+
+        return new ResultInfo(appId, jobId);
+    }
+
+    public ResultInfo savePoint() {
+        String appId = jobParamsInfo.getApplicationId();
+        String jobId = jobParamsInfo.getFlinkJobId();
+
+        logger.info("cancel Job appId:{}, jobId:{}", appId, jobId);
+
+        ApplicationId applicationId = ConverterUtils.toApplicationId(appId);
+        JobID flinkJobId = new 
JobID(org.apache.flink.util.StringUtils.hexStringToByte(jobId));
+
+        Configuration flinkConfig = getFlinkConfigFromParamsInfo();
+        try (
+                YarnClusterDescriptor clusterDescriptor =
+                        (YarnClusterDescriptor) 
YarnClusterDescriptorFactory.INSTANCE.createClusterDescriptor(
+                                jobParamsInfo.getHadoopConfDir(), 
flinkConfig)) {
+
+            ClusterClientProvider<ApplicationId> retrieve =
+                    clusterDescriptor.retrieve(applicationId);
+            try (ClusterClient<ApplicationId> clusterClient = 
retrieve.getClusterClient()) {
+                CompletableFuture<String> savepointFuture =
+                        clusterClient.triggerSavepoint(flinkJobId, null, 
SavepointFormatType.DEFAULT);
+                Object result = savepointFuture.get(2, TimeUnit.MINUTES);
+                logger.info("flink job savepoint path: {}", result.toString());
+            } catch (Exception e) {
+                logger.error("flink job savepoint error", e);
+            }
+
+        } catch (Exception e) {
+            logger.error(String.format("flink job savepoint failed, appId:{}, 
jobId:", appId, jobId), e);
+            return new ResultInfo(appId, jobId);
+        }
+
+        return new ResultInfo(appId, jobId);
+    }
+
+    protected Configuration getFlinkConfigFromParamsInfo() {
+        Configuration defaultGlobalConfig =
+                
JobGraphBuildUtil.getFlinkConfiguration(jobParamsInfo.getFlinkConfDir());
+        replaceDefaultGlobalConfig(defaultGlobalConfig, jobParamsInfo);
+        return defaultGlobalConfig;
+    }
+
+    /**
+     * replace the default configuration items in the flink-conf.yaml
+     *
+     * @param flinkConfig
+     * @param jobParamsInfo
+     */
+    protected void replaceDefaultGlobalConfig(Configuration flinkConfig, 
ParamsInfo jobParamsInfo) {
+        if (!StringUtils.isEmpty(jobParamsInfo.getName())) {
+            flinkConfig.setString(YarnConfigOptions.APPLICATION_NAME, 
jobParamsInfo.getName());
+        }
+
+        if (!StringUtils.isEmpty(jobParamsInfo.getQueue())) {
+            flinkConfig.setString(YarnConfigOptions.APPLICATION_QUEUE, 
jobParamsInfo.getQueue());
+        }
+
+        if (!StringUtils.isEmpty(jobParamsInfo.getFlinkConfDir())) {
+            discoverLogConfigFile(jobParamsInfo.getFlinkConfDir())
+                    .ifPresent(
+                            file -> flinkConfig.setString(
+                                    
YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE,
+                                    file.getPath()));
+        }
+
+        if (!flinkConfig.contains(TaskManagerOptions.TOTAL_PROCESS_MEMORY)) {
+            flinkConfig.setString(
+                    TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(), 
DEFAULT_TOTAL_PROCESS_MEMORY);
+        }
+
+        // fill security config
+        if (jobParamsInfo.isOpenSecurity()) {
+            flinkConfig.setString(
+                    SecurityOptions.KERBEROS_LOGIN_KEYTAB.key(),
+                    
PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH));
+            
Optional.ofNullable(PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME))
+                    .ifPresent(
+                            principal -> flinkConfig.setString(
+                                    
SecurityOptions.KERBEROS_LOGIN_PRINCIPAL.key(),
+                                    principal));
+        }
+
+        Properties flinkConfigProperties = jobParamsInfo.getConfProperties();
+        if (!Objects.isNull(flinkConfigProperties)) {
+            flinkConfigProperties.stringPropertyNames().stream()
+                    .forEach(
+                            key -> flinkConfig.setString(
+                                    key, 
flinkConfigProperties.getProperty(key)));
+        }
+    }
+
+    /**
+     * find log4 files from flink conf
+     *
+     * @param configurationDirectory
+     * @return
+     */
+    protected Optional<File> discoverLogConfigFile(final String 
configurationDirectory) {
+        Optional<File> logConfigFile = Optional.empty();
+
+        final File log4jFile =
+                new File(configurationDirectory + File.separator + 
CONFIG_FILE_LOG4J_NAME);
+        if (log4jFile.exists()) {
+            logConfigFile = Optional.of(log4jFile);
+        }
+
+        final File logbackFile =
+                new File(configurationDirectory + File.separator + 
CONFIG_FILE_LOGBACK_NAME);
+        if (logbackFile.exists()) {
+            if (logConfigFile.isPresent()) {
+                logger.warn(
+                        "The configuration directory ('"
+                                + configurationDirectory
+                                + "') already contains a logger4J config file."
+                                + "If you want to use logback, then please 
delete or rename the log configuration file.");

Review Comment:
   ## Missing space in string literal
   
   This string appears to be missing a space after 'file.'.
   
   [Show more 
details](https://github.com/apache/dolphinscheduler/security/code-scanning/1249)



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/utils/YarnLogHelper.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink.utils;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import java.io.*;
+import java.util.Arrays;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * reference for yarn LogCLIHelpers
+ *
+ */
+public class YarnLogHelper {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(YarnLogHelper.class);
+
+    public static String printAllContainersLogsReturnFilePath(
+                                                              
YarnConfiguration configuration, String finishedJobLogDir,
+                                                              String 
applicationId) throws IOException {
+        Path remoteRootLogDir =
+                new Path(
+                        configuration.get(
+                                YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+                                
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+        ApplicationId appId = ConverterUtils.toApplicationId(applicationId);

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [ConverterUtils.toApplicationId](1) should be avoided because it 
has been deprecated.
   
   [Show more 
details](https://github.com/apache/dolphinscheduler/security/code-scanning/1248)



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/executor/AbstractClusterExecutor.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink.executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static 
org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOG4J_NAME;
+import static 
org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOGBACK_NAME;
+
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ParamsInfo;
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ResultInfo;
+import 
org.apache.dolphinscheduler.plugin.task.flink.factory.YarnClusterDescriptorFactory;
+import org.apache.dolphinscheduler.plugin.task.flink.utils.JobGraphBuildUtil;
+import org.apache.dolphinscheduler.spi.utils.Constants;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ClusterClientProvider;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.yarn.YarnClusterDescriptor;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractClusterExecutor {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(AbstractClusterExecutor.class);
+
+    private static final String DEFAULT_TOTAL_PROCESS_MEMORY = "1024m";
+
+    public ParamsInfo jobParamsInfo;
+
+    public AbstractClusterExecutor(ParamsInfo jobParamsInfo) {
+        this.jobParamsInfo = jobParamsInfo;
+    }
+
+    /**
+     * submit job
+     *
+     * @return
+     */
+    public abstract ResultInfo submitJob();
+
+    public YarnClient createYarnClient() throws IOException {
+        YarnClient yarnClient =
+                
YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromHadoopConfDir(
+                        jobParamsInfo.getHadoopConfDir());
+        logger.info(
+                "yarn client successfully created, hadoop conf dir:{}",
+                jobParamsInfo.getHadoopConfDir());
+        return yarnClient;
+    }
+
+    /**
+     * kill yarn job and clean application files in hdfs.
+     *
+     * @return
+     */
+    public ResultInfo killJob() throws IOException {
+        String applicationId = jobParamsInfo.getApplicationId();
+        if (StringUtils.isEmpty(applicationId)) {
+            throw new NullPointerException("kill yarn job applicationId is 
required!");
+        }
+        logger.info("killed applicationId is:{}", applicationId);
+        YarnConfiguration yarnConfiguration =
+                YarnClusterDescriptorFactory.INSTANCE.parseYarnConfFromConfDir(
+                        jobParamsInfo.getHadoopConfDir());
+
+        try (
+                YarnClient yarnClient =
+                        
YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromYarnConf(
+                                yarnConfiguration);) {
+            
yarnClient.killApplication(ConverterUtils.toApplicationId(applicationId));
+            logger.info("killed applicationId {} was unsuccessful.", 
applicationId);
+        } catch (YarnException e) {
+            logger.error("killed applicationId {0} was failed.", e);
+            return new ResultInfo("", "");
+        }
+
+        try (FileSystem fs = FileSystem.get(yarnConfiguration)) {
+            Path applicationDir =
+                    new Path(
+                            checkNotNull(fs.getHomeDirectory()),
+                            ".flink/" + checkNotNull(applicationId) + '/');
+            if (!fs.delete(applicationDir, true)) {
+                logger.error(
+                        "Deleting yarn application files under {} was 
unsuccessful.",
+                        applicationDir);
+            } else {
+                logger.error(
+                        "Deleting yarn application files under {} was 
successful.", applicationDir);
+            }
+        } catch (Exception e) {
+            logger.error("Deleting yarn application files was failed!", e);
+        }
+        return new ResultInfo("", "");
+    }
+
+    public ResultInfo cancelJob(boolean doSavepoint) {
+        String appId = jobParamsInfo.getApplicationId();
+        String jobId = jobParamsInfo.getFlinkJobId();
+
+        logger.info("cancel Job appId:{}, jobId:{}", appId, jobId);
+
+        ApplicationId applicationId = ConverterUtils.toApplicationId(appId);

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [ConverterUtils.toApplicationId](1) should be avoided because it 
has been deprecated.
   
   [Show more 
details](https://github.com/apache/dolphinscheduler/security/code-scanning/1246)



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java:
##########
@@ -43,6 +54,13 @@
      */
     private TaskExecutionContext taskExecutionContext;
 
+
+
+    private ResultInfo flinkStreamResultInfo;
+
+    protected final Logger logger =

Review Comment:
   ## Field masks field in super class
   
   This field shadows another field called [logger](1) in a superclass.
   
   [Show more 
details](https://github.com/apache/dolphinscheduler/security/code-scanning/1253)



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/utils/YarnLogHelper.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink.utils;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import java.io.*;
+import java.util.Arrays;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * reference for yarn LogCLIHelpers
+ *
+ */
+public class YarnLogHelper {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(YarnLogHelper.class);
+
+    public static String printAllContainersLogsReturnFilePath(
+                                                              
YarnConfiguration configuration, String finishedJobLogDir,
+                                                              String 
applicationId) throws IOException {
+        Path remoteRootLogDir =
+                new Path(
+                        configuration.get(
+                                YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+                                
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+        ApplicationId appId = ConverterUtils.toApplicationId(applicationId);
+        // mkdir if not exist
+        FileUtils.forceMkdir(new File(finishedJobLogDir));
+        String logFilePath = finishedJobLogDir + "/" + applicationId + ".log";
+        logger.info("finished job log file is:{} ", logFilePath);
+        File localLogFile = new File(logFilePath);
+        if (localLogFile.exists() && localLogFile.isFile() && 
localLogFile.length() > 0) {
+            logger.info("yarn log exist in local, log file path is:{}", 
localLogFile);
+            return logFilePath;
+        }
+
+        String hadoopUser = 
UserGroupInformation.getCurrentUser().getShortUserName();
+        String logDirSuffix = 
LogAggregationUtils.getRemoteNodeLogDirSuffix(configuration);
+
+        logger.info("Current Hadoop/Kerberos user: {}", hadoopUser);
+        Path remoteAppLogDir =
+                LogAggregationUtils.getRemoteAppLogDir(
+                        remoteRootLogDir, appId, hadoopUser, logDirSuffix);
+
+        long logFileSize = getLogFileSize(configuration, 
remoteAppLogDir.toString());
+        Preconditions.checkArgument(logFileSize > 0, "log file size =0");
+
+        // hdfs log file exist and create file and print stream
+        FileUtils.touch(localLogFile);
+        FileOutputStream fileOutputStream = new FileOutputStream(logFilePath);
+
+        try (PrintStream printStream = new PrintStream(fileOutputStream, 
true)) {
+            RemoteIterator<FileStatus> nodeFiles = null;
+            try {
+                Path qualifiedLogDir =
+                        
FileContext.getFileContext(configuration).makeQualified(remoteAppLogDir);
+                nodeFiles =
+                        FileContext.getFileContext(qualifiedLogDir.toUri(), 
configuration)
+                                .listStatus(remoteAppLogDir);
+            } catch (FileNotFoundException fnf) {
+                logDirNotExist(remoteAppLogDir.toString(), printStream);
+            }
+
+            boolean foundAnyLogs = false;
+
+            while (nodeFiles.hasNext()) {

Review Comment:
   ## Dereferenced variable may be null
   
   Variable [nodeFiles](1) may be null here because of [this](2) assignment.
   
   [Show more 
details](https://github.com/apache/dolphinscheduler/security/code-scanning/1252)



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/enums/ClusterClient.java:
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink.enums;
+
+import org.apache.dolphinscheduler.plugin.task.flink.client.IClusterClient;
+import org.apache.dolphinscheduler.plugin.task.flink.entity.CheckpointInfo;
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ParamsInfo;
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ResultInfo;
+import 
org.apache.dolphinscheduler.plugin.task.flink.executor.KerberosSecurityContext;
+import 
org.apache.dolphinscheduler.plugin.task.flink.executor.YarnApplicationClusterExecutor;
+import 
org.apache.dolphinscheduler.plugin.task.flink.executor.YarnPerJobClusterExecutor;
+import 
org.apache.dolphinscheduler.plugin.task.flink.factory.YarnClusterDescriptorFactory;
+import org.apache.dolphinscheduler.plugin.task.flink.utils.HdfsUtil;
+import org.apache.dolphinscheduler.plugin.task.flink.utils.YarnLogHelper;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.FunctionUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
+public enum ClusterClient implements IClusterClient {
+
+    INSTANCE;
+
+    private static final Logger logger = 
LoggerFactory.getLogger(ClusterClient.class);
+
+    private static Cache<String, YarnClient> YARN_CLIENT_CACHE =
+            CacheBuilder.newBuilder()
+                    .expireAfterWrite(12, TimeUnit.HOURS)
+                    .removalListener(new YarnClientRemovalListener())
+                    .build();
+
+    @Override
+    public ResultInfo submitFlinkJob(ParamsInfo jobParamsInfo) throws 
Exception {
+        FlinkStreamDeployMode mode =
+                jobParamsInfo.getRunMode() != null
+                        ? FlinkStreamDeployMode.YARN_PER_JOB
+                        : jobParamsInfo.getRunMode();
+        ResultInfo resultInfo;
+        switch (mode) {
+            case YARN_APPLICATION:
+                resultInfo = new 
YarnApplicationClusterExecutor(jobParamsInfo).submitJob();
+                break;
+            case YARN_PER_JOB:
+            default:
+                resultInfo = new 
YarnPerJobClusterExecutor(jobParamsInfo).submitJob();
+        }
+        return resultInfo;
+    }
+
+    @Override
+    public ResultInfo submitFlinkJobWithKerberos(ParamsInfo jobParamsInfo) 
throws Exception {
+        return KerberosSecurityContext.runSecured(
+                jobParamsInfo,
+                FunctionUtils.uncheckedSupplier(() -> 
submitFlinkJob(jobParamsInfo)));
+    }
+
+    @Override
+    public ResultInfo killYarnJob(ParamsInfo jobParamsInfo) throws IOException 
{
+        return new YarnPerJobClusterExecutor(jobParamsInfo).killJob();
+    }
+
+    @Override
+    public ResultInfo killYarnJobWithKerberos(ParamsInfo jobParamsInfo) throws 
Exception {
+        return KerberosSecurityContext.runSecured(
+                jobParamsInfo, FunctionUtils.uncheckedSupplier(() -> 
killYarnJob(jobParamsInfo)));
+    }
+
+    @Override
+    public YarnTaskStatus getYarnJobStatus(ParamsInfo jobParamsInfo) throws 
Exception {
+        String applicationId = jobParamsInfo.getApplicationId();
+        String hadoopConfDir = jobParamsInfo.getHadoopConfDir();
+        Preconditions.checkNotNull(applicationId, "yarn applicationId is not 
null!");
+        Preconditions.checkNotNull(hadoopConfDir, "hadoop conf dir is not 
null!");
+
+        YarnClient yarnClient =
+                YARN_CLIENT_CACHE.get(
+                        hadoopConfDir,
+                        () -> {
+                            try {
+                                logger.info("create yarn client,create 
time:{}", LocalDateTime.now());
+                                return new 
YarnPerJobClusterExecutor(jobParamsInfo)
+                                        .createYarnClient();
+                            } catch (IOException e) {
+                                logger.error("create yarn client error!", e);
+                            }
+                            return null;
+                        });
+
+        if (!Objects.isNull(yarnClient)) {
+            try {
+                ApplicationId appId = 
ConverterUtils.toApplicationId(applicationId);

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [ConverterUtils.toApplicationId](1) should be avoided because it 
has been deprecated.
   
   [Show more 
details](https://github.com/apache/dolphinscheduler/security/code-scanning/1245)



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/executor/AbstractClusterExecutor.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink.executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static 
org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOG4J_NAME;
+import static 
org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOGBACK_NAME;
+
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ParamsInfo;
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ResultInfo;
+import 
org.apache.dolphinscheduler.plugin.task.flink.factory.YarnClusterDescriptorFactory;
+import org.apache.dolphinscheduler.plugin.task.flink.utils.JobGraphBuildUtil;
+import org.apache.dolphinscheduler.spi.utils.Constants;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ClusterClientProvider;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.yarn.YarnClusterDescriptor;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractClusterExecutor {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(AbstractClusterExecutor.class);
+
+    private static final String DEFAULT_TOTAL_PROCESS_MEMORY = "1024m";
+
+    public ParamsInfo jobParamsInfo;
+
+    public AbstractClusterExecutor(ParamsInfo jobParamsInfo) {
+        this.jobParamsInfo = jobParamsInfo;
+    }
+
+    /**
+     * submit job
+     *
+     * @return
+     */
+    public abstract ResultInfo submitJob();
+
+    public YarnClient createYarnClient() throws IOException {
+        YarnClient yarnClient =
+                
YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromHadoopConfDir(
+                        jobParamsInfo.getHadoopConfDir());
+        logger.info(
+                "yarn client successfully created, hadoop conf dir:{}",
+                jobParamsInfo.getHadoopConfDir());
+        return yarnClient;
+    }
+
+    /**
+     * kill yarn job and clean application files in hdfs.
+     *
+     * @return
+     */
+    public ResultInfo killJob() throws IOException {
+        String applicationId = jobParamsInfo.getApplicationId();
+        if (StringUtils.isEmpty(applicationId)) {
+            throw new NullPointerException("kill yarn job applicationId is 
required!");
+        }
+        logger.info("killed applicationId is:{}", applicationId);
+        YarnConfiguration yarnConfiguration =
+                YarnClusterDescriptorFactory.INSTANCE.parseYarnConfFromConfDir(
+                        jobParamsInfo.getHadoopConfDir());
+
+        try (
+                YarnClient yarnClient =
+                        
YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromYarnConf(
+                                yarnConfiguration);) {
+            
yarnClient.killApplication(ConverterUtils.toApplicationId(applicationId));
+            logger.info("killed applicationId {} was unsuccessful.", 
applicationId);
+        } catch (YarnException e) {
+            logger.error("killed applicationId {0} was failed.", e);
+            return new ResultInfo("", "");
+        }
+
+        try (FileSystem fs = FileSystem.get(yarnConfiguration)) {
+            Path applicationDir =
+                    new Path(
+                            checkNotNull(fs.getHomeDirectory()),
+                            ".flink/" + checkNotNull(applicationId) + '/');
+            if (!fs.delete(applicationDir, true)) {
+                logger.error(
+                        "Deleting yarn application files under {} was 
unsuccessful.",
+                        applicationDir);
+            } else {
+                logger.error(
+                        "Deleting yarn application files under {} was 
successful.", applicationDir);
+            }
+        } catch (Exception e) {
+            logger.error("Deleting yarn application files was failed!", e);
+        }
+        return new ResultInfo("", "");
+    }
+
+    public ResultInfo cancelJob(boolean doSavepoint) {
+        String appId = jobParamsInfo.getApplicationId();
+        String jobId = jobParamsInfo.getFlinkJobId();
+
+        logger.info("cancel Job appId:{}, jobId:{}", appId, jobId);
+
+        ApplicationId applicationId = ConverterUtils.toApplicationId(appId);
+        JobID flinkJobId = new 
JobID(org.apache.flink.util.StringUtils.hexStringToByte(jobId));
+
+        Configuration flinkConfig = getFlinkConfigFromParamsInfo();
+        try (
+                YarnClusterDescriptor clusterDescriptor =
+                        (YarnClusterDescriptor) 
YarnClusterDescriptorFactory.INSTANCE.createClusterDescriptor(
+                                jobParamsInfo.getHadoopConfDir(), 
flinkConfig)) {
+
+            ClusterClientProvider<ApplicationId> retrieve =
+                    clusterDescriptor.retrieve(applicationId);
+            try (ClusterClient<ApplicationId> clusterClient = 
retrieve.getClusterClient()) {
+                if (doSavepoint) {
+                    CompletableFuture<String> savepointFuture =
+                            clusterClient.cancelWithSavepoint(flinkJobId, 
null, SavepointFormatType.DEFAULT);
+                    Object result = savepointFuture.get(2, TimeUnit.MINUTES);
+                    logger.info("flink job savepoint path: {}", 
result.toString());
+                } else {
+                    CompletableFuture<Acknowledge> cancelFuture = 
clusterClient.cancel(flinkJobId);
+                    Object result = cancelFuture.get(2, TimeUnit.MINUTES);
+                    logger.info("flink job cancel result: {}", 
result.toString());
+                }
+            } catch (Exception e) {
+                try {
+                    logger.error("cancel job error, will kill job:", e);
+                    clusterDescriptor.killCluster(applicationId);
+                } catch (FlinkException e1) {
+                    logger.error("yarn cluster Descriptor kill cluster 
error:", e);
+                    return new ResultInfo("", "");
+                }
+            }
+
+        } catch (Exception e) {
+            logger.error(String.format("cancel job failed,appId:{}, jobId:", 
appId, jobId), e);
+            return new ResultInfo(appId, jobId);
+        }
+
+        return new ResultInfo(appId, jobId);
+    }
+
+    public ResultInfo savePoint() {
+        String appId = jobParamsInfo.getApplicationId();
+        String jobId = jobParamsInfo.getFlinkJobId();
+
+        logger.info("cancel Job appId:{}, jobId:{}", appId, jobId);
+
+        ApplicationId applicationId = ConverterUtils.toApplicationId(appId);
+        JobID flinkJobId = new 
JobID(org.apache.flink.util.StringUtils.hexStringToByte(jobId));
+
+        Configuration flinkConfig = getFlinkConfigFromParamsInfo();
+        try (
+                YarnClusterDescriptor clusterDescriptor =
+                        (YarnClusterDescriptor) 
YarnClusterDescriptorFactory.INSTANCE.createClusterDescriptor(
+                                jobParamsInfo.getHadoopConfDir(), 
flinkConfig)) {
+
+            ClusterClientProvider<ApplicationId> retrieve =
+                    clusterDescriptor.retrieve(applicationId);
+            try (ClusterClient<ApplicationId> clusterClient = 
retrieve.getClusterClient()) {
+                CompletableFuture<String> savepointFuture =
+                        clusterClient.triggerSavepoint(flinkJobId, null, 
SavepointFormatType.DEFAULT);
+                Object result = savepointFuture.get(2, TimeUnit.MINUTES);
+                logger.info("flink job savepoint path: {}", result.toString());
+            } catch (Exception e) {
+                logger.error("flink job savepoint error", e);
+            }
+
+        } catch (Exception e) {
+            logger.error(String.format("flink job savepoint failed, appId:{}, 
jobId:", appId, jobId), e);

Review Comment:
   ## Unused format argument
   
   This format call refers to 0 argument(s) but supplies 2 argument(s).
   
   [Show more 
details](https://github.com/apache/dolphinscheduler/security/code-scanning/1251)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to