caishunfeng commented on code in PR #11939:
URL: https://github.com/apache/dolphinscheduler/pull/11939#discussion_r973058138
##########
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java:
##########
@@ -55,37 +67,110 @@ public void init() {
flinkParameters =
JSONUtils.parseObject(taskExecutionContext.getTaskParams(),
FlinkStreamParameters.class);
if (flinkParameters == null || !flinkParameters.checkParameters()) {
- throw new RuntimeException("flink task params is not valid");
+ throw new RuntimeException("flink stream task params is not
valid");
}
flinkParameters.setQueue(taskExecutionContext.getQueue());
setMainJarName();
+ }
- FileUtils.generateScriptFile(taskExecutionContext, flinkParameters);
+ @Override
+ public List<String> getApplicationIds() throws TaskException {
+ return Collections.emptyList();
}
- /**
- * create command
- *
- * @return command
- */
@Override
- protected String buildCommand() {
- // flink run/run-application [OPTIONS] <jar-file> <arguments>
- List<String> args =
FlinkArgsUtils.buildRunCommandLine(taskExecutionContext, flinkParameters);
+ public void cancelApplication() throws TaskException {
+ String hadoopConfDir = System.getenv(HADOOP_CONF_DIR);
+
+ initResultInfo();
- String command = ParameterUtils
- .convertParameterPlaceholders(String.join(" ", args),
taskExecutionContext.getDefinedParams());
+ ParamsInfo jobParamsInfo = ParamsInfo.builder()
+ .hadoopConfDir(hadoopConfDir)
+ .applicationId(flinkStreamResultInfo.getAppId())
+ .flinkJobId(flinkStreamResultInfo.getJobId())
+ .build();
- logger.info("flink task command : {}", command);
- return command;
+ try {
+ ClusterClient clusterClient = ClusterClient.INSTANCE;
+ ResultInfo jobResult = clusterClient.cancelFlinkJob(jobParamsInfo);
+ setExitStatusCode(EXIT_CODE_KILL);
+ logger.info(
+ String.format(
+ "job cancel result, appId:%s, jobId:%s",
+ jobResult.getAppId(), jobResult.getJobId()));
+ } catch (Exception e) {
+ logger.error("cancel flink stream task failure", e);
+ throw new TaskException("cancel flink stream task error", e);
+ }
}
@Override
- protected void setMainJarName() {
- ResourceInfo mainJar = flinkParameters.getMainJar();
- String resourceName = getResourceNameOfMainJar(mainJar);
- mainJar.setRes(resourceName);
- flinkParameters.setMainJar(mainJar);
+ public void submitApplication() throws TaskException {
+ String flinkHome = System.getenv(FLINK_HOME);
+
+ if (flinkHome.isEmpty()) {
+ logger.error("Please make sure to set the FLINK_HOME environment
variable.");
+ }
+
+ String hadoopConfDir = System.getenv(HADOOP_CONF_DIR);
+
+ if (hadoopConfDir.isEmpty()) {
+ logger.error("Please make sure to set the HADOOP_CONF_DIR
environment variable.");
Review Comment:
Should throw taskException here?
##########
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/entity/ParamsInfo.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink.entity;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import
org.apache.dolphinscheduler.plugin.task.flink.enums.FlinkStreamDeployMode;
+
+import java.util.Properties;
+
+@AllArgsConstructor
+@NoArgsConstructor
+@Builder
+@Data
+public class ParamsInfo {
Review Comment:
```suggestion
public class FlinkParamsInfo {
```
##########
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/factory/YarnClusterDescriptorFactory.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink.factory;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.function.FunctionUtils;
+import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
+import org.apache.flink.yarn.YarnClusterDescriptor;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import java.io.File;
+import java.io.IOException;
+
+public enum YarnClusterDescriptorFactory implements
AbstractClusterDescriptorFactory {
+
+ INSTANCE;
+
+ private static final String XML_FILE_EXTENSION = "xml";
+
+ @Override
+ public ClusterDescriptor createClusterDescriptor(
+ String hadoopConfDir,
Configuration flinkConfig) {
+ if (StringUtils.isNotBlank(hadoopConfDir)) {
+ try {
+ YarnConfiguration yarnConf =
parseYarnConfFromConfDir(hadoopConfDir);
+ YarnClient yarnClient = createYarnClientFromYarnConf(yarnConf);
+
+ YarnClusterDescriptor clusterDescriptor =
+ new YarnClusterDescriptor(
+ flinkConfig,
+ yarnConf,
+ yarnClient,
+
YarnClientYarnClusterInformationRetriever.create(yarnClient),
+ false);
+ return clusterDescriptor;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ throw new RuntimeException("yarn mode must set param of
'hadoopConfDir'");
+ }
+ }
Review Comment:
```suggestion
if (StringUtils.isBlank(hadoopConfDir)) {
throw new RuntimeException("yarn mode must set param of
'hadoopConfDir'");
}
try {
YarnConfiguration yarnConf =
parseYarnConfFromConfDir(hadoopConfDir);
YarnClient yarnClient = createYarnClientFromYarnConf(yarnConf);
YarnClusterDescriptor clusterDescriptor =
new YarnClusterDescriptor(
flinkConfig,
yarnConf,
yarnClient,
YarnClientYarnClusterInformationRetriever.create(yarnClient),
false);
return clusterDescriptor;
} catch (Exception e) {
throw new RuntimeException(e);
}
```
##########
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java:
##########
@@ -17,116 +17,86 @@
package org.apache.dolphinscheduler.plugin.task.flink;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ParamsInfo;
+import
org.apache.dolphinscheduler.plugin.task.flink.enums.FlinkStreamDeployMode;
import org.junit.Assert;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.util.List;
+import java.util.Properties;
public class FlinkArgsUtilsTest {
- private String joinStringListWithSpace(List<String> stringList) {
Review Comment:
Maybe we should keep the old test for flink task, and add the new one for
flink-stream task, WDYT?
##########
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/entity/ResultInfo.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink.entity;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@AllArgsConstructor
+@NoArgsConstructor
+@Builder
+@Data
+public class ResultInfo {
Review Comment:
```suggestion
public class FlinkResultInfo {
```
##########
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/executor/AbstractClusterExecutor.java:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink.executor;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ParamsInfo;
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ResultInfo;
+import
org.apache.dolphinscheduler.plugin.task.flink.factory.YarnClusterDescriptorFactory;
+import org.apache.dolphinscheduler.plugin.task.flink.utils.JobGraphBuildUtil;
+import org.apache.dolphinscheduler.spi.utils.Constants;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ClusterClientProvider;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.yarn.YarnClusterDescriptor;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static
org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOG4J_NAME;
+import static
org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOGBACK_NAME;
+
+public abstract class AbstractClusterExecutor {
+
+ private static final Logger logger =
LoggerFactory.getLogger(AbstractClusterExecutor.class);
+
+ private static final String DEFAULT_TOTAL_PROCESS_MEMORY = "1024m";
+
+ public ParamsInfo jobParamsInfo;
+
+ public AbstractClusterExecutor(ParamsInfo jobParamsInfo) {
+ this.jobParamsInfo = jobParamsInfo;
+ }
+
+ /**
+ * submit job
+ *
+ * @return
+ */
+ public abstract ResultInfo submitJob();
+
+ public YarnClient createYarnClient() throws IOException {
+ YarnClient yarnClient =
+
YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromHadoopConfDir(
+ jobParamsInfo.getHadoopConfDir());
+ logger.info(
+ "yarn client successfully created, hadoop conf dir:{}",
+ jobParamsInfo.getHadoopConfDir());
+ return yarnClient;
+ }
+
+ /**
+ * kill yarn job and clean application files in hdfs.
+ *
+ * @return
+ */
+ public ResultInfo killJob() throws IOException {
+ String applicationId = jobParamsInfo.getApplicationId();
+ if (StringUtils.isEmpty(applicationId)) {
+ throw new NullPointerException("kill yarn job applicationId is
required!");
+ }
+ logger.info("killed applicationId is:{}", applicationId);
+ YarnConfiguration yarnConfiguration =
+ YarnClusterDescriptorFactory.INSTANCE.parseYarnConfFromConfDir(
+ jobParamsInfo.getHadoopConfDir());
+
+ try (
+ YarnClient yarnClient =
+
YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromYarnConf(
+ yarnConfiguration);) {
+
yarnClient.killApplication(ApplicationId.fromString(applicationId));
+ logger.info("killed applicationId {} was unsuccessful.",
applicationId);
+ } catch (YarnException e) {
+ logger.error("killed applicationId {0} was failed.", e);
+ return new ResultInfo("", "");
+ }
+
+ try (FileSystem fs = FileSystem.get(yarnConfiguration)) {
+ Path applicationDir =
+ new Path(
+ checkNotNull(fs.getHomeDirectory()),
+ ".flink/" + checkNotNull(applicationId) + '/');
+ if (!fs.delete(applicationDir, true)) {
+ logger.error(
+ "Deleting yarn application files under {} was
unsuccessful.",
+ applicationDir);
+ } else {
+ logger.error(
Review Comment:
```suggestion
logger.info(
```
##########
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/client/IClusterClient.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink.client;
+
+import org.apache.dolphinscheduler.plugin.task.flink.entity.CheckpointInfo;
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ParamsInfo;
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ResultInfo;
+import org.apache.dolphinscheduler.plugin.task.flink.enums.YarnTaskStatus;
+
+import java.util.List;
+public interface IClusterClient {
Review Comment:
Please add some comment for it.
##########
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/executor/AbstractClusterExecutor.java:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink.executor;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ParamsInfo;
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ResultInfo;
+import
org.apache.dolphinscheduler.plugin.task.flink.factory.YarnClusterDescriptorFactory;
+import org.apache.dolphinscheduler.plugin.task.flink.utils.JobGraphBuildUtil;
+import org.apache.dolphinscheduler.spi.utils.Constants;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ClusterClientProvider;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.yarn.YarnClusterDescriptor;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static
org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOG4J_NAME;
+import static
org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOGBACK_NAME;
+
+public abstract class AbstractClusterExecutor {
+
+ private static final Logger logger =
LoggerFactory.getLogger(AbstractClusterExecutor.class);
+
+ private static final String DEFAULT_TOTAL_PROCESS_MEMORY = "1024m";
+
+ public ParamsInfo jobParamsInfo;
+
+ public AbstractClusterExecutor(ParamsInfo jobParamsInfo) {
+ this.jobParamsInfo = jobParamsInfo;
+ }
+
+ /**
+ * submit job
+ *
+ * @return
+ */
+ public abstract ResultInfo submitJob();
+
+ public YarnClient createYarnClient() throws IOException {
+ YarnClient yarnClient =
+
YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromHadoopConfDir(
+ jobParamsInfo.getHadoopConfDir());
+ logger.info(
+ "yarn client successfully created, hadoop conf dir:{}",
+ jobParamsInfo.getHadoopConfDir());
+ return yarnClient;
+ }
+
+ /**
+ * kill yarn job and clean application files in hdfs.
+ *
+ * @return
+ */
+ public ResultInfo killJob() throws IOException {
+ String applicationId = jobParamsInfo.getApplicationId();
+ if (StringUtils.isEmpty(applicationId)) {
+ throw new NullPointerException("kill yarn job applicationId is
required!");
+ }
+ logger.info("killed applicationId is:{}", applicationId);
+ YarnConfiguration yarnConfiguration =
+ YarnClusterDescriptorFactory.INSTANCE.parseYarnConfFromConfDir(
+ jobParamsInfo.getHadoopConfDir());
+
+ try (
+ YarnClient yarnClient =
+
YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromYarnConf(
+ yarnConfiguration);) {
+
yarnClient.killApplication(ApplicationId.fromString(applicationId));
+ logger.info("killed applicationId {} was unsuccessful.",
applicationId);
+ } catch (YarnException e) {
+ logger.error("killed applicationId {0} was failed.", e);
+ return new ResultInfo("", "");
+ }
+
+ try (FileSystem fs = FileSystem.get(yarnConfiguration)) {
+ Path applicationDir =
+ new Path(
+ checkNotNull(fs.getHomeDirectory()),
+ ".flink/" + checkNotNull(applicationId) + '/');
+ if (!fs.delete(applicationDir, true)) {
+ logger.error(
+ "Deleting yarn application files under {} was
unsuccessful.",
+ applicationDir);
+ } else {
+ logger.error(
+ "Deleting yarn application files under {} was
successful.", applicationDir);
+ }
+ } catch (Exception e) {
+ logger.error("Deleting yarn application files was failed!", e);
+ }
+ return new ResultInfo("", "");
+ }
+
+ public ResultInfo cancelJob(boolean doSavepoint) {
+ String appId = jobParamsInfo.getApplicationId();
+ String jobId = jobParamsInfo.getFlinkJobId();
+
+ logger.info("cancel Job appId:{}, jobId:{}", appId, jobId);
+
+ ApplicationId applicationId = ApplicationId.fromString(appId);
+ JobID flinkJobId = new
JobID(org.apache.flink.util.StringUtils.hexStringToByte(jobId));
+
+ Configuration flinkConfig = getFlinkConfigFromParamsInfo();
+ try (
+ YarnClusterDescriptor clusterDescriptor =
+ (YarnClusterDescriptor)
YarnClusterDescriptorFactory.INSTANCE.createClusterDescriptor(
+ jobParamsInfo.getHadoopConfDir(),
flinkConfig)) {
+
+ ClusterClientProvider<ApplicationId> retrieve =
+ clusterDescriptor.retrieve(applicationId);
+ try (ClusterClient<ApplicationId> clusterClient =
retrieve.getClusterClient()) {
+ if (doSavepoint) {
+ CompletableFuture<String> savepointFuture =
+ clusterClient.cancelWithSavepoint(flinkJobId,
null, SavepointFormatType.DEFAULT);
+ Object result = savepointFuture.get(2, TimeUnit.MINUTES);
+ logger.info("flink job savepoint path: {}",
result.toString());
+ } else {
+ CompletableFuture<Acknowledge> cancelFuture =
clusterClient.cancel(flinkJobId);
+ Object result = cancelFuture.get(2, TimeUnit.MINUTES);
+ logger.info("flink job cancel result: {}",
result.toString());
+ }
+ } catch (Exception e) {
+ try {
+ logger.error("cancel job error, will kill job:", e);
+ clusterDescriptor.killCluster(applicationId);
+ } catch (FlinkException e1) {
+ logger.error("yarn cluster Descriptor kill cluster
error:", e);
+ return new ResultInfo("", "");
+ }
+ }
+
+ } catch (Exception e) {
+ logger.error("cancel job failed,appId:{}, jobId:{}, exception:{}",
appId, jobId, e);
+ return new ResultInfo(appId, jobId);
+ }
+
+ return new ResultInfo(appId, jobId);
+ }
+
+ public ResultInfo savePoint() {
+ String appId = jobParamsInfo.getApplicationId();
+ String jobId = jobParamsInfo.getFlinkJobId();
+
+ logger.info("cancel Job appId:{}, jobId:{}", appId, jobId);
+
+ ApplicationId applicationId = ApplicationId.fromString(appId);
+ JobID flinkJobId = new
JobID(org.apache.flink.util.StringUtils.hexStringToByte(jobId));
+
+ Configuration flinkConfig = getFlinkConfigFromParamsInfo();
+ try (
+ YarnClusterDescriptor clusterDescriptor =
+ (YarnClusterDescriptor)
YarnClusterDescriptorFactory.INSTANCE.createClusterDescriptor(
+ jobParamsInfo.getHadoopConfDir(),
flinkConfig)) {
+
+ ClusterClientProvider<ApplicationId> retrieve =
+ clusterDescriptor.retrieve(applicationId);
+ try (ClusterClient<ApplicationId> clusterClient =
retrieve.getClusterClient()) {
+ CompletableFuture<String> savepointFuture =
+ clusterClient.triggerSavepoint(flinkJobId, null,
SavepointFormatType.DEFAULT);
+ Object result = savepointFuture.get(2, TimeUnit.MINUTES);
+ logger.info("flink job savepoint path: {}", result.toString());
+ } catch (Exception e) {
+ logger.error("flink job savepoint error", e);
+ }
+
+ } catch (Exception e) {
+ logger.error("flink job savepoint failed, appId:{}, jobId:{},
exception:{}", appId, jobId, e);
+ return new ResultInfo(appId, jobId);
Review Comment:
```suggestion
throw new TaskException("flink job savepoint failed", e)
```
##########
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/executor/AbstractClusterExecutor.java:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink.executor;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ParamsInfo;
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ResultInfo;
+import
org.apache.dolphinscheduler.plugin.task.flink.factory.YarnClusterDescriptorFactory;
+import org.apache.dolphinscheduler.plugin.task.flink.utils.JobGraphBuildUtil;
+import org.apache.dolphinscheduler.spi.utils.Constants;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ClusterClientProvider;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.yarn.YarnClusterDescriptor;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static
org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOG4J_NAME;
+import static
org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOGBACK_NAME;
+
+public abstract class AbstractClusterExecutor {
+
+ private static final Logger logger =
LoggerFactory.getLogger(AbstractClusterExecutor.class);
+
+ private static final String DEFAULT_TOTAL_PROCESS_MEMORY = "1024m";
+
+ public ParamsInfo jobParamsInfo;
+
+ public AbstractClusterExecutor(ParamsInfo jobParamsInfo) {
+ this.jobParamsInfo = jobParamsInfo;
+ }
+
+ /**
+ * submit job
+ *
+ * @return
+ */
+ public abstract ResultInfo submitJob();
+
+ public YarnClient createYarnClient() throws IOException {
+ YarnClient yarnClient =
+
YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromHadoopConfDir(
+ jobParamsInfo.getHadoopConfDir());
+ logger.info(
+ "yarn client successfully created, hadoop conf dir:{}",
+ jobParamsInfo.getHadoopConfDir());
+ return yarnClient;
+ }
+
+ /**
+ * kill yarn job and clean application files in hdfs.
+ *
+ * @return
+ */
+ public ResultInfo killJob() throws IOException {
+ String applicationId = jobParamsInfo.getApplicationId();
+ if (StringUtils.isEmpty(applicationId)) {
+ throw new NullPointerException("kill yarn job applicationId is
required!");
+ }
+ logger.info("killed applicationId is:{}", applicationId);
+ YarnConfiguration yarnConfiguration =
+ YarnClusterDescriptorFactory.INSTANCE.parseYarnConfFromConfDir(
+ jobParamsInfo.getHadoopConfDir());
+
+ try (
+ YarnClient yarnClient =
+
YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromYarnConf(
+ yarnConfiguration);) {
+
yarnClient.killApplication(ApplicationId.fromString(applicationId));
+ logger.info("killed applicationId {} was unsuccessful.",
applicationId);
+ } catch (YarnException e) {
+ logger.error("killed applicationId {0} was failed.", e);
+ return new ResultInfo("", "");
+ }
+
+ try (FileSystem fs = FileSystem.get(yarnConfiguration)) {
+ Path applicationDir =
+ new Path(
+ checkNotNull(fs.getHomeDirectory()),
+ ".flink/" + checkNotNull(applicationId) + '/');
+ if (!fs.delete(applicationDir, true)) {
+ logger.error(
+ "Deleting yarn application files under {} was
unsuccessful.",
+ applicationDir);
+ } else {
+ logger.error(
+ "Deleting yarn application files under {} was
successful.", applicationDir);
+ }
+ } catch (Exception e) {
+ logger.error("Deleting yarn application files was failed!", e);
+ }
+ return new ResultInfo("", "");
+ }
+
+ public ResultInfo cancelJob(boolean doSavepoint) {
+ String appId = jobParamsInfo.getApplicationId();
+ String jobId = jobParamsInfo.getFlinkJobId();
+
+ logger.info("cancel Job appId:{}, jobId:{}", appId, jobId);
+
+ ApplicationId applicationId = ApplicationId.fromString(appId);
+ JobID flinkJobId = new
JobID(org.apache.flink.util.StringUtils.hexStringToByte(jobId));
+
+ Configuration flinkConfig = getFlinkConfigFromParamsInfo();
+ try (
+ YarnClusterDescriptor clusterDescriptor =
+ (YarnClusterDescriptor)
YarnClusterDescriptorFactory.INSTANCE.createClusterDescriptor(
+ jobParamsInfo.getHadoopConfDir(),
flinkConfig)) {
+
+ ClusterClientProvider<ApplicationId> retrieve =
+ clusterDescriptor.retrieve(applicationId);
+ try (ClusterClient<ApplicationId> clusterClient =
retrieve.getClusterClient()) {
+ if (doSavepoint) {
+ CompletableFuture<String> savepointFuture =
+ clusterClient.cancelWithSavepoint(flinkJobId,
null, SavepointFormatType.DEFAULT);
+ Object result = savepointFuture.get(2, TimeUnit.MINUTES);
+ logger.info("flink job savepoint path: {}",
result.toString());
+ } else {
+ CompletableFuture<Acknowledge> cancelFuture =
clusterClient.cancel(flinkJobId);
+ Object result = cancelFuture.get(2, TimeUnit.MINUTES);
+ logger.info("flink job cancel result: {}",
result.toString());
+ }
+ } catch (Exception e) {
+ try {
+ logger.error("cancel job error, will kill job:", e);
+ clusterDescriptor.killCluster(applicationId);
+ } catch (FlinkException e1) {
+ logger.error("yarn cluster Descriptor kill cluster
error:", e);
+ return new ResultInfo("", "");
+ }
+ }
+
+ } catch (Exception e) {
+ logger.error("cancel job failed,appId:{}, jobId:{}, exception:{}",
appId, jobId, e);
+ return new ResultInfo(appId, jobId);
+ }
+
+ return new ResultInfo(appId, jobId);
+ }
+
+ public ResultInfo savePoint() {
+ String appId = jobParamsInfo.getApplicationId();
+ String jobId = jobParamsInfo.getFlinkJobId();
+
+ logger.info("cancel Job appId:{}, jobId:{}", appId, jobId);
+
+ ApplicationId applicationId = ApplicationId.fromString(appId);
+ JobID flinkJobId = new
JobID(org.apache.flink.util.StringUtils.hexStringToByte(jobId));
+
+ Configuration flinkConfig = getFlinkConfigFromParamsInfo();
+ try (
+ YarnClusterDescriptor clusterDescriptor =
+ (YarnClusterDescriptor)
YarnClusterDescriptorFactory.INSTANCE.createClusterDescriptor(
+ jobParamsInfo.getHadoopConfDir(),
flinkConfig)) {
+
+ ClusterClientProvider<ApplicationId> retrieve =
+ clusterDescriptor.retrieve(applicationId);
+ try (ClusterClient<ApplicationId> clusterClient =
retrieve.getClusterClient()) {
+ CompletableFuture<String> savepointFuture =
+ clusterClient.triggerSavepoint(flinkJobId, null,
SavepointFormatType.DEFAULT);
+ Object result = savepointFuture.get(2, TimeUnit.MINUTES);
+ logger.info("flink job savepoint path: {}", result.toString());
+ } catch (Exception e) {
+ logger.error("flink job savepoint error", e);
Review Comment:
throw exception
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]