This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 25930c0ed [improve-2695][console-service] The flink/pipe/build 
interface is not standardized (#2705)
25930c0ed is described below

commit 25930c0ed87c8a7885f2427ea668ee552fe8481b
Author: 芝麻仗剑走天涯 <[email protected]>
AuthorDate: Wed May 10 12:52:03 2023 +0800

    [improve-2695][console-service] The flink/pipe/build interface is not 
standardized (#2705)
    
    * update build
    
    * format modification  build
    
    * build interface optimization
    
    * Determine whether to enable new build optimization
---
 .../ApplicationBuildPipelineController.java        | 50 +----------------
 .../console/core/service/AppBuildPipeService.java  | 13 +++--
 .../core/service/impl/AppBuildPipeServiceImpl.java | 62 +++++++++++++++++++++-
 3 files changed, 70 insertions(+), 55 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java
index 52aecfcb5..b00c69fc7 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java
@@ -19,14 +19,10 @@ package org.apache.streampark.console.core.controller;
 
 import org.apache.streampark.console.base.domain.ApiDocConstant;
 import org.apache.streampark.console.base.domain.RestResponse;
-import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.core.annotation.ApiAccess;
 import org.apache.streampark.console.core.annotation.PermissionAction;
 import org.apache.streampark.console.core.bean.AppBuildDockerResolvedDetail;
 import org.apache.streampark.console.core.entity.AppBuildPipeline;
-import org.apache.streampark.console.core.entity.Application;
-import org.apache.streampark.console.core.entity.ApplicationLog;
-import org.apache.streampark.console.core.entity.FlinkEnv;
 import org.apache.streampark.console.core.enums.PermissionType;
 import org.apache.streampark.console.core.service.AppBuildPipeService;
 import org.apache.streampark.console.core.service.ApplicationLogService;
@@ -50,7 +46,6 @@ import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 
-import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -97,50 +92,7 @@ public class ApplicationBuildPipelineController {
   @RequiresPermissions("app:create")
   public RestResponse buildApplication(Long appId, boolean forceBuild) {
     try {
-      Application app = applicationService.getById(appId);
-
-      // 1) check flink version
-      FlinkEnv env = flinkEnvService.getById(app.getVersionId());
-      boolean checkVersion = env.getFlinkVersion().checkVersion(false);
-      if (!checkVersion) {
-        throw new ApiAlertException(
-            "Unsupported flink version: " + env.getFlinkVersion().version());
-      }
-
-      // 2) check env
-      boolean envOk = applicationService.checkEnv(app);
-      if (!envOk) {
-        throw new ApiAlertException(
-            "Check flink env failed, please check the flink version of this 
job");
-      }
-
-      if (!forceBuild && !appBuildPipeService.allowToBuildNow(appId)) {
-        throw new ApiAlertException(
-            "The job is invalid, or the job cannot be built while it is 
running");
-      }
-      // check if you need to go through the build process (if the jar and pom 
have changed,
-      // you need to go through the build process, if other common parameters 
are modified,
-      // you don't need to go through the build process)
-
-      ApplicationLog applicationLog = new ApplicationLog();
-      applicationLog.setOptionName(
-          
org.apache.streampark.console.core.enums.Operation.RELEASE.getValue());
-      applicationLog.setAppId(app.getId());
-      applicationLog.setOptionTime(new Date());
-
-      boolean needBuild = applicationService.checkBuildAndUpdate(app);
-      if (!needBuild) {
-        applicationLog.setSuccess(true);
-        applicationLogService.save(applicationLog);
-        return RestResponse.success(true);
-      }
-
-      // rollback
-      if (app.isNeedRollback() && app.isFlinkSqlJob()) {
-        flinkSqlService.rollback(app);
-      }
-
-      boolean actionResult = appBuildPipeService.buildApplication(app, 
applicationLog);
+      boolean actionResult = appBuildPipeService.buildApplication(appId, 
forceBuild);
       return RestResponse.success(actionResult);
     } catch (Exception e) {
       return RestResponse.success(false).message(e.getMessage());
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/AppBuildPipeService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/AppBuildPipeService.java
index 48e92c60a..4b455b5c5 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/AppBuildPipeService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/AppBuildPipeService.java
@@ -18,8 +18,6 @@
 package org.apache.streampark.console.core.service;
 
 import org.apache.streampark.console.core.entity.AppBuildPipeline;
-import org.apache.streampark.console.core.entity.Application;
-import org.apache.streampark.console.core.entity.ApplicationLog;
 import org.apache.streampark.flink.packer.pipeline.DockerResolvedSnapshot;
 import org.apache.streampark.flink.packer.pipeline.PipelineStatus;
 
@@ -33,9 +31,14 @@ import java.util.Optional;
 
 public interface AppBuildPipeService extends IService<AppBuildPipeline> {
 
-  /** Build application. This is an async call method. */
-  boolean buildApplication(@Nonnull Application app, ApplicationLog 
applicationLog)
-      throws Exception;
+  /**
+   * Build application. This is an async call method.
+   *
+   * @param appId application id
+   * @param forceBuild forced start pipeline or not
+   * @return Whether the pipeline was successfully started
+   */
+  boolean buildApplication(@Nonnull Long appId, boolean forceBuild) throws 
Exception;
 
   /**
    * Get current build pipeline instance of specified application
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 45d4ae14d..af3a23cb2 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -96,6 +96,7 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -153,8 +154,39 @@ public class AppBuildPipeServiceImpl
   private static final Cache<Long, DockerPushSnapshot> 
DOCKER_PUSH_PG_SNAPSHOTS =
       Caffeine.newBuilder().expireAfterWrite(30, TimeUnit.DAYS).build();
 
+  /**
+   * Build application. This is an async call method.
+   *
+   * @param appId application id
+   * @param forceBuild forced start pipeline or not
+   * @return Whether the pipeline was successfully started
+   */
   @Override
-  public boolean buildApplication(@Nonnull Application app, ApplicationLog 
applicationLog) {
+  public boolean buildApplication(Long appId, boolean forceBuild) {
+    // check the build environment
+    checkBuildEnv(appId, forceBuild);
+
+    Application app = applicationService.getById(appId);
+    ApplicationLog applicationLog = new ApplicationLog();
+    applicationLog.setOptionName(
+        org.apache.streampark.console.core.enums.Operation.RELEASE.getValue());
+    applicationLog.setAppId(app.getId());
+    applicationLog.setOptionTime(new Date());
+
+    // check if you need to go through the build process (if the jar and pom 
have changed,
+    // you need to go through the build process, if other common parameters 
are modified,
+    // you don't need to go through the build process)
+    boolean needBuild = applicationService.checkBuildAndUpdate(app);
+    if (!needBuild) {
+      applicationLog.setSuccess(true);
+      applicationLogService.save(applicationLog);
+      return true;
+    }
+    // rollback
+    if (app.isNeedRollback() && app.isFlinkSqlJob()) {
+      flinkSqlService.rollback(app);
+    }
+
     // 1) flink sql setDependency
     FlinkSql newFlinkSql = flinkSqlService.getCandidate(app.getId(), 
CandidateType.NEW);
     FlinkSql effectiveFlinkSql = flinkSqlService.getEffective(app.getId(), 
false);
@@ -346,6 +378,34 @@ public class AppBuildPipeServiceImpl
     return saved;
   }
 
+  /**
+   * check the build environment
+   *
+   * @param appId application id
+   * @param forceBuild forced start pipeline or not
+   * @return
+   */
+  private void checkBuildEnv(Long appId, boolean forceBuild) {
+    Application app = applicationService.getById(appId);
+
+    // 1) check flink version
+    FlinkEnv env = flinkEnvService.getById(app.getVersionId());
+    boolean checkVersion = env.getFlinkVersion().checkVersion(false);
+    ApiAlertException.throwIfFalse(
+        checkVersion, "Unsupported flink version:" + 
env.getFlinkVersion().version());
+
+    // 2) check env
+    boolean envOk = applicationService.checkEnv(app);
+    ApiAlertException.throwIfFalse(
+        envOk, "Check flink env failed, please check the flink version of this 
job");
+
+    // 3) Whether the application can currently start a new building progress
+    if (!forceBuild && !allowToBuildNow(appId)) {
+      throw new ApiAlertException(
+          "The job is invalid, or the job cannot be built while it is 
running");
+    }
+  }
+
   /** create building pipeline instance */
   private BuildPipeline createPipelineInstance(@Nonnull Application app) {
     FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(app.getVersionId());

Reply via email to