This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 25930c0ed [improve-2695][console-service] The flink/pipe/build interface is not standardized (#2705)
25930c0ed is described below
commit 25930c0ed87c8a7885f2427ea668ee552fe8481b
Author: 芝麻仗剑走天涯 <[email protected]>
AuthorDate: Wed May 10 12:52:03 2023 +0800
[improve-2695][console-service] The flink/pipe/build interface is not standardized (#2705)
* update build
* format modification build
* build interface optimization
* Determine whether to enable new build optimization
---
.../ApplicationBuildPipelineController.java | 50 +----------------
.../console/core/service/AppBuildPipeService.java | 13 +++--
.../core/service/impl/AppBuildPipeServiceImpl.java | 62 +++++++++++++++++++++-
3 files changed, 70 insertions(+), 55 deletions(-)
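In short, this change moves the release-time validation out of ApplicationBuildPipelineController and behind AppBuildPipeService, so callers only pass the application id and a force flag. A minimal before/after sketch of the service signature, reconstructed from the diff below:

    // before: callers had to load the Application and prepare an ApplicationLog themselves
    boolean buildApplication(@Nonnull Application app, ApplicationLog applicationLog) throws Exception;

    // after: the service resolves the entities and runs the pre-build checks internally
    boolean buildApplication(@Nonnull Long appId, boolean forceBuild) throws Exception;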
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java
index 52aecfcb5..b00c69fc7 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java
@@ -19,14 +19,10 @@ package org.apache.streampark.console.core.controller;
import org.apache.streampark.console.base.domain.ApiDocConstant;
import org.apache.streampark.console.base.domain.RestResponse;
-import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.core.annotation.ApiAccess;
import org.apache.streampark.console.core.annotation.PermissionAction;
import org.apache.streampark.console.core.bean.AppBuildDockerResolvedDetail;
import org.apache.streampark.console.core.entity.AppBuildPipeline;
-import org.apache.streampark.console.core.entity.Application;
-import org.apache.streampark.console.core.entity.ApplicationLog;
-import org.apache.streampark.console.core.entity.FlinkEnv;
import org.apache.streampark.console.core.enums.PermissionType;
import org.apache.streampark.console.core.service.AppBuildPipeService;
import org.apache.streampark.console.core.service.ApplicationLogService;
@@ -50,7 +46,6 @@ import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
-import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -97,50 +92,7 @@ public class ApplicationBuildPipelineController {
@RequiresPermissions("app:create")
public RestResponse buildApplication(Long appId, boolean forceBuild) {
try {
- Application app = applicationService.getById(appId);
-
- // 1) check flink version
- FlinkEnv env = flinkEnvService.getById(app.getVersionId());
- boolean checkVersion = env.getFlinkVersion().checkVersion(false);
- if (!checkVersion) {
- throw new ApiAlertException(
- "Unsupported flink version: " + env.getFlinkVersion().version());
- }
-
- // 2) check env
- boolean envOk = applicationService.checkEnv(app);
- if (!envOk) {
- throw new ApiAlertException(
- "Check flink env failed, please check the flink version of this
job");
- }
-
- if (!forceBuild && !appBuildPipeService.allowToBuildNow(appId)) {
- throw new ApiAlertException(
- "The job is invalid, or the job cannot be built while it is
running");
- }
- // check if you need to go through the build process (if the jar and pom have changed,
- // you need to go through the build process, if other common parameters are modified,
- // you don't need to go through the build process)
-
- ApplicationLog applicationLog = new ApplicationLog();
- applicationLog.setOptionName(
- org.apache.streampark.console.core.enums.Operation.RELEASE.getValue());
- applicationLog.setAppId(app.getId());
- applicationLog.setOptionTime(new Date());
-
- boolean needBuild = applicationService.checkBuildAndUpdate(app);
- if (!needBuild) {
- applicationLog.setSuccess(true);
- applicationLogService.save(applicationLog);
- return RestResponse.success(true);
- }
-
- // rollback
- if (app.isNeedRollback() && app.isFlinkSqlJob()) {
- flinkSqlService.rollback(app);
- }
-
- boolean actionResult = appBuildPipeService.buildApplication(app, applicationLog);
+ boolean actionResult = appBuildPipeService.buildApplication(appId, forceBuild);
return RestResponse.success(actionResult);
} catch (Exception e) {
return RestResponse.success(false).message(e.getMessage());
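With the checks relocated, the controller method reduces to a thin delegation. The sketch below is a rough reconstruction from the hunk above; the request-mapping annotation is outside the diff context and therefore omitted:

    @RequiresPermissions("app:create")
    public RestResponse buildApplication(Long appId, boolean forceBuild) {
      try {
        // flink version, env and allowToBuildNow checks now happen inside the service
        boolean actionResult = appBuildPipeService.buildApplication(appId, forceBuild);
        return RestResponse.success(actionResult);
      } catch (Exception e) {
        // an ApiAlertException raised by the service surfaces here as a failed response message
        return RestResponse.success(false).message(e.getMessage());
      }
    }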
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/AppBuildPipeService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/AppBuildPipeService.java
index 48e92c60a..4b455b5c5 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/AppBuildPipeService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/AppBuildPipeService.java
@@ -18,8 +18,6 @@
package org.apache.streampark.console.core.service;
import org.apache.streampark.console.core.entity.AppBuildPipeline;
-import org.apache.streampark.console.core.entity.Application;
-import org.apache.streampark.console.core.entity.ApplicationLog;
import org.apache.streampark.flink.packer.pipeline.DockerResolvedSnapshot;
import org.apache.streampark.flink.packer.pipeline.PipelineStatus;
@@ -33,9 +31,14 @@ import java.util.Optional;
public interface AppBuildPipeService extends IService<AppBuildPipeline> {
- /** Build application. This is an async call method. */
- boolean buildApplication(@Nonnull Application app, ApplicationLog applicationLog)
- throws Exception;
+ /**
+ * Build application. This is an async call method.
+ *
+ * @param appId application id
+ * @param forceBuild forced start pipeline or not
+ * @return Whether the pipeline was successfully started
+ */
+ boolean buildApplication(@Nonnull Long appId, boolean forceBuild) throws Exception;
/**
* Get current build pipeline instance of specified application
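Since the javadoc above describes buildApplication as an async call, the boolean return only reports whether the pipeline was launched, not whether the build finished. A hypothetical call site, for illustration only:

    // launch the build pipeline; validation failures are thrown as ApiAlertException
    boolean started = appBuildPipeService.buildApplication(appId, false);
    if (!started) {
      // the pipeline could not be started; the actual build result arrives asynchronously
    }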
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 45d4ae14d..af3a23cb2 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -96,6 +96,7 @@ import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -153,8 +154,39 @@ public class AppBuildPipeServiceImpl
private static final Cache<Long, DockerPushSnapshot> DOCKER_PUSH_PG_SNAPSHOTS =
Caffeine.newBuilder().expireAfterWrite(30, TimeUnit.DAYS).build();
+ /**
+ * Build application. This is an async call method.
+ *
+ * @param appId application id
+ * @param forceBuild forced start pipeline or not
+ * @return Whether the pipeline was successfully started
+ */
@Override
- public boolean buildApplication(@Nonnull Application app, ApplicationLog applicationLog) {
+ public boolean buildApplication(Long appId, boolean forceBuild) {
+ // check the build environment
+ checkBuildEnv(appId, forceBuild);
+
+ Application app = applicationService.getById(appId);
+ ApplicationLog applicationLog = new ApplicationLog();
+ applicationLog.setOptionName(
+ org.apache.streampark.console.core.enums.Operation.RELEASE.getValue());
+ applicationLog.setAppId(app.getId());
+ applicationLog.setOptionTime(new Date());
+
+ // check if you need to go through the build process (if the jar and pom have changed,
+ // you need to go through the build process, if other common parameters are modified,
+ // you don't need to go through the build process)
+ boolean needBuild = applicationService.checkBuildAndUpdate(app);
+ if (!needBuild) {
+ applicationLog.setSuccess(true);
+ applicationLogService.save(applicationLog);
+ return true;
+ }
+ // rollback
+ if (app.isNeedRollback() && app.isFlinkSqlJob()) {
+ flinkSqlService.rollback(app);
+ }
+
// 1) flink sql setDependency
FlinkSql newFlinkSql = flinkSqlService.getCandidate(app.getId(), CandidateType.NEW);
FlinkSql effectiveFlinkSql = flinkSqlService.getEffective(app.getId(), false);
@@ -346,6 +378,34 @@ public class AppBuildPipeServiceImpl
return saved;
}
+ /**
+ * check the build environment
+ *
+ * @param appId application id
+ * @param forceBuild forced start pipeline or not
+ * @return
+ */
+ private void checkBuildEnv(Long appId, boolean forceBuild) {
+ Application app = applicationService.getById(appId);
+
+ // 1) check flink version
+ FlinkEnv env = flinkEnvService.getById(app.getVersionId());
+ boolean checkVersion = env.getFlinkVersion().checkVersion(false);
+ ApiAlertException.throwIfFalse(
+ checkVersion, "Unsupported flink version:" +
env.getFlinkVersion().version());
+
+ // 2) check env
+ boolean envOk = applicationService.checkEnv(app);
+ ApiAlertException.throwIfFalse(
+ envOk, "Check flink env failed, please check the flink version of this
job");
+
+ // 3) Whether the application can currently start a new building progress
+ if (!forceBuild && !allowToBuildNow(appId)) {
+ throw new ApiAlertException(
+ "The job is invalid, or the job cannot be built while it is
running");
+ }
+ }
+
/** create building pipeline instance */
private BuildPipeline createPipelineInstance(@Nonnull Application app) {
FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(app.getVersionId());
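A side note on style: the rewritten checks replace explicit if/throw blocks with the ApiAlertException.throwIfFalse guard helper. The two forms below are equivalent; the helper's signature is inferred from its usage in checkBuildEnv above, not from its definition:

    // old style, as removed from the controller
    if (!checkVersion) {
      throw new ApiAlertException(
          "Unsupported flink version: " + env.getFlinkVersion().version());
    }

    // new style, as used in checkBuildEnv
    ApiAlertException.throwIfFalse(
        checkVersion, "Unsupported flink version:" + env.getFlinkVersion().version());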