This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch k8s-deploy
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/k8s-deploy by this push:
     new 1a4c585ab [Improve] deploy yarn session cluster improvement
1a4c585ab is described below

commit 1a4c585abf2840ffdd6e9d49d432b2a87050f092
Author: benjobs <[email protected]>
AuthorDate: Sun Jan 14 17:04:24 2024 +0800

    [Improve] deploy yarn session cluster improvement
---
 .../console/core/service/ApplicationService.java        |  4 ++++
 .../core/service/impl/ApplicationServiceImpl.java       | 11 ++++++-----
 .../core/service/impl/FlinkClusterServiceImpl.java      | 11 ++++++++++-
 .../streampark/flink/client/bean/DeployRequest.scala    |  8 ++++++--
 .../flink/client/impl/YarnSessionClient.scala           | 17 +++++++++++++----
 5 files changed, 39 insertions(+), 12 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
index 5ffa7a02e..42d30fec5 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
@@ -23,6 +23,8 @@ import org.apache.streampark.console.base.exception.ApplicationException;
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.enums.AppExistsState;
 
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.service.IService;
 import org.springframework.web.multipart.MultipartFile;
@@ -124,4 +126,6 @@ public interface ApplicationService extends IService<Application> {
   String k8sStartLog(Long id, Integer offset, Integer limit) throws Exception;
 
   AppExistsState checkStart(Application app);
+
+  List<ApplicationReport> getYARNApplication(String appName);
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 5ed782953..61bd08a90 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -656,7 +656,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
       return AppExistsState.INVALID;
     }
     if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
-      boolean exists = !getApplicationReports(application.getJobName()).isEmpty();
+      boolean exists = !getYARNApplication(application.getJobName()).isEmpty();
       return exists ? AppExistsState.IN_YARN : AppExistsState.NO;
     }
     // todo on k8s check...
@@ -1451,7 +1451,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
     // check job on yarn is already running
     if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
       ApiAlertException.throwIfTrue(
-          !getApplicationReports(application.getJobName()).isEmpty(),
+          !getYARNApplication(application.getJobName()).isEmpty(),
           "The same job name is already running in the yarn queue");
     }
 
@@ -1763,7 +1763,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
     // kill application
     if (ExecutionMode.isYarnMode(application.getExecutionModeEnum())) {
       try {
-        List<ApplicationReport> applications = getApplicationReports(application.getJobName());
+        List<ApplicationReport> applications = getYARNApplication(application.getJobName());
         if (!applications.isEmpty()) {
           YarnClient yarnClient = HadoopUtils.yarnClient();
           yarnClient.killApplication(applications.get(0).getApplicationId());
@@ -1845,7 +1845,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
         || yarnQueueService.isDefaultQueue(application.getYarnQueue());
   }
 
-  private List<ApplicationReport> getApplicationReports(String jobName) {
+  @Override
+  public List<ApplicationReport> getYARNApplication(String appName) {
     try {
       YarnClient yarnClient = HadoopUtils.yarnClient();
       Set<String> types =
@@ -1861,7 +1862,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
       Set<String> yarnTag = Sets.newHashSet("streampark");
       List<ApplicationReport> applications = yarnClient.getApplications(types, states, yarnTag);
       return applications.stream()
-          .filter(report -> report.getName().equals(jobName))
+          .filter(report -> report.getName().equals(appName))
           .collect(Collectors.toList());
     } catch (Exception e) {
       throw new RuntimeException("The yarn api is abnormal. Ensure that yarn is running properly.");
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 84e10e45a..f1e68bdbc 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -151,10 +151,17 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
   @Transactional(rollbackFor = {Exception.class})
   public void start(Long id) {
     FlinkCluster flinkCluster = getById(id);
+    ApiAlertException.throwIfTrue(
+        !applicationService.getYARNApplication(flinkCluster.getClusterName()).isEmpty(),
+        "The same job name: "
+            + flinkCluster.getClusterName()
+            + " is already running in the yarn queue");
+
     try {
       ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum();
       DeployRequest deployRequest = getDeployRequest(flinkCluster);
       log.info("deploy cluster request: " + deployRequest);
+
       Future<DeployResponse> future =
           executorService.submit(() -> FlinkClient.deploy(deployRequest));
       DeployResponse deployResponse = future.get(60, TimeUnit.SECONDS);
@@ -193,13 +200,15 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
             flinkEnv.getFlinkVersion(),
             executionModeEnum,
             flinkCluster.getProperties(),
-            flinkCluster.getClusterId());
+            flinkCluster.getClusterId(),
+            flinkCluster.getClusterName());
       case KUBERNETES_NATIVE_SESSION:
         return KubernetesDeployRequest.apply(
             flinkEnv.getFlinkVersion(),
             executionModeEnum,
             flinkCluster.getProperties(),
             flinkCluster.getClusterId(),
+            flinkCluster.getClusterName(),
             flinkCluster.getK8sNamespace(),
             flinkCluster.getK8sConf(),
             flinkCluster.getServiceAccount(),
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala
index 81b36a17b..936af108e 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala
@@ -31,7 +31,8 @@ case class DeployRequest(
     flinkVersion: FlinkVersion,
     executionMode: ExecutionMode,
     properties: JavaMap[String, Any],
-    clusterId: String) {
+    clusterId: String,
+    clusterName: String) {
 
   private[client] lazy val hdfsWorkspace = {
 
@@ -65,12 +66,13 @@ class KubernetesDeployRequest(
     override val executionMode: ExecutionMode,
     override val properties: JavaMap[String, Any],
     override val clusterId: String,
+    override val clusterName: String,
     val kubernetesNamespace: String = KubernetesConfigOptions.NAMESPACE.defaultValue(),
     val kubeConf: String = "~/.kube/config",
     val serviceAccount: String = KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT.defaultValue(),
     val flinkImage: String = KubernetesConfigOptions.CONTAINER_IMAGE.defaultValue(),
     val flinkRestExposedType: FlinkK8sRestExposedType = FlinkK8sRestExposedType.CLUSTER_IP)
-  extends DeployRequest(flinkVersion, executionMode, properties, clusterId)
+  extends DeployRequest(flinkVersion, executionMode, properties, clusterId, clusterName)
 
 object KubernetesDeployRequest {
   def apply(
@@ -78,6 +80,7 @@ object KubernetesDeployRequest {
       executionMode: ExecutionMode,
       properties: JavaMap[String, Any],
       clusterId: String,
+      clusterName: String,
       kubernetesNamespace: String,
       kubeConf: String,
       serviceAccount: String,
@@ -88,6 +91,7 @@ object KubernetesDeployRequest {
       executionMode,
       properties,
       clusterId,
+      clusterName,
       kubernetesNamespace,
       kubeConf,
       serviceAccount,
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index 8fa9922ad..328806c68 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -21,6 +21,7 @@ import org.apache.streampark.common.util.Utils
 import org.apache.streampark.flink.client.`trait`.YarnClientTrait
 import org.apache.streampark.flink.client.bean._
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
 import org.apache.flink.client.program.{ClusterClient, PackagedProgram}
 import org.apache.flink.configuration._
@@ -58,7 +59,10 @@ object YarnSessionClient extends YarnClientTrait {
    * @param deployRequest
    * @param flinkConfig
    */
-  def deployClusterConfig(deployRequest: DeployRequest, flinkConfig: Configuration): Unit = {
+  private def deployClusterConfig(
+      deployRequest: DeployRequest,
+      flinkConfig: Configuration): Unit = {
+
     val flinkDefaultConfiguration = getFlinkDefaultConfiguration(
       deployRequest.flinkVersion.flinkHome)
     val currentUser = UserGroupInformation.getCurrentUser
@@ -86,6 +90,10 @@ object YarnSessionClient extends YarnClientTrait {
       .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
       // conf dir
       .safeSet(DeploymentOptionsInternal.CONF_DIR, s"${deployRequest.flinkVersion.flinkHome}/conf")
+      // app tags
+      .safeSet(YarnConfigOptions.APPLICATION_TAGS, "streampark")
+      // app name
+      .safeSet(YarnConfigOptions.APPLICATION_NAME, deployRequest.clusterName)
 
     logInfo(s"""
                |------------------------------------------------------------------
@@ -164,10 +172,12 @@ object YarnSessionClient extends YarnClientTrait {
     try {
       val flinkConfig =
         extractConfiguration(deployRequest.flinkVersion.flinkHome, deployRequest.properties)
+
       deployClusterConfig(deployRequest, flinkConfig)
+
       val yarnClusterDescriptor = getSessionClusterDeployDescriptor(flinkConfig)
       clusterDescriptor = yarnClusterDescriptor._2
-      if (null != deployRequest.clusterId && deployRequest.clusterId.nonEmpty) {
+      if (StringUtils.isNotBlank(deployRequest.clusterId)) {
         try {
           val applicationStatus =
             clusterDescriptor.getYarnClient
@@ -183,8 +193,7 @@ object YarnSessionClient extends YarnClientTrait {
             }
           }
         } catch {
-          case _: ApplicationNotFoundException =>
-            logInfo("this applicationId have not managed by yarn ,need deploy ...")
+          case e: ApplicationNotFoundException => return DeployResponse(null, null, e)
         }
       }
       val clientProvider = clusterDescriptor.deploySessionCluster(yarnClusterDescriptor._1)

Reply via email to