This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
new aff8784f4 [Improve] openapi start|cancel flink app improvement
aff8784f4 is described below
commit aff8784f4fd789f8d9051db79ae9e31f2afe2306
Author: benjobs <[email protected]>
AuthorDate: Sun Aug 4 13:55:46 2024 +0800
[Improve] openapi start|cancel flink app improvement
---
.../console/core/aspect/StreamParkAspect.java | 45 +++++++++-
.../console/core/component/OpenAPIComponent.java | 15 ++--
.../console/core/controller/OpenAPIController.java | 39 +++++----
...intController.java => SavepointController.java} | 26 +++---
.../console/core/entity/Application.java | 6 +-
.../core/entity/{SavePoint.java => Savepoint.java} | 6 +-
.../{SavePointMapper.java => SavepointMapper.java} | 4 +-
...SavePointService.java => SavepointService.java} | 8 +-
.../core/service/impl/ApplicationServiceImpl.java | 82 +++++++++---------
...tServiceImpl.java => SavepointServiceImpl.java} | 98 +++++++++++-----------
.../console/core/task/CheckpointProcessor.java | 28 +++----
.../console/core/task/FlinkAppHttpWatcher.java | 24 +++---
.../system/controller/AccessTokenController.java | 41 ---------
.../{SavePointMapper.xml => SavepointMapper.xml} | 4 +-
.../core/service/ApplicationServiceTest.java | 4 +-
.../src/api/flink/app/app.ts | 2 +-
.../src/api/flink/app/app.type.ts | 8 +-
.../components/AppView/StartApplicationModal.vue | 18 ++--
.../components/AppView/StopApplicationModal.vue | 16 ++--
.../src/views/flink/app/hooks/useSavepoint.tsx | 2 +-
.../flink/client/bean/CancelResponse.scala | 2 +-
.../flink/client/bean/SavepointResponse.scala | 2 +-
22 files changed, 244 insertions(+), 236 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java
index 0deacfecf..d681e9ea7 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java
@@ -17,6 +17,8 @@
package org.apache.streampark.console.core.aspect;
+import org.apache.streampark.common.util.DateUtils;
+import org.apache.streampark.common.util.ReflectUtils;
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.core.annotation.OpenAPI;
@@ -54,8 +56,11 @@ import
org.springframework.web.context.request.ServletRequestAttributes;
import javax.annotation.PostConstruct;
import javax.servlet.http.HttpServletRequest;
+import java.lang.reflect.Field;
+import java.util.Date;
import java.util.HashSet;
import java.util.Set;
+import java.util.TimeZone;
@Slf4j
@Component
@@ -89,26 +94,58 @@ public class StreamParkAspect {
"execution(public"
+ " org.apache.streampark.console.base.domain.RestResponse"
+ " org.apache.streampark.console.core.controller.*.*(..))")
- public void openAPI() {}
+ public void openAPIPointcut() {}
@SuppressWarnings("checkstyle:SimplifyBooleanExpression")
- @Around(value = "openAPI()")
+ @Around(value = "openAPIPointcut()")
public RestResponse openAPI(ProceedingJoinPoint joinPoint) throws Throwable {
MethodSignature methodSignature = (MethodSignature)
joinPoint.getSignature();
log.debug("restResponse aspect, method:{}", methodSignature.getName());
Boolean isApi =
(Boolean)
SecurityUtils.getSubject().getSession().getAttribute(AccessToken.IS_API_TOKEN);
if (isApi != null && isApi) {
+ HttpServletRequest request =
+ ((ServletRequestAttributes)
RequestContextHolder.getRequestAttributes()).getRequest();
OpenAPI openAPI =
methodSignature.getMethod().getAnnotation(OpenAPI.class);
if (openAPI == null) {
- HttpServletRequest request =
- ((ServletRequestAttributes)
RequestContextHolder.getRequestAttributes()).getRequest();
String url = request.getRequestURI();
if (openapiWhitelist.contains(url)) {
log.info("request by openapi white-list: {} ", url);
} else {
throw new ApiAlertException("current api unsupported: " + url);
}
+ } else {
+ Object[] objects = joinPoint.getArgs();
+ for (OpenAPI.Param param : openAPI.param()) {
+ String bingFor = param.bindFor();
+ if (StringUtils.isNotBlank(bingFor)) {
+ String name = param.name();
+ for (Object args : objects) {
+ Field bindForField = ReflectUtils.getField(args.getClass(),
bingFor);
+ if (bindForField != null) {
+ Object value = request.getParameter(name);
+ bindForField.setAccessible(true);
+ if (value != null) {
+ if (param.type().equals(String.class)) {
+ bindForField.set(args, value.toString());
+ } else if (param.type().equals(Boolean.class)
+ || param.type().equals(boolean.class)) {
+ bindForField.set(args,
Boolean.parseBoolean(value.toString()));
+ } else if (param.type().equals(Integer.class) ||
param.type().equals(int.class)) {
+ bindForField.set(args, Integer.parseInt(value.toString()));
+ } else if (param.type().equals(Long.class) ||
param.type().equals(long.class)) {
+ bindForField.set(args, Long.parseLong(value.toString()));
+ } else if (param.type().equals(Date.class)) {
+ bindForField.set(
+ args,
+ DateUtils.parse(
+ value.toString(), DateUtils.fullFormat(),
TimeZone.getDefault()));
+ }
+ }
+ }
+ }
+ }
+ }
}
}
return (RestResponse) joinPoint.proceed();
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/OpenAPIComponent.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/OpenAPIComponent.java
index 038b69109..7d433de7d 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/OpenAPIComponent.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/OpenAPIComponent.java
@@ -68,7 +68,7 @@ public class OpenAPIComponent {
return schemas.get(name);
}
- public String getOpenApiCUrl(String baseUrl, Long appId, Long teamId, String
name) {
+ public String getOpenApiCUrl(String name, String baseUrl, Long appId, Long
teamId) {
OpenAPISchema schema = this.getOpenAPISchema(name);
if (schema == null) {
throw new UnsupportedOperationException("Unsupported OpenAPI: " + name);
@@ -89,10 +89,15 @@ public class OpenAPIComponent {
.forEach(
c -> {
if (c.isRequired()) {
- if ("appId".equals(c.getBindFor())) {
- curlBuilder.addFormData(c.getName(), appId);
- } else if ("teamId".equals(c.getBindFor())) {
- curlBuilder.addFormData(c.getName(), teamId);
+ switch (c.getBindFor()) {
+ case "appId":
+ curlBuilder.addFormData(c.getName(), appId);
+ break;
+ case "teamId":
+ curlBuilder.addFormData(c.getName(), teamId);
+ break;
+ default:
+ break;
}
} else {
curlBuilder.addFormData(c.getName(), c.getDefaultValue());
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/OpenAPIController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/OpenAPIController.java
index 6369782aa..235a1c019 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/OpenAPIController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/OpenAPIController.java
@@ -67,13 +67,20 @@ public class OpenAPIController {
required = true,
type = Long.class),
@OpenAPI.Param(
- name = "savePointed",
- description = "restored app from the savepoint or latest
checkpoint",
+ name = "argument",
+ description = "flink program run argument",
required = false,
type = String.class,
- defaultValue = "false"),
+ bindFor = "args"),
+ @OpenAPI.Param(
+ name = "restoreFromSavepoint",
+ description = "restored app from the savepoint or checkpoint",
+ required = false,
+ type = Boolean.class,
+ defaultValue = "false",
+ bindFor = "restoreOrTriggerSavepoint"),
@OpenAPI.Param(
- name = "savePoint",
+ name = "savepointPath",
description = "savepoint or checkpoint path",
required = false,
type = String.class),
@@ -81,15 +88,15 @@ public class OpenAPIController {
name = "allowNonRestored",
description = "ignore savepoint if cannot be restored",
required = false,
- type = boolean.class,
- defaultValue = "false")
+ type = Boolean.class,
+ defaultValue = "false"),
})
@PermissionScope(app = "#app.appId", team = "#app.teamId")
@PostMapping("app/start")
@RequiresPermissions("app:start")
public RestResponse flinkStart(Application app) throws Exception {
applicationService.start(app, false);
- return RestResponse.success();
+ return RestResponse.success(true);
}
@OpenAPI(
@@ -114,13 +121,14 @@ public class OpenAPIController {
required = true,
type = Long.class),
@OpenAPI.Param(
- name = "savePointed",
+ name = "triggerSavepoint",
description = "trigger savepoint before taking stopping",
required = false,
- type = boolean.class,
- defaultValue = "false"),
+ type = Boolean.class,
+ defaultValue = "false",
+ bindFor = "restoreOrTriggerSavepoint"),
@OpenAPI.Param(
- name = "savePoint",
+ name = "savepointPath",
description = "savepoint path",
required = false,
type = String.class),
@@ -128,7 +136,7 @@ public class OpenAPIController {
name = "drain",
description = "send max watermark before canceling",
required = false,
- type = boolean.class,
+ type = Boolean.class,
defaultValue = "false"),
})
@PermissionScope(app = "#app.appId", team = "#app.teamId")
@@ -141,11 +149,8 @@ public class OpenAPIController {
@PostMapping("curl")
public RestResponse copyOpenApiCurl(
- String baseUrl,
- Long appId,
- @NotNull(message = "{required}") Long teamId,
- @NotBlank(message = "{required}") String name) {
- String url = openAPIComponent.getOpenApiCUrl(baseUrl, appId, teamId, name);
+ @NotBlank String name, String baseUrl, @NotNull Long appId, @NotNull
Long teamId) {
+ String url = openAPIComponent.getOpenApiCUrl(name, baseUrl, appId, teamId);
return RestResponse.success(url);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SavePointController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SavepointController.java
similarity index 74%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SavePointController.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SavepointController.java
index dbb8b2357..9969ab0df 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SavePointController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SavepointController.java
@@ -22,9 +22,9 @@ import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.base.exception.InternalException;
import org.apache.streampark.console.core.annotation.PermissionScope;
import org.apache.streampark.console.core.entity.Application;
-import org.apache.streampark.console.core.entity.SavePoint;
+import org.apache.streampark.console.core.entity.Savepoint;
import org.apache.streampark.console.core.service.ApplicationService;
-import org.apache.streampark.console.core.service.SavePointService;
+import org.apache.streampark.console.core.service.SavepointService;
import org.apache.shiro.authz.annotation.RequiresPermissions;
@@ -42,34 +42,34 @@ import javax.annotation.Nullable;
@Validated
@RestController
@RequestMapping("flink/savepoint")
-public class SavePointController {
+public class SavepointController {
@Autowired private ApplicationService applicationService;
- @Autowired private SavePointService savePointService;
+ @Autowired private SavepointService savepointService;
@PostMapping("history")
@PermissionScope(app = "#sp.appId", team = "#sp.teamId")
- public RestResponse history(SavePoint sp, RestRequest request) {
- IPage<SavePoint> page = savePointService.page(sp, request);
+ public RestResponse history(Savepoint sp, RestRequest request) {
+ IPage<Savepoint> page = savepointService.page(sp, request);
return RestResponse.success(page);
}
@PostMapping("delete")
@RequiresPermissions("savepoint:delete")
@PermissionScope(app = "#sp.appId", team = "#sp.teamId")
- public RestResponse delete(SavePoint sp) throws InternalException {
- SavePoint savePoint = savePointService.getById(sp.getId());
- Application application = applicationService.getById(savePoint.getAppId());
- Boolean deleted = savePointService.delete(sp.getId(), application);
+ public RestResponse delete(Savepoint sp) throws InternalException {
+ Savepoint savepoint = savepointService.getById(sp.getId());
+ Application application = applicationService.getById(savepoint.getAppId());
+ Boolean deleted = savepointService.delete(sp.getId(), application);
return RestResponse.success(deleted);
}
@PostMapping("trigger")
@RequiresPermissions("savepoint:trigger")
- @PermissionScope(app = "#savePoint.appId", team = "#savePoint.teamId")
- public RestResponse trigger(SavePoint savePoint, @Nullable String
savepointPath) {
- savePointService.trigger(savePoint.getAppId(), savepointPath);
+ @PermissionScope(app = "#savepoint.appId", team = "#savepoint.teamId")
+ public RestResponse trigger(Savepoint savepoint, @Nullable String
savepointPath) {
+ savepointService.trigger(savepoint.getAppId(), savepointPath);
return RestResponse.success(true);
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index f43058d9e..61ec14784 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -241,10 +241,10 @@ public class Application implements Serializable {
private transient String flinkVersion;
private transient String confPath;
private transient Integer format;
- private transient String savePoint;
- private transient Boolean savePointed = false;
+ private transient String savepointPath;
+ private transient Boolean restoreOrTriggerSavepoint = false;
private transient Boolean drain = false;
- private transient Long savePointTimeout = 60L;
+ private transient Long savepointTimeout = 60L;
private transient Boolean allowNonRestored = false;
private transient String socketId;
private transient String projectName;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SavePoint.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Savepoint.java
similarity index 95%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SavePoint.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Savepoint.java
index dab85f43d..297114427 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SavePoint.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Savepoint.java
@@ -30,7 +30,7 @@ import java.util.Date;
@Setter
@TableName("t_flink_savepoint")
@Slf4j
-public class SavePoint {
+public class Savepoint {
@TableId(type = IdType.AUTO)
private Long id;
@@ -42,8 +42,8 @@ public class SavePoint {
private Boolean latest;
/**
- * 1) checkPoint <br>
- * 2) savePoint
+ * 1) checkpoint <br>
+ * 2) savepoint
*/
private Integer type;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SavePointMapper.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SavepointMapper.java
similarity index 88%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SavePointMapper.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SavepointMapper.java
index 27a5595c9..8332b0ae7 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SavePointMapper.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SavepointMapper.java
@@ -17,8 +17,8 @@
package org.apache.streampark.console.core.mapper;
-import org.apache.streampark.console.core.entity.SavePoint;
+import org.apache.streampark.console.core.entity.Savepoint;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-public interface SavePointMapper extends BaseMapper<SavePoint> {}
+public interface SavepointMapper extends BaseMapper<Savepoint> {}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavePointService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java
similarity index 87%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavePointService.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java
index c6b5479f5..60cedc978 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavePointService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java
@@ -20,24 +20,24 @@ package org.apache.streampark.console.core.service;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.exception.InternalException;
import org.apache.streampark.console.core.entity.Application;
-import org.apache.streampark.console.core.entity.SavePoint;
+import org.apache.streampark.console.core.entity.Savepoint;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.IService;
import javax.annotation.Nullable;
-public interface SavePointService extends IService<SavePoint> {
+public interface SavepointService extends IService<Savepoint> {
void expire(Long appId);
- SavePoint getLatest(Long id);
+ Savepoint getLatest(Long id);
void trigger(Long appId, @Nullable String savepointPath);
Boolean delete(Long id, Application application) throws InternalException;
- IPage<SavePoint> page(SavePoint savePoint, RestRequest request);
+ IPage<Savepoint> page(Savepoint savepoint, RestRequest request);
void removeApp(Application application);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 177cb85a0..45dd84156 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -52,7 +52,7 @@ import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.entity.FlinkEnv;
import org.apache.streampark.console.core.entity.FlinkSql;
import org.apache.streampark.console.core.entity.Project;
-import org.apache.streampark.console.core.entity.SavePoint;
+import org.apache.streampark.console.core.entity.Savepoint;
import org.apache.streampark.console.core.enums.AppExistsState;
import org.apache.streampark.console.core.enums.CandidateType;
import org.apache.streampark.console.core.enums.ChangedType;
@@ -75,7 +75,7 @@ import
org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.FlinkEnvService;
import org.apache.streampark.console.core.service.FlinkSqlService;
import org.apache.streampark.console.core.service.ProjectService;
-import org.apache.streampark.console.core.service.SavePointService;
+import org.apache.streampark.console.core.service.SavepointService;
import org.apache.streampark.console.core.service.ServiceHelper;
import org.apache.streampark.console.core.service.SettingService;
import org.apache.streampark.console.core.service.VariableService;
@@ -195,7 +195,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
@Autowired private FlinkSqlService flinkSqlService;
- @Autowired private SavePointService savePointService;
+ @Autowired private SavepointService savepointService;
@Autowired private EffectiveService effectiveService;
@@ -412,7 +412,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
backUpService.removeApp(application);
// 6) remove savepoint
- savePointService.removeApp(application);
+ savepointService.removeApp(application);
// 7) remove BuildPipeline
appBuildPipeService.removeApp(application.getId());
@@ -1310,7 +1310,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
applicationLog.setYarnAppId(application.getClusterId());
}
- if (appParam.getSavePointed()) {
+ if (appParam.getRestoreOrTriggerSavepoint()) {
if (!application.isKubernetesModeJob()) {
FlinkAppHttpWatcher.addSavepoint(application.getId());
application.setOptionState(OptionState.SAVEPOINTING.getValue());
@@ -1331,10 +1331,10 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
// infer savepoint
String customSavepoint = null;
- if (appParam.getSavePointed()) {
- customSavepoint = appParam.getSavePoint();
+ if (appParam.getRestoreOrTriggerSavepoint()) {
+ customSavepoint = appParam.getSavepointPath();
if (StringUtils.isBlank(customSavepoint)) {
- customSavepoint = savePointService.getSavePointPath(appParam);
+ customSavepoint = savepointService.getSavePointPath(appParam);
}
}
@@ -1364,8 +1364,8 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
properties,
clusterId,
application.getJobId(),
- appParam.getSavePointed(),
- appParam.getDrain() == null ? false : appParam.getDrain(),
+ appParam.getRestoreOrTriggerSavepoint(),
+ appParam.getDrain() != null && appParam.getDrain(),
customSavepoint,
namespace);
@@ -1396,8 +1396,8 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
application.setState(FlinkAppState.FAILED.getValue());
updateById(application);
- if (appParam.getSavePointed()) {
- savePointService.expire(application.getId());
+ if (appParam.getRestoreOrTriggerSavepoint()) {
+ savepointService.expire(application.getId());
}
// re-tracking flink job on kubernetes and logging exception
if (application.isKubernetesModeJob()) {
@@ -1413,17 +1413,17 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
// save log...
applicationLogService.save(applicationLog);
- if (cancelResponse != null && cancelResponse.savePointDir() != null)
{
- String savePointDir = cancelResponse.savePointDir();
- log.info("savePoint path: {}", savePointDir);
- SavePoint savePoint = new SavePoint();
- savePoint.setPath(savePointDir);
- savePoint.setAppId(application.getId());
- savePoint.setLatest(true);
- savePoint.setType(CheckPointType.SAVEPOINT.get());
- savePoint.setCreateTime(new Date());
- savePoint.setTriggerTime(triggerTime);
- savePointService.save(savePoint);
+ if (cancelResponse != null && cancelResponse.savepointDir() != null)
{
+ String savepointDir = cancelResponse.savepointDir();
+ log.info("savepoint path: {}", savepointDir);
+ Savepoint savepoint = new Savepoint();
+ savepoint.setPath(savepointDir);
+ savepoint.setAppId(application.getId());
+ savepoint.setLatest(true);
+ savepoint.setType(CheckPointType.SAVEPOINT.get());
+ savepoint.setCreateTime(new Date());
+ savepoint.setTriggerTime(triggerTime);
+ savepointService.save(savepoint);
}
if (application.isKubernetesModeJob()) {
@@ -1434,9 +1434,9 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
@Override
public String checkSavepointPath(Application appParam) throws Exception {
- String savepointPath = appParam.getSavePoint();
+ String savepointPath = appParam.getSavepointPath();
if (StringUtils.isBlank(savepointPath)) {
- savepointPath = savePointService.getSavePointPath(appParam);
+ savepointPath = savepointService.getSavePointPath(appParam);
}
if (StringUtils.isNotBlank(savepointPath)) {
@@ -1514,14 +1514,14 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
if (!application.isNeedRestartOnFailed()) {
return;
}
- appParam.setSavePointed(true);
+ appParam.setRestoreOrTriggerSavepoint(true);
application.setRestartCount(application.getRestartCount() + 1);
}
starting(application);
application.setAllowNonRestored(
- appParam.getAllowNonRestored() == null ? false :
appParam.getAllowNonRestored());
+ appParam.getAllowNonRestored() != null &&
appParam.getAllowNonRestored());
String appConf;
String flinkUserJar = null;
@@ -1612,7 +1612,6 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
}
AppBuildPipeline buildPipeline =
appBuildPipeService.getById(application.getId());
-
Utils.notNull(buildPipeline);
BuildResult buildResult = buildPipeline.getBuildResult();
@@ -1621,8 +1620,9 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
}
// Get the args after placeholder replacement
- String applicationArgs =
- variableService.replaceVariable(application.getTeamId(),
application.getArgs());
+ String args =
+ StringUtils.isBlank(appParam.getArgs()) ? application.getArgs() :
appParam.getArgs();
+ String applicationArgs =
variableService.replaceVariable(application.getTeamId(), args);
String k8sNamespace;
String k8sClusterId;
@@ -1653,7 +1653,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
application.getJobName(),
appConf,
application.getApplicationType(),
- getSavePointed(appParam),
+ getSavepointPath(appParam),
applicationArgs,
buildResult,
extraParameter,
@@ -1699,7 +1699,8 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
applicationLog.setSuccess(true);
// issue: https://github.com/apache/incubator-streampark/issues/3749
- if (appParam.getSavePointed() == null || !appParam.getSavePointed())
{
+ if (appParam.getRestoreOrTriggerSavepoint() == null
+ || !appParam.getRestoreOrTriggerSavepoint()) {
checkpointProcessor.resetCheckpointNum(appParam.getId());
}
@@ -1842,7 +1843,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
application.setState(FlinkAppState.CANCELED.getValue());
application.setOptionTime(new Date());
updateById(application);
- savePointService.expire(application.getId());
+ savepointService.expire(application.getId());
// re-tracking flink job on kubernetes and logging exception
if (application.isKubernetesModeJob()) {
TrackId id = k8sWatcherWrapper.toTrackId(application);
@@ -1872,15 +1873,16 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
return false;
}
- private String getSavePointed(Application appParam) {
- if (appParam.getSavePointed() != null && appParam.getSavePointed()) {
- if (StringUtils.isBlank(appParam.getSavePoint())) {
- SavePoint savePoint = savePointService.getLatest(appParam.getId());
- if (savePoint != null) {
- return savePoint.getPath();
+ private String getSavepointPath(Application appParam) {
+ if (appParam.getRestoreOrTriggerSavepoint() != null
+ && appParam.getRestoreOrTriggerSavepoint()) {
+ if (StringUtils.isBlank(appParam.getSavepointPath())) {
+ Savepoint savepoint = savepointService.getLatest(appParam.getId());
+ if (savepoint != null) {
+ return savepoint.getPath();
}
} else {
- return appParam.getSavePoint();
+ return appParam.getSavepointPath();
}
}
return null;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
similarity index 86%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
index a8a8cbb7a..fec264c8a 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
@@ -32,17 +32,17 @@ import
org.apache.streampark.console.core.entity.ApplicationConfig;
import org.apache.streampark.console.core.entity.ApplicationLog;
import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.entity.FlinkEnv;
-import org.apache.streampark.console.core.entity.SavePoint;
+import org.apache.streampark.console.core.entity.Savepoint;
import org.apache.streampark.console.core.enums.CheckPointType;
import org.apache.streampark.console.core.enums.Operation;
import org.apache.streampark.console.core.enums.OptionState;
-import org.apache.streampark.console.core.mapper.SavePointMapper;
+import org.apache.streampark.console.core.mapper.SavepointMapper;
import org.apache.streampark.console.core.service.ApplicationConfigService;
import org.apache.streampark.console.core.service.ApplicationLogService;
import org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.FlinkEnvService;
-import org.apache.streampark.console.core.service.SavePointService;
+import org.apache.streampark.console.core.service.SavepointService;
import org.apache.streampark.console.core.task.FlinkAppHttpWatcher;
import org.apache.streampark.flink.client.FlinkClient;
import org.apache.streampark.flink.client.bean.SavepointResponse;
@@ -80,8 +80,8 @@ import java.util.concurrent.TimeoutException;
@Slf4j
@Service
@Transactional(propagation = Propagation.SUPPORTS, readOnly = true,
rollbackFor = Exception.class)
-public class SavePointServiceImpl extends ServiceImpl<SavePointMapper,
SavePoint>
- implements SavePointService {
+public class SavepointServiceImpl extends ServiceImpl<SavepointMapper,
Savepoint>
+ implements SavepointService {
@Autowired private FlinkEnvService flinkEnvService;
@@ -108,21 +108,21 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
@Override
public void expire(Long appId) {
- SavePoint savePoint = new SavePoint();
- savePoint.setLatest(false);
- LambdaQueryWrapper<SavePoint> queryWrapper =
- new LambdaQueryWrapper<SavePoint>().eq(SavePoint::getAppId, appId);
- this.update(savePoint, queryWrapper);
+ Savepoint savepoint = new Savepoint();
+ savepoint.setLatest(false);
+ LambdaQueryWrapper<Savepoint> queryWrapper =
+ new LambdaQueryWrapper<Savepoint>().eq(Savepoint::getAppId, appId);
+ this.update(savepoint, queryWrapper);
}
@Override
- public boolean save(SavePoint entity) {
+ public boolean save(Savepoint entity) {
this.expire(entity);
this.expire(entity.getAppId());
return super.save(entity);
}
- private void expire(SavePoint entity) {
+ private void expire(Savepoint entity) {
FlinkEnv flinkEnv = flinkEnvService.getByAppId(entity.getAppId());
Application application = applicationService.getById(entity.getAppId());
Utils.notNull(flinkEnv);
@@ -186,40 +186,40 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
}
if (cpThreshold == 0) {
- LambdaQueryWrapper<SavePoint> queryWrapper =
- new LambdaQueryWrapper<SavePoint>()
- .eq(SavePoint::getAppId, entity.getAppId())
- .eq(SavePoint::getType, 1);
+ LambdaQueryWrapper<Savepoint> queryWrapper =
+ new LambdaQueryWrapper<Savepoint>()
+ .eq(Savepoint::getAppId, entity.getAppId())
+ .eq(Savepoint::getType, 1);
this.remove(queryWrapper);
} else {
- LambdaQueryWrapper<SavePoint> queryWrapper =
- new LambdaQueryWrapper<SavePoint>()
- .select(SavePoint::getTriggerTime)
- .eq(SavePoint::getAppId, entity.getAppId())
- .eq(SavePoint::getType, CheckPointType.CHECKPOINT.get())
- .orderByDesc(SavePoint::getTriggerTime);
-
- Page<SavePoint> savePointPage =
+ LambdaQueryWrapper<Savepoint> queryWrapper =
+ new LambdaQueryWrapper<Savepoint>()
+ .select(Savepoint::getTriggerTime)
+ .eq(Savepoint::getAppId, entity.getAppId())
+ .eq(Savepoint::getType, CheckPointType.CHECKPOINT.get())
+ .orderByDesc(Savepoint::getTriggerTime);
+
+ Page<Savepoint> savepointPage =
this.baseMapper.selectPage(new Page<>(1, cpThreshold + 1),
queryWrapper);
- if (!savePointPage.getRecords().isEmpty()
- && savePointPage.getRecords().size() > cpThreshold) {
- SavePoint savePoint = savePointPage.getRecords().get(cpThreshold - 1);
- LambdaQueryWrapper<SavePoint> lambdaQueryWrapper =
- new LambdaQueryWrapper<SavePoint>()
- .eq(SavePoint::getAppId, entity.getAppId())
- .eq(SavePoint::getType, 1)
- .lt(SavePoint::getTriggerTime, savePoint.getTriggerTime());
+ if (!savepointPage.getRecords().isEmpty()
+ && savepointPage.getRecords().size() > cpThreshold) {
+ Savepoint savepoint = savepointPage.getRecords().get(cpThreshold - 1);
+ LambdaQueryWrapper<Savepoint> lambdaQueryWrapper =
+ new LambdaQueryWrapper<Savepoint>()
+ .eq(Savepoint::getAppId, entity.getAppId())
+ .eq(Savepoint::getType, 1)
+ .lt(Savepoint::getTriggerTime, savepoint.getTriggerTime());
this.remove(lambdaQueryWrapper);
}
}
}
@Override
- public SavePoint getLatest(Long id) {
- LambdaQueryWrapper<SavePoint> queryWrapper =
- new LambdaQueryWrapper<SavePoint>()
- .eq(SavePoint::getAppId, id)
- .eq(SavePoint::getLatest, true);
+ public Savepoint getLatest(Long id) {
+ LambdaQueryWrapper<Savepoint> queryWrapper =
+ new LambdaQueryWrapper<Savepoint>()
+ .eq(Savepoint::getAppId, id)
+ .eq(Savepoint::getLatest, true);
return this.getOne(queryWrapper);
}
@@ -336,10 +336,10 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
10L,
TimeUnit.MINUTES,
savepointResponse -> {
- if (savepointResponse != null &&
savepointResponse.savePointDir() != null) {
+ if (savepointResponse != null &&
savepointResponse.savepointDir() != null) {
applicationLog.setSuccess(true);
- String savePointDir = savepointResponse.savePointDir();
- log.info("Request savepoint successful, savepointDir: {}",
savePointDir);
+ String savepointDir = savepointResponse.savepointDir();
+ log.info("Request savepoint successful, savepointDir: {}",
savepointDir);
}
},
e -> {
@@ -422,10 +422,10 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
@Override
@Transactional(rollbackFor = Exception.class)
public Boolean delete(Long id, Application application) throws
InternalException {
- SavePoint savePoint = getById(id);
+ Savepoint savepoint = getById(id);
try {
- if (CommonUtils.notEmpty(savePoint.getPath())) {
- application.getFsOperator().delete(savePoint.getPath());
+ if (CommonUtils.notEmpty(savepoint.getPath())) {
+ application.getFsOperator().delete(savepoint.getPath());
}
removeById(id);
return true;
@@ -436,11 +436,11 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
}
@Override
- public IPage<SavePoint> page(SavePoint savePoint, RestRequest request) {
+ public IPage<Savepoint> page(Savepoint savepoint, RestRequest request) {
request.setSortField("trigger_time");
- Page<SavePoint> page = MybatisPager.getPage(request);
- LambdaQueryWrapper<SavePoint> queryWrapper =
- new LambdaQueryWrapper<SavePoint>().eq(SavePoint::getAppId,
savePoint.getAppId());
+ Page<Savepoint> page = MybatisPager.getPage(request);
+ LambdaQueryWrapper<Savepoint> queryWrapper =
+ new LambdaQueryWrapper<Savepoint>().eq(Savepoint::getAppId,
savepoint.getAppId());
return this.page(page, queryWrapper);
}
@@ -448,8 +448,8 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
public void removeApp(Application application) {
Long appId = application.getId();
- LambdaQueryWrapper<SavePoint> queryWrapper =
- new LambdaQueryWrapper<SavePoint>().eq(SavePoint::getAppId, appId);
+ LambdaQueryWrapper<Savepoint> queryWrapper =
+ new LambdaQueryWrapper<Savepoint>().eq(Savepoint::getAppId, appId);
this.remove(queryWrapper);
try {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
index e08d37102..2f6f5bb90 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
@@ -18,12 +18,12 @@
package org.apache.streampark.console.core.task;
import org.apache.streampark.console.core.entity.Application;
-import org.apache.streampark.console.core.entity.SavePoint;
+import org.apache.streampark.console.core.entity.Savepoint;
import org.apache.streampark.console.core.enums.CheckPointStatus;
import org.apache.streampark.console.core.enums.FailoverStrategy;
import org.apache.streampark.console.core.metrics.flink.CheckPoints;
import org.apache.streampark.console.core.service.ApplicationService;
-import org.apache.streampark.console.core.service.SavePointService;
+import org.apache.streampark.console.core.service.SavepointService;
import org.apache.streampark.console.core.service.alert.AlertService;
import com.github.benmanes.caffeine.cache.Cache;
@@ -68,7 +68,7 @@ public class CheckpointProcessor {
@Autowired private AlertService alertService;
- @Autowired private SavePointService savePointService;
+ @Autowired private SavepointService savepointService;
@Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;
@@ -164,8 +164,8 @@ public class CheckpointProcessor {
return checkPointCache.get(
cacheId,
key -> {
- SavePoint savePoint = savePointService.getLatest(appId);
- return
Optional.ofNullable(savePoint).map(SavePoint::getChkId).orElse(null);
+ Savepoint savepoint = savepointService.getLatest(appId);
+ return
Optional.ofNullable(savepoint).map(Savepoint::getChkId).orElse(null);
});
}
@@ -177,15 +177,15 @@ public class CheckpointProcessor {
}
private void saveSavepoint(CheckPoints.CheckPoint checkPoint, Long appId) {
- SavePoint savePoint = new SavePoint();
- savePoint.setAppId(appId);
- savePoint.setChkId(checkPoint.getId());
- savePoint.setLatest(true);
- savePoint.setType(checkPoint.getCheckPointType().get());
- savePoint.setPath(checkPoint.getExternalPath());
- savePoint.setTriggerTime(new Date(checkPoint.getTriggerTimestamp()));
- savePoint.setCreateTime(new Date());
- savePointService.save(savePoint);
+ Savepoint savepoint = new Savepoint();
+ savepoint.setAppId(appId);
+ savepoint.setChkId(checkPoint.getId());
+ savepoint.setLatest(true);
+ savepoint.setType(checkPoint.getCheckPointType().get());
+ savepoint.setPath(checkPoint.getExternalPath());
+ savepoint.setTriggerTime(new Date(checkPoint.getTriggerTimestamp()));
+ savepoint.setCreateTime(new Date());
+ savepointService.save(savepoint);
}
public static class Counter {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
index 1fbef57e9..82939b8c1 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
@@ -35,7 +35,7 @@ import
org.apache.streampark.console.core.metrics.flink.Overview;
import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo;
import org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.console.core.service.FlinkClusterService;
-import org.apache.streampark.console.core.service.SavePointService;
+import org.apache.streampark.console.core.service.SavepointService;
import org.apache.streampark.console.core.service.alert.AlertService;
import org.apache.commons.lang3.StringUtils;
@@ -82,7 +82,7 @@ public class FlinkAppHttpWatcher {
@Autowired private FlinkClusterService flinkClusterService;
- @Autowired private SavePointService savePointService;
+ @Autowired private SavepointService savepointService;
// track interval every 5 seconds
private static final long WATCHING_INTERVAL = 1000L * 5;
@@ -244,13 +244,13 @@ public class FlinkAppHttpWatcher {
// non-mapping
if (application.getState() != FlinkAppState.MAPPING.getValue()) {
log.error(
- "[StreamPark][FlinkAppHttpWatcher] getFromFlinkRestApi and
getFromYarnRestApi error,job failed,savePoint expired!");
+ "[StreamPark][FlinkAppHttpWatcher] getFromFlinkRestApi and
getFromYarnRestApi error,job failed,savepoint expired!");
if (StopFrom.NONE.equals(stopFrom)) {
Date lostTime = LOST_CACHE.getIfPresent(application.getId());
if (lostTime == null) {
LOST_CACHE.put(application.getId(), new Date());
} else if (DateUtils.toSecondDuration(lostTime, new Date()) >= 30) {
- savePointService.expire(application.getId());
+ savepointService.expire(application.getId());
application.setState(FlinkAppState.LOST.getValue());
WATCHING_APPS.remove(application.getId());
LOST_CACHE.invalidate(application.getId());
@@ -324,7 +324,7 @@ public class FlinkAppHttpWatcher {
} catch (Exception e) {
log.error("get flink jobOverview error: {}", e.getMessage(), e);
}
- // 3) savePoint obsolete check and NEED_START check
+ // 3) savepoint obsolete check and NEED_START check
OptionState optionState = OPTIONING.get(application.getId());
if (currentState.equals(FlinkAppState.RUNNING)) {
handleRunningState(application, optionState, currentState);
@@ -422,7 +422,7 @@ public class FlinkAppHttpWatcher {
}
}
- // The current state is running, and there is a current task in the
savePointCache,
+ // The current state is running, and there is a current task in the
savepointCache,
// indicating that the task is doing savepoint
if (SAVEPOINT_CACHE.getIfPresent(appId) != null) {
application.setOptionState(OptionState.SAVEPOINTING.getValue());
@@ -489,8 +489,8 @@ public class FlinkAppHttpWatcher {
if (StopFrom.NONE.equals(stopFrom) ||
applicationService.checkAlter(application)) {
if (StopFrom.NONE.equals(stopFrom)) {
log.info(
- "[StreamPark][FlinkAppHttpWatcher] getFromFlinkRestApi, job
cancel is not form StreamPark,savePoint expired!");
- savePointService.expire(application.getId());
+ "[StreamPark][FlinkAppHttpWatcher] getFromFlinkRestApi, job
cancel is not form StreamPark,savepoint expired!");
+ savepointService.expire(application.getId());
}
stopCanceledJob(application.getId());
alertService.alert(application, FlinkAppState.CANCELED);
@@ -549,8 +549,8 @@ public class FlinkAppHttpWatcher {
} finally {
if (StopFrom.NONE.equals(stopFrom)) {
log.error(
- "[StreamPark][FlinkAppHttpWatcher] query previous state was
canceling and stopFrom NotFound,savePoint expired!");
- savePointService.expire(application.getId());
+ "[StreamPark][FlinkAppHttpWatcher] query previous state was
canceling and stopFrom NotFound,savepoint expired!");
+ savepointService.expire(application.getId());
if (flinkAppState == FlinkAppState.KILLED || flinkAppState ==
FlinkAppState.FAILED) {
alertService.alert(application, flinkAppState);
}
@@ -579,8 +579,8 @@ public class FlinkAppHttpWatcher {
if (FlinkAppState.KILLED.equals(flinkAppState)) {
if (StopFrom.NONE.equals(stopFrom)) {
log.error(
- "[StreamPark][FlinkAppHttpWatcher] getFromYarnRestApi,job
was killed and stopFrom NotFound,savePoint expired!");
- savePointService.expire(application.getId());
+ "[StreamPark][FlinkAppHttpWatcher] getFromYarnRestApi,job
was killed and stopFrom NotFound,savepoint expired!");
+ savepointService.expire(application.getId());
}
flinkAppState = FlinkAppState.CANCELED;
cleanSavepoint(application);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/AccessTokenController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/AccessTokenController.java
index 1d5226d1c..a8dfb22e0 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/AccessTokenController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/AccessTokenController.java
@@ -17,10 +17,8 @@
package org.apache.streampark.console.system.controller;
-import org.apache.streampark.common.util.CURLBuilder;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.domain.RestResponse;
-import org.apache.streampark.console.core.annotation.PermissionScope;
import org.apache.streampark.console.core.enums.AccessTokenState;
import org.apache.streampark.console.core.service.ServiceHelper;
import org.apache.streampark.console.system.entity.AccessToken;
@@ -95,43 +93,4 @@ public class AccessTokenController {
boolean res = accessTokenService.delete(tokenId);
return RestResponse.success(res);
}
-
- /**
- * copy cURL, hardcode now, there is no need for configuration here, because
there are several
- * fixed interfaces
- */
- @PermissionScope(app = "#appId", team = "#teamId")
- @PostMapping(value = "curl")
- public RestResponse copyRestApiCurl(
- @NotBlank(message = "{required}") String appId,
- @NotBlank(message = "{required}") String teamId,
- @NotBlank(message = "{required}") String baseUrl,
- @NotBlank(message = "{required}") String path) {
- String resultCURL = null;
- CURLBuilder curlBuilder = new CURLBuilder(baseUrl + path);
-
- curlBuilder
- .addHeader("Content-Type", "application/x-www-form-urlencoded;
charset=UTF-8")
- .addHeader(
- "Authorization",
accessTokenService.getByUserId(serviceHelper.getUserId()).getToken());
-
- if ("/flink/app/start".equalsIgnoreCase(path)) {
- resultCURL =
- curlBuilder
- .addFormData("id", appId)
- .addFormData("teamId", teamId)
- .addFormData("allowNonRestored", "false")
- .addFormData("savePointed", "false")
- .build();
- } else if ("/flink/app/cancel".equalsIgnoreCase(path)) {
- resultCURL =
- curlBuilder
- .addFormData("id", appId)
- .addFormData("teamId", teamId)
- .addFormData("savePointed", "false")
- .addFormData("drain", "false")
- .build();
- }
- return RestResponse.success(resultCURL);
- }
}
diff --git
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/SavePointMapper.xml
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/SavepointMapper.xml
similarity index 97%
rename from
streampark-console/streampark-console-service/src/main/resources/mapper/core/SavePointMapper.xml
rename to
streampark-console/streampark-console-service/src/main/resources/mapper/core/SavepointMapper.xml
index cc1b72e57..99ff6b427 100644
---
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/SavePointMapper.xml
+++
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/SavepointMapper.xml
@@ -16,8 +16,8 @@
~ limitations under the License.
-->
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
-<mapper namespace="org.apache.streampark.console.core.mapper.SavePointMapper">
- <resultMap id="BaseResultMap"
type="org.apache.streampark.console.core.entity.SavePoint">
+<mapper namespace="org.apache.streampark.console.core.mapper.SavepointMapper">
+ <resultMap id="BaseResultMap"
type="org.apache.streampark.console.core.entity.Savepoint">
<id column="id" jdbcType="BIGINT" property="id"/>
<result column="app_id" jdbcType="VARCHAR" property="appId"/>
<result column="latest" jdbcType="BOOLEAN" property="latest"/>
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceTest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceTest.java
index 22274b2bc..3bd852366 100644
---
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceTest.java
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceTest.java
@@ -86,7 +86,7 @@ class ApplicationServiceTest extends SpringTestBase {
app.setK8sHadoopIntegration(false);
app.setBackUp(false);
app.setRestart(false);
- app.setSavePointed(false);
+ app.setRestoreOrTriggerSavepoint(false);
app.setDrain(false);
app.setAllowNonRestored(false);
@@ -99,7 +99,7 @@ class ApplicationServiceTest extends SpringTestBase {
Application application = new Application();
application.setId(1304056220683497473L);
application.setRestart(false);
- application.setSavePointed(false);
+ application.setRestoreOrTriggerSavepoint(false);
application.setAllowNonRestored(false);
applicationService.start(application, false);
diff --git
a/streampark-console/streampark-console-webapp/src/api/flink/app/app.ts
b/streampark-console/streampark-console-webapp/src/api/flink/app/app.ts
index f84bece2d..ccb0d868f 100644
--- a/streampark-console/streampark-console-webapp/src/api/flink/app/app.ts
+++ b/streampark-console/streampark-console-webapp/src/api/flink/app/app.ts
@@ -212,7 +212,7 @@ export function fetchK8sStartLog(data):
Promise<AxiosResponse<any>> {
*/
export function fetchCheckSavepointPath(data: {
id?: string;
- savePoint?: string;
+ savepointPath?: string;
}): Promise<AxiosResponse<Result>> {
return defHttp.post(
{ url: APP_API.CHECK_SAVEPOINT_PATH, data },
diff --git
a/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts
b/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts
index e79a1cedc..7eb430951 100644
--- a/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts
+++ b/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts
@@ -116,8 +116,8 @@ export interface AppListRecord {
flinkVersion: string;
confPath?: any;
format?: any;
- savePoint?: any;
- savePointed: boolean;
+ savepointPath?: any;
+ restoreOrTriggerSavepoint: boolean;
drain: boolean;
allowNonRestored: boolean;
socketId?: any;
@@ -147,9 +147,9 @@ interface AppControl {
/* cancel params */
export interface CancelParam {
id: string;
- savePointed: boolean;
+ restoreOrTriggerSavepoint: boolean;
drain: boolean;
- savePoint: string;
+ savepointPath: string;
}
// create Params
export interface CreateParams {
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
index 1e9e30a6a..48ab4a2be 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
@@ -50,7 +50,7 @@
Object.assign(receiveData, data);
resetFields();
setFieldsValue({
- startSavePoint: receiveData.selected?.path,
+ savepointPath: receiveData.selected?.path,
});
}
});
@@ -67,7 +67,7 @@
labelWidth: 120,
schemas: [
{
- field: 'startSavePointed',
+ field: 'restoreSavepoint',
label: t('flink.app.view.fromSavepoint'),
component: 'Switch',
componentProps: {
@@ -78,7 +78,7 @@
afterItem: () => h('span', { class: 'conf-switch' },
t('flink.app.view.savepointTip')),
},
{
- field: 'startSavePoint',
+ field: 'savepointPath',
label: 'Savepoint',
component:
receiveData.historySavePoint && receiveData.historySavePoint.length
> 0
@@ -87,7 +87,7 @@
afterItem: () =>
h('span', { class: 'conf-switch' },
handleSavePointTip(receiveData.historySavePoint)),
slot: 'savepoint',
- ifShow: ({ values }) => values.startSavePointed,
+ ifShow: ({ values }) => values.restoreSavepoint,
required: true,
},
{
@@ -100,7 +100,7 @@
},
afterItem: () => h('span', { class: 'conf-switch' },
t('flink.app.view.ignoreRestoredTip')),
defaultValue: false,
- ifShow: ({ values }) => values.startSavePointed,
+ ifShow: ({ values }) => values.restoreSavepoint,
},
],
colon: true,
@@ -132,13 +132,13 @@
async function handleDoSubmit() {
try {
const formValue = (await validate()) as Recordable;
- const savePointed = formValue.startSavePointed;
- const savePointPath = savePointed ? formValue['startSavePoint'] : null;
+ const restoreOrTriggerSavepoint = formValue.restoreSavepoint;
+ const savepointPath = restoreOrTriggerSavepoint ?
formValue['savepointPath'] : null;
handleReset();
const { data } = await fetchStart({
id: receiveData.application.id,
- savePointed,
- savePoint: savePointPath,
+ restoreOrTriggerSavepoint,
+ savepointPath: savepointPath,
allowNonRestored: formValue.allowNonRestoredState || false,
});
if (data.data) {
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue
index 06d7f73b3..31db50d12 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue
@@ -46,7 +46,7 @@
labelWidth: 120,
schemas: [
{
- field: 'stopSavePointed',
+ field: 'triggerSavepoint',
label: t('flink.app.operation.triggerSavePoint'),
component: 'Switch',
componentProps: {
@@ -65,7 +65,7 @@
placeholder: t('flink.app.operation.customSavepoint'),
allowClear: true,
},
- ifShow: ({ values }) => !!values.stopSavePointed,
+ ifShow: ({ values }) => !!values.triggerSavepoint,
},
{
field: 'drain',
@@ -76,7 +76,7 @@
unCheckedChildren: 'OFF',
},
defaultValue: false,
- ifShow: ({ values }) => !!values.stopSavePointed,
+ ifShow: ({ values }) => !!values.triggerSavepoint,
afterItem: () => h('span', { class: 'conf-switch' },
t('flink.app.operation.enableDrain')),
},
],
@@ -90,18 +90,18 @@
/* submit */
async function handleSubmit() {
try {
- const { stopSavePointed, customSavepoint, drain } = (await validate())
as Recordable;
+ const { triggerSavepoint, customSavepoint, drain } = (await validate())
as Recordable;
const stopReq = {
id: app.id,
- savePointed: stopSavePointed,
- savePoint: customSavepoint,
+ restoreOrTriggerSavepoint: triggerSavepoint,
+ savepointPath: customSavepoint,
drain: drain,
};
- if (stopSavePointed) {
+ if (triggerSavepoint) {
if (customSavepoint) {
const { data } = await fetchCheckSavepointPath({
- savePoint: customSavepoint,
+ savepointPath: customSavepoint,
});
if (data.data === false) {
await createErrorSwal(t('flink.app.operation.invalidSavePoint') +
data.message);
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx
index f84e1ff62..d0962e26d 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx
@@ -87,7 +87,7 @@ export const useSavepoint = (updateOption: Fn) => {
if (unref(customSavepoint)) {
submitLoading.value = true;
const { data } = await fetchCheckSavepointPath({
- savePoint: unref(customSavepoint),
+ savepointPath: unref(customSavepoint),
});
if (data.data === false) {
await createErrorSwal('custom savepoint path is invalid, ' +
data.message);
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelResponse.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelResponse.scala
index 20f11bd46..668a3af89 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelResponse.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelResponse.scala
@@ -17,4 +17,4 @@
package org.apache.streampark.flink.client.bean
-case class CancelResponse(savePointDir: String)
+case class CancelResponse(savepointDir: String)
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SavepointResponse.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SavepointResponse.scala
index ca1b4b29a..6127f11e2 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SavepointResponse.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SavepointResponse.scala
@@ -18,4 +18,4 @@
package org.apache.streampark.flink.client.bean
/** Result class of trigger savepoint presents savepoint path. */
-case class SavepointResponse(savePointDir: String)
+case class SavepointResponse(savepointDir: String)