This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 77068229e [Improve] Shiro session cannot expire bug fixed (#3939)
77068229e is described below
commit 77068229ef312eb0c24befe6b7b148ddcb8f138a
Author: benjobs <[email protected]>
AuthorDate: Sun Aug 4 18:33:44 2024 +0800
[Improve] Shiro session cannot expire bug fixed (#3939)
* [Improve] Shiro session cannot expire bug fixed
* [Improve] accessToken check exists bug fixed.
* [Improve] "savepoint" word typo improvement
* [Improve] login minor improvement
* [Improve] e2e bug fixed.
* [Improve] JWT verify bug fixed.
* [Improve] jwt test bug fixed.
---
.../streampark/common/util/CURLBuilder.scala | 3 +-
.../streampark/console/base/util/WebUtils.java | 37 --------
.../{AppUpdated.java => AppChangeEvent.java} | 6 +-
.../console/core/annotation/OpenAPI.java | 1 +
.../console/core/aspect/AppChangeEventAspect.java | 52 +++++++++++
.../console/core/aspect/OpenAPIAspect.java | 104 +++++++++++++++++++++
...StreamParkAspect.java => PermissionAspect.java} | 54 +----------
.../core/component/FlinkCheckpointProcessor.java | 28 +++---
.../console/core/component/OpenAPIComponent.java | 15 ++-
.../core/controller/ApplicationController.java | 8 +-
.../console/core/controller/OpenAPIController.java | 23 ++---
.../console/core/controller/ProjectController.java | 4 +-
...intController.java => SavepointController.java} | 24 ++---
.../controller/SparkApplicationController.java | 10 +-
.../console/core/entity/Application.java | 6 +-
.../core/entity/{SavePoint.java => Savepoint.java} | 4 +-
.../console/core/entity/SparkApplication.java | 6 +-
.../{SavePointMapper.java => SavepointMapper.java} | 4 +-
...SavePointService.java => SavepointService.java} | 10 +-
.../impl/ApplicationActionServiceImpl.java | 76 ++++++++-------
.../impl/ApplicationInfoServiceImpl.java | 8 +-
.../impl/ApplicationManageServiceImpl.java | 6 +-
.../impl/SparkApplicationInfoServiceImpl.java | 4 +-
.../impl/SparkApplicationManageServiceImpl.java | 2 +-
...tServiceImpl.java => SavepointServiceImpl.java} | 84 ++++++++---------
.../console/core/watcher/FlinkAppHttpWatcher.java | 24 ++---
.../console/system/authentication/JWTFilter.java | 21 ++---
.../console/system/authentication/JWTToken.java | 5 +-
.../console/system/authentication/JWTUtil.java | 84 +++++++++++------
.../console/system/authentication/ShiroConfig.java | 1 +
.../console/system/authentication/ShiroRealm.java | 68 ++++++++------
.../system/controller/AccessTokenController.java | 5 +-
.../system/controller/PassportController.java | 27 ++----
.../console/system/controller/SsoController.java | 2 +-
.../console/system/controller/UserController.java | 3 +-
.../console/system/entity/AccessToken.java | 1 -
.../console/system/runner/StartedUpRunner.java | 1 +
.../console/system/security/Authenticator.java | 3 +-
.../system/security/impl/AuthenticatorImpl.java | 8 +-
.../console/system/service/AccessTokenService.java | 4 +-
.../console/system/service/UserService.java | 5 +-
.../service/impl/AccessTokenServiceImpl.java | 15 +--
.../system/service/impl/UserServiceImpl.java | 23 ++---
.../{SavePointMapper.xml => SavepointMapper.xml} | 4 +-
.../core/service/AccessTokenServiceTest.java | 15 ++-
.../core/service/ApplicationManageServiceTest.java | 4 +-
...tServiceTest.java => SavepointServiceTest.java} | 42 ++++-----
.../console/system/authentication/JWTTest.java | 29 +++---
.../streampark-console-webapp/src/api/flink/app.ts | 2 +-
.../src/api/flink/app.type.ts | 8 +-
.../src/locales/lang/en/flink/app.ts | 1 +
.../src/locales/lang/zh-CN/flink/app.ts | 1 +
.../components/AppView/StartApplicationModal.vue | 18 ++--
.../components/AppView/StopApplicationModal.vue | 16 ++--
.../src/views/flink/app/hooks/useSavepoint.tsx | 2 +-
.../pages/flink/applications/ApplicationsPage.java | 2 +-
.../flink/client/bean/CancelResponse.scala | 2 +-
.../flink/client/bean/SavepointResponse.scala | 2 +-
58 files changed, 566 insertions(+), 461 deletions(-)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/CURLBuilder.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/CURLBuilder.scala
index e35d92d3b..03bfb84c9 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/CURLBuilder.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/CURLBuilder.scala
@@ -44,8 +44,7 @@ class CURLBuilder(val url: String) {
headers.keySet.foreach(h => cURL.append(String.format("-H \'%s: %s\'
\\\n", h, headers.get(h))))
formData.foreach(k =>
cURL.append(String.format("--data-urlencode \'%s=%s\' \\\n", k._1,
k._2)))
- cURL.append("-i")
- cURL.toString
+ cURL.toString.trim.dropRight(1)
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java
index 731d8e5df..9a2a4a8de 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java
@@ -40,36 +40,6 @@ public final class WebUtils {
private WebUtils() {
}
- /**
- * token encrypt
- *
- * @param token token
- * @return encrypt token
- */
- public static String encryptToken(String token) {
- try {
- return EncryptUtils.encrypt(token);
- } catch (Exception e) {
- log.info("token encrypt failed: ", e);
- return null;
- }
- }
-
- /**
- * token decrypt
- *
- * @param encryptToken encryptToken
- * @return decrypt token
- */
- public static String decryptToken(String encryptToken) {
- try {
- return EncryptUtils.decrypt(encryptToken);
- } catch (Exception e) {
- log.info("token decrypt failed: ", e);
- return null;
- }
- }
-
/**
* camel to underscore
*
@@ -113,15 +83,8 @@ public final class WebUtils {
return getAppDir(LIB);
}
- public static File getAppPluginsDir() {
- return getAppDir(PLUGINS);
- }
-
public static File getAppClientDir() {
return getAppDir(CLIENT);
}
- public static File getAppConfDir() {
- return getAppDir(CONF);
- }
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppUpdated.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppChangeEvent.java
similarity index 88%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppUpdated.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppChangeEvent.java
index 698b54e62..8e741cd03 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppUpdated.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/AppChangeEvent.java
@@ -17,7 +17,7 @@
package org.apache.streampark.console.core.annotation;
-import org.apache.streampark.console.core.aspect.StreamParkAspect;
+import org.apache.streampark.console.core.aspect.AppChangeEventAspect;
import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
import org.aspectj.lang.ProceedingJoinPoint;
@@ -31,12 +31,12 @@ import java.lang.annotation.Target;
* In the controller({@link org.apache.streampark.console.core.controller}),
If some method causes
* application state update, need to add this annotation, This annotation
marks which methods will
* cause the application to be updated, Will work together with {@link
- * StreamParkAspect#appUpdated(ProceedingJoinPoint)}, The final purpose will
be refresh {@link
+ * AppChangeEventAspect#appChangeEvent(ProceedingJoinPoint)}, The final
purpose will be refresh {@link
* FlinkAppHttpWatcher#WATCHING_APPS}, Make the state of the job consistent
with the database
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
-public @interface AppUpdated {
+public @interface AppChangeEvent {
boolean value() default true;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/OpenAPI.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/OpenAPI.java
index 8c039857e..6fbe29d44 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/OpenAPI.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/annotation/OpenAPI.java
@@ -45,6 +45,7 @@ public @interface OpenAPI {
String defaultValue() default "";
String bindFor() default "";
+
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/AppChangeEventAspect.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/AppChangeEventAspect.java
new file mode 100644
index 000000000..ef8adce65
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/AppChangeEventAspect.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.aspect;
+
+import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
+
+import lombok.extern.slf4j.Slf4j;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.annotation.Around;
+import org.aspectj.lang.annotation.Aspect;
+import org.aspectj.lang.annotation.Pointcut;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+@Aspect
+public class AppChangeEventAspect {
+
+ @Autowired
+ private FlinkAppHttpWatcher flinkAppHttpWatcher;
+
+
@Pointcut("@annotation(org.apache.streampark.console.core.annotation.AppChangeEvent)")
+ public void appChangeEventPointcut() {
+ }
+
+ @Around("appChangeEventPointcut()")
+ public Object appChangeEvent(ProceedingJoinPoint joinPoint) throws
Throwable {
+ MethodSignature methodSignature = (MethodSignature)
joinPoint.getSignature();
+ log.debug("appUpdated aspect, method:{}", methodSignature.getName());
+ Object target = joinPoint.proceed();
+ flinkAppHttpWatcher.init();
+ return target;
+ }
+
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/OpenAPIAspect.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/OpenAPIAspect.java
new file mode 100644
index 000000000..2438802f0
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/OpenAPIAspect.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.aspect;
+
+import org.apache.streampark.common.util.DateUtils;
+import org.apache.streampark.common.util.ReflectUtils;
+import org.apache.streampark.console.base.domain.RestResponse;
+import org.apache.streampark.console.base.exception.ApiAlertException;
+import org.apache.streampark.console.core.annotation.OpenAPI;
+import org.apache.streampark.console.system.entity.AccessToken;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.shiro.SecurityUtils;
+
+import lombok.extern.slf4j.Slf4j;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.annotation.Around;
+import org.aspectj.lang.annotation.Aspect;
+import org.aspectj.lang.annotation.Pointcut;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.springframework.stereotype.Component;
+import org.springframework.web.context.request.RequestContextHolder;
+import org.springframework.web.context.request.ServletRequestAttributes;
+
+import javax.servlet.http.HttpServletRequest;
+
+import java.lang.reflect.Field;
+import java.util.Date;
+import java.util.TimeZone;
+
+@Slf4j
+@Component
+@Aspect
+public class OpenAPIAspect {
+
+ @Pointcut("execution(public"
+ + " org.apache.streampark.console.base.domain.RestResponse"
+ + " org.apache.streampark.console.core.controller.*.*(..))")
+ public void openAPIPointcut() {
+ }
+
+ @SuppressWarnings("checkstyle:SimplifyBooleanExpression")
+ @Around(value = "openAPIPointcut()")
+ public RestResponse openAPI(ProceedingJoinPoint joinPoint) throws
Throwable {
+ MethodSignature methodSignature = (MethodSignature)
joinPoint.getSignature();
+ log.debug("restResponse aspect, method:{}", methodSignature.getName());
+ Boolean isApi = (Boolean)
SecurityUtils.getSubject().getSession().getAttribute(AccessToken.IS_API_TOKEN);
+ if (isApi != null && isApi) {
+ HttpServletRequest request =
+ ((ServletRequestAttributes)
RequestContextHolder.getRequestAttributes()).getRequest();
+ OpenAPI openAPI =
methodSignature.getMethod().getAnnotation(OpenAPI.class);
+ if (openAPI == null) {
+ String url = request.getRequestURI();
+ throw new ApiAlertException("openapi unsupported: " + url);
+ } else {
+ Object[] objects = joinPoint.getArgs();
+ for (OpenAPI.Param param : openAPI.param()) {
+ String bingFor = param.bindFor();
+ if (StringUtils.isNotBlank(bingFor)) {
+ String name = param.name();
+ for (Object args : objects) {
+ Field bindForField =
ReflectUtils.getField(args.getClass(), bingFor);
+ if (bindForField != null) {
+ Object value = request.getParameter(name);
+ bindForField.setAccessible(true);
+ if (value != null) {
+ if (param.type().equals(String.class)) {
+ bindForField.set(args,
value.toString());
+ } else if
(param.type().equals(Boolean.class)
+ || param.type().equals(boolean.class))
{
+ bindForField.set(args,
Boolean.parseBoolean(value.toString()));
+ } else if
(param.type().equals(Integer.class) || param.type().equals(int.class)) {
+ bindForField.set(args,
Integer.parseInt(value.toString()));
+ } else if (param.type().equals(Long.class)
|| param.type().equals(long.class)) {
+ bindForField.set(args,
Long.parseLong(value.toString()));
+ } else if
(param.type().equals(Date.class)) {
+ bindForField.set(args,
DateUtils.parse(value.toString(), DateUtils.fullFormat(),
+ TimeZone.getDefault()));
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ return (RestResponse) joinPoint.proceed();
+ }
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/PermissionAspect.java
similarity index 71%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/PermissionAspect.java
index dda162627..add5f5e01 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/StreamParkAspect.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/PermissionAspect.java
@@ -19,20 +19,16 @@ package org.apache.streampark.console.core.aspect;
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.base.exception.ApiAlertException;
-import org.apache.streampark.console.core.annotation.OpenAPI;
import org.apache.streampark.console.core.annotation.Permission;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.enums.UserTypeEnum;
import
org.apache.streampark.console.core.service.application.ApplicationManageService;
import org.apache.streampark.console.core.util.ServiceHelper;
-import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
-import org.apache.streampark.console.system.entity.AccessToken;
import org.apache.streampark.console.system.entity.Member;
import org.apache.streampark.console.system.entity.User;
import org.apache.streampark.console.system.service.MemberService;
import org.apache.commons.lang3.StringUtils;
-import org.apache.shiro.SecurityUtils;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
@@ -47,18 +43,11 @@ import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.stereotype.Component;
-import org.springframework.web.context.request.RequestContextHolder;
-import org.springframework.web.context.request.ServletRequestAttributes;
-
-import javax.servlet.http.HttpServletRequest;
@Slf4j
@Component
@Aspect
-public class StreamParkAspect {
-
- @Autowired
- private FlinkAppHttpWatcher flinkAppHttpWatcher;
+public class PermissionAspect {
@Autowired
private MemberService memberService;
@@ -66,48 +55,11 @@ public class StreamParkAspect {
@Autowired
private ApplicationManageService applicationManageService;
- @Pointcut("execution(public"
- + " org.apache.streampark.console.base.domain.RestResponse"
- + " org.apache.streampark.console.core.controller.*.*(..))")
- public void openAPI() {
- }
-
- @SuppressWarnings("checkstyle:SimplifyBooleanExpression")
- @Around(value = "openAPI()")
- public RestResponse openAPI(ProceedingJoinPoint joinPoint) throws
Throwable {
- MethodSignature methodSignature = (MethodSignature)
joinPoint.getSignature();
- log.debug("restResponse aspect, method:{}", methodSignature.getName());
- Boolean isApi = (Boolean)
SecurityUtils.getSubject().getSession().getAttribute(AccessToken.IS_API_TOKEN);
- if (isApi != null && isApi) {
- OpenAPI openAPI =
methodSignature.getMethod().getAnnotation(OpenAPI.class);
- if (openAPI == null) {
- HttpServletRequest request =
- ((ServletRequestAttributes)
RequestContextHolder.getRequestAttributes()).getRequest();
- String url = request.getRequestURI();
- throw new ApiAlertException("openapi unsupported: " + url);
- }
- }
- return (RestResponse) joinPoint.proceed();
- }
-
-
@Pointcut("@annotation(org.apache.streampark.console.core.annotation.AppUpdated)")
- public void appUpdated() {
- }
-
- @Around("appUpdated()")
- public Object appUpdated(ProceedingJoinPoint joinPoint) throws Throwable {
- MethodSignature methodSignature = (MethodSignature)
joinPoint.getSignature();
- log.debug("appUpdated aspect, method:{}", methodSignature.getName());
- Object target = joinPoint.proceed();
- flinkAppHttpWatcher.init();
- return target;
- }
-
@Pointcut("@annotation(org.apache.streampark.console.core.annotation.Permission)")
- public void permissionAction() {
+ public void permissionPointcut() {
}
- @Around("permissionAction()")
+ @Around("permissionPointcut()")
public RestResponse permissionAction(ProceedingJoinPoint joinPoint) throws
Throwable {
MethodSignature methodSignature = (MethodSignature)
joinPoint.getSignature();
Permission permission =
methodSignature.getMethod().getAnnotation(Permission.class);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java
index 8bad65b90..5363c2ede 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java
@@ -19,11 +19,11 @@ package org.apache.streampark.console.core.component;
import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.console.core.entity.Application;
-import org.apache.streampark.console.core.entity.SavePoint;
+import org.apache.streampark.console.core.entity.Savepoint;
import org.apache.streampark.console.core.enums.CheckPointStatusEnum;
import org.apache.streampark.console.core.enums.FailoverStrategyEnum;
import org.apache.streampark.console.core.metrics.flink.CheckPoints;
-import org.apache.streampark.console.core.service.SavePointService;
+import org.apache.streampark.console.core.service.SavepointService;
import org.apache.streampark.console.core.service.alert.AlertService;
import
org.apache.streampark.console.core.service.application.ApplicationActionService;
import org.apache.streampark.console.core.utils.AlertTemplateUtils;
@@ -73,7 +73,7 @@ public class FlinkCheckpointProcessor {
private AlertService alertService;
@Autowired
- private SavePointService savePointService;
+ private SavepointService savepointService;
@Autowired
private FlinkAppHttpWatcher flinkAppHttpWatcher;
@@ -172,8 +172,8 @@ public class FlinkCheckpointProcessor {
return checkPointCache.get(
cacheId,
key -> {
- SavePoint savePoint = savePointService.getLatest(appId);
- return
Optional.ofNullable(savePoint).map(SavePoint::getChkId).orElse(null);
+ Savepoint savepoint = savepointService.getLatest(appId);
+ return
Optional.ofNullable(savepoint).map(Savepoint::getChkId).orElse(null);
});
}
@@ -184,15 +184,15 @@ public class FlinkCheckpointProcessor {
}
private void saveSavepoint(CheckPoints.CheckPoint checkPoint, Long appId) {
- SavePoint savePoint = new SavePoint();
- savePoint.setAppId(appId);
- savePoint.setChkId(checkPoint.getId());
- savePoint.setLatest(true);
- savePoint.setType(checkPoint.getCheckPointType().get());
- savePoint.setPath(checkPoint.getExternalPath());
- savePoint.setTriggerTime(new Date(checkPoint.getTriggerTimestamp()));
- savePoint.setCreateTime(new Date());
- savePointService.save(savePoint);
+ Savepoint savepoint = new Savepoint();
+ savepoint.setAppId(appId);
+ savepoint.setChkId(checkPoint.getId());
+ savepoint.setLatest(true);
+ savepoint.setType(checkPoint.getCheckPointType().get());
+ savepoint.setPath(checkPoint.getExternalPath());
+ savepoint.setTriggerTime(new Date(checkPoint.getTriggerTimestamp()));
+ savepoint.setCreateTime(new Date());
+ savepointService.save(savepoint);
}
public static class Counter {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/OpenAPIComponent.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/OpenAPIComponent.java
index b9507ebb4..75f43a7af 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/OpenAPIComponent.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/OpenAPIComponent.java
@@ -67,7 +67,7 @@ public class OpenAPIComponent {
return schemas.get(name);
}
- public String getOpenApiCUrl(String baseUrl, Long appId, Long teamId,
String name) {
+ public String getOpenApiCUrl(String name, String baseUrl, Long appId, Long
teamId) {
OpenAPISchema schema = this.getOpenAPISchema(name);
if (schema == null) {
throw new UnsupportedOperationException("Unsupported OpenAPI: " +
name);
@@ -86,10 +86,15 @@ public class OpenAPIComponent {
schema.getSchema().forEach(c -> {
if (c.isRequired()) {
- if ("appId".equals(c.getBindFor())) {
- curlBuilder.addFormData(c.getName(), appId);
- } else if ("teamId".equals(c.getBindFor())) {
- curlBuilder.addFormData(c.getName(), teamId);
+ switch (c.getBindFor()) {
+ case "appId":
+ curlBuilder.addFormData(c.getName(), appId);
+ break;
+ case "teamId":
+ curlBuilder.addFormData(c.getName(), teamId);
+ break;
+ default:
+ break;
}
} else {
curlBuilder.addFormData(c.getName(), c.getDefaultValue());
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
index 722b34d8c..6159063e7 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
@@ -22,7 +22,7 @@ import org.apache.streampark.common.util.YarnUtils;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.base.exception.InternalException;
-import org.apache.streampark.console.core.annotation.AppUpdated;
+import org.apache.streampark.console.core.annotation.AppChangeEvent;
import org.apache.streampark.console.core.annotation.Permission;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.ApplicationBackUp;
@@ -100,7 +100,7 @@ public class ApplicationController {
return RestResponse.success();
}
- @AppUpdated
+ @AppChangeEvent
@Permission(app = "#app.id")
@PostMapping("update")
@RequiresPermissions("app:update")
@@ -124,7 +124,7 @@ public class ApplicationController {
return RestResponse.success(applicationList);
}
- @AppUpdated
+ @AppChangeEvent
@PostMapping("mapping")
@Permission(app = "#app.id")
@RequiresPermissions("app:mapping")
@@ -133,7 +133,7 @@ public class ApplicationController {
return RestResponse.success(flag);
}
- @AppUpdated
+ @AppChangeEvent
@Permission(app = "#app.id")
@PostMapping("revoke")
@RequiresPermissions("app:release")
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/OpenAPIController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/OpenAPIController.java
index 75954f041..0959004a0 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/OpenAPIController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/OpenAPIController.java
@@ -52,9 +52,10 @@ public class OpenAPIController {
}, param = {
@OpenAPI.Param(name = "id", description = "current flink
application id", required = true, type = Long.class, bindFor = "appId"),
@OpenAPI.Param(name = "teamId", description = "current user
teamId", required = true, type = Long.class),
- @OpenAPI.Param(name = "savePointed", description = "restored app
from the savepoint or latest checkpoint", required = false, type =
String.class, defaultValue = "false"),
- @OpenAPI.Param(name = "savePoint", description = "savepoint or
checkpoint path", required = false, type = String.class),
- @OpenAPI.Param(name = "allowNonRestored", description = "ignore
savepoint if cannot be restored", required = false, type = boolean.class,
defaultValue = "false")
+ @OpenAPI.Param(name = "argument", description = "flink program run
arguments", required = false, type = String.class, bindFor = "args"),
+ @OpenAPI.Param(name = "restoreFromSavepoint", description =
"restored app from the savepoint or checkpoint", required = false, type =
Boolean.class, defaultValue = "false", bindFor = "restoreOrTriggerSavepoint"),
+ @OpenAPI.Param(name = "savepointPath", description = "savepoint or
checkpoint path", required = false, type = String.class),
+ @OpenAPI.Param(name = "allowNonRestored", description = "ignore
savepoint if cannot be restored", required = false, type = Boolean.class,
defaultValue = "false"),
})
@Permission(app = "#app.appId", team = "#app.teamId")
@PostMapping("app/start")
@@ -69,9 +70,9 @@ public class OpenAPIController {
}, param = {
@OpenAPI.Param(name = "id", description = "current flink
application id", required = true, type = Long.class, bindFor = "appId"),
@OpenAPI.Param(name = "teamId", description = "current user
teamId", required = true, type = Long.class),
- @OpenAPI.Param(name = "savePointed", description = "trigger
savepoint before taking stopping", required = false, type = boolean.class,
defaultValue = "false"),
- @OpenAPI.Param(name = "savePoint", description = "savepoint path",
required = false, type = String.class),
- @OpenAPI.Param(name = "drain", description = "send max watermark
before canceling", required = false, type = boolean.class, defaultValue =
"false"),
+ @OpenAPI.Param(name = "triggerSavepoint", description = "trigger
savepoint before taking stopping", required = false, type = Boolean.class,
defaultValue = "false", bindFor = "restoreOrTriggerSavepoint"),
+ @OpenAPI.Param(name = "savepointPath", description = "savepoint
path", required = false, type = String.class),
+ @OpenAPI.Param(name = "drain", description = "send max watermark
before canceling", required = false, type = Boolean.class, defaultValue =
"false"),
})
@Permission(app = "#app.appId", team = "#app.teamId")
@PostMapping("app/cancel")
@@ -82,11 +83,11 @@ public class OpenAPIController {
}
@PostMapping("curl")
- public RestResponse copyOpenApiCurl(String baseUrl,
- Long appId,
- @NotNull(message = "{required}") Long
teamId,
- @NotBlank(message = "{required}")
String name) {
- String url = openAPIComponent.getOpenApiCUrl(baseUrl, appId, teamId,
name);
+ public RestResponse copyOpenApiCurl(String name,
+ String baseUrl,
+ @NotNull Long appId,
+ @NotNull Long teamId) {
+ String url = openAPIComponent.getOpenApiCUrl(name, baseUrl, appId,
teamId);
return RestResponse.success(url);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProjectController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProjectController.java
index 72b1df4c3..50b33a7a1 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProjectController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProjectController.java
@@ -20,7 +20,7 @@ package org.apache.streampark.console.core.controller;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.base.exception.ApiAlertException;
-import org.apache.streampark.console.core.annotation.AppUpdated;
+import org.apache.streampark.console.core.annotation.AppChangeEvent;
import org.apache.streampark.console.core.annotation.Permission;
import org.apache.streampark.console.core.entity.Project;
import org.apache.streampark.console.core.enums.GitAuthorizedErrorEnum;
@@ -59,7 +59,7 @@ public class ProjectController {
return projectService.create(project);
}
- @AppUpdated
+ @AppChangeEvent
@PostMapping("update")
@RequiresPermissions("project:update")
@Permission(team = "#project.teamId")
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SavePointController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SavepointController.java
similarity index 79%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SavePointController.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SavepointController.java
index 21d3862f9..85902e3c3 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SavePointController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SavepointController.java
@@ -22,8 +22,8 @@ import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.base.exception.InternalException;
import org.apache.streampark.console.core.annotation.Permission;
import org.apache.streampark.console.core.entity.Application;
-import org.apache.streampark.console.core.entity.SavePoint;
-import org.apache.streampark.console.core.service.SavePointService;
+import org.apache.streampark.console.core.entity.Savepoint;
+import org.apache.streampark.console.core.service.SavepointService;
import
org.apache.streampark.console.core.service.application.ApplicationManageService;
import org.apache.shiro.authz.annotation.RequiresPermissions;
@@ -42,37 +42,37 @@ import javax.annotation.Nullable;
@Validated
@RestController
@RequestMapping("flink/savepoint")
-public class SavePointController {
+public class SavepointController {
@Autowired
private ApplicationManageService applicationManageService;
@Autowired
- private SavePointService savePointService;
+ private SavepointService savepointService;
@PostMapping("history")
@Permission(app = "#sp.appId", team = "#sp.teamId")
- public RestResponse history(SavePoint sp, RestRequest request) {
- IPage<SavePoint> page = savePointService.getPage(sp, request);
+ public RestResponse history(Savepoint sp, RestRequest request) {
+ IPage<Savepoint> page = savepointService.getPage(sp, request);
return RestResponse.success(page);
}
@PostMapping("delete")
@RequiresPermissions("savepoint:delete")
@Permission(app = "#sp.appId", team = "#sp.teamId")
- public RestResponse delete(SavePoint sp) throws InternalException {
- SavePoint savePoint = savePointService.getById(sp.getId());
- Application application =
applicationManageService.getById(savePoint.getAppId());
- Boolean deleted = savePointService.remove(sp.getId(), application);
+ public RestResponse delete(Savepoint sp) throws InternalException {
+ Savepoint savepoint = savepointService.getById(sp.getId());
+ Application application =
applicationManageService.getById(savepoint.getAppId());
+ Boolean deleted = savepointService.remove(sp.getId(), application);
return RestResponse.success(deleted);
}
@PostMapping("trigger")
- @Permission(app = "#savePoint.appId", team = "#savePoint.teamId")
+ @Permission(app = "#savepoint.appId", team = "#savepoint.teamId")
@RequiresPermissions("savepoint:trigger")
public RestResponse trigger(
Long appId, @Nullable String savepointPath,
@Nullable Boolean nativeFormat) {
- savePointService.trigger(appId, savepointPath, nativeFormat);
+ savepointService.trigger(appId, savepointPath, nativeFormat);
return RestResponse.success(true);
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java
index 5f2165b4a..861779ca3 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java
@@ -22,7 +22,7 @@ import org.apache.streampark.common.util.YarnUtils;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.base.exception.InternalException;
-import org.apache.streampark.console.core.annotation.AppUpdated;
+import org.apache.streampark.console.core.annotation.AppChangeEvent;
import org.apache.streampark.console.core.entity.ApplicationBackUp;
import org.apache.streampark.console.core.entity.SparkApplication;
import org.apache.streampark.console.core.entity.SparkApplicationLog;
@@ -96,7 +96,7 @@ public class SparkApplicationController {
return RestResponse.success();
}
- @AppUpdated
+ @AppChangeEvent
@PostMapping("update")
@RequiresPermissions("app:update")
public RestResponse update(SparkApplication app) {
@@ -117,7 +117,7 @@ public class SparkApplicationController {
return RestResponse.success(applicationList);
}
- @AppUpdated
+ @AppChangeEvent
@PostMapping("mapping")
@RequiresPermissions("app:mapping")
public RestResponse mapping(SparkApplication app) {
@@ -125,7 +125,7 @@ public class SparkApplicationController {
return RestResponse.success(flag);
}
- @AppUpdated
+ @AppChangeEvent
@PostMapping("revoke")
@RequiresPermissions("app:release")
public RestResponse revoke(SparkApplication app) {
@@ -158,7 +158,7 @@ public class SparkApplicationController {
return RestResponse.success();
}
- @AppUpdated
+ @AppChangeEvent
@PostMapping("clean")
@RequiresPermissions("app:clean")
public RestResponse clean(SparkApplication app) {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index d9d721262..f277d734e 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -238,11 +238,11 @@ public class Application implements Serializable {
private transient String flinkVersion;
private transient String confPath;
private transient Integer format;
- private transient String savePoint;
- private transient Boolean savePointed = false;
+ private transient String savepointPath;
+ private transient Boolean restoreOrTriggerSavepoint = false;
private transient Boolean drain = false;
private transient Boolean nativeFormat = false;
- private transient Long savePointTimeout = 60L;
+ private transient Long savepointTimeout = 60L;
private transient Boolean allowNonRestored = false;
private transient Integer restoreMode;
private transient String socketId;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SavePoint.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Savepoint.java
similarity index 96%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SavePoint.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Savepoint.java
index 34ad48ad1..4b78af0e6 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SavePoint.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Savepoint.java
@@ -28,7 +28,7 @@ import java.util.Date;
@Data
@TableName("t_flink_savepoint")
@Slf4j
-public class SavePoint {
+public class Savepoint {
@TableId(type = IdType.AUTO)
private Long id;
@@ -41,7 +41,7 @@ public class SavePoint {
/**
* 1) checkPoint <br>
- * 2) savePoint
+ * 2) savepoint
*/
private Integer type;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
index 0ed01fcd4..8dad101fb 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
@@ -229,11 +229,11 @@ public class SparkApplication extends BaseEntity {
private transient String sparkVersion;
private transient String confPath;
private transient Integer format;
- private transient String savePoint;
- private transient Boolean savePointed = false;
+ private transient String savepointPath;
+ private transient Boolean restoreOrTriggerSavepoint = false;
private transient Boolean drain = false;
private transient Boolean nativeFormat = false;
- private transient Long savePointTimeout = 60L;
+ private transient Long savepointTimeout = 60L;
private transient Boolean allowNonRestored = false;
private transient Integer restoreMode;
private transient String socketId;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SavePointMapper.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SavepointMapper.java
similarity index 88%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SavePointMapper.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SavepointMapper.java
index 53714e9c8..7040ec574 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SavePointMapper.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SavepointMapper.java
@@ -17,9 +17,9 @@
package org.apache.streampark.console.core.mapper;
-import org.apache.streampark.console.core.entity.SavePoint;
+import org.apache.streampark.console.core.entity.Savepoint;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-public interface SavePointMapper extends BaseMapper<SavePoint> {
+public interface SavepointMapper extends BaseMapper<Savepoint> {
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavePointService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java
similarity index 92%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavePointService.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java
index 3982d6dbe..d7597817a 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavePointService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java
@@ -20,14 +20,14 @@ package org.apache.streampark.console.core.service;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.exception.InternalException;
import org.apache.streampark.console.core.entity.Application;
-import org.apache.streampark.console.core.entity.SavePoint;
+import org.apache.streampark.console.core.entity.Savepoint;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.IService;
import javax.annotation.Nullable;
-public interface SavePointService extends IService<SavePoint> {
+public interface SavepointService extends IService<Savepoint> {
/**
* Expires all savepoints for the specified application.
@@ -42,7 +42,7 @@ public interface SavePointService extends IService<SavePoint>
{
* @param id the unique identifier of the SavePoint
* @return the latest SavePoint object, or null if not found
*/
- SavePoint getLatest(Long id);
+ Savepoint getLatest(Long id);
/**
* Triggers a savepoint for the specified application.
@@ -67,11 +67,11 @@ public interface SavePointService extends
IService<SavePoint> {
/**
* Retrieves a page of savepoint objects based on the specified parameters.
*
- * @param savePoint The SavePoint object to be used for filtering the page
results.
+ * @param savepoint The SavePoint object to be used for filtering the page
results.
* @param request The RestRequest object containing additional request
parameters.
* @return An instance of IPage<SavePoint> representing the page of
SavePoint objects.
*/
- IPage<SavePoint> getPage(SavePoint savePoint, RestRequest request);
+ IPage<Savepoint> getPage(Savepoint savepoint, RestRequest request);
/**
* Removes all savepoints for the specified application.
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 525166be7..e19976dd4 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -45,7 +45,7 @@ import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.entity.FlinkEnv;
import org.apache.streampark.console.core.entity.FlinkSql;
import org.apache.streampark.console.core.entity.Resource;
-import org.apache.streampark.console.core.entity.SavePoint;
+import org.apache.streampark.console.core.entity.Savepoint;
import org.apache.streampark.console.core.enums.CheckPointTypeEnum;
import org.apache.streampark.console.core.enums.ConfigFileTypeEnum;
import org.apache.streampark.console.core.enums.FlinkAppStateEnum;
@@ -61,7 +61,7 @@ import
org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.FlinkEnvService;
import org.apache.streampark.console.core.service.FlinkSqlService;
import org.apache.streampark.console.core.service.ResourceService;
-import org.apache.streampark.console.core.service.SavePointService;
+import org.apache.streampark.console.core.service.SavepointService;
import org.apache.streampark.console.core.service.SettingService;
import org.apache.streampark.console.core.service.VariableService;
import
org.apache.streampark.console.core.service.application.ApplicationActionService;
@@ -155,7 +155,7 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
private FlinkSqlService flinkSqlService;
@Autowired
- private SavePointService savePointService;
+ private SavepointService savepointService;
@Autowired
private SettingService settingService;
@@ -251,7 +251,7 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
applicationLog.setYarnAppId(application.getClusterId());
applicationLog.setUserId(ServiceHelper.getUserId());
- if (appParam.getSavePointed()) {
+ if (appParam.getRestoreOrTriggerSavepoint()) {
FlinkAppHttpWatcher.addSavepoint(application.getId());
application.setOptionState(OptionStateEnum.SAVEPOINTING.getValue());
} else {
@@ -270,10 +270,10 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
// infer savepoint
String customSavepoint = null;
- if (appParam.getSavePointed()) {
- customSavepoint = appParam.getSavePoint();
+ if (appParam.getRestoreOrTriggerSavepoint()) {
+ customSavepoint = appParam.getSavepointPath();
if (StringUtils.isBlank(customSavepoint)) {
- customSavepoint = savePointService.getSavePointPath(appParam);
+ customSavepoint = savepointService.getSavePointPath(appParam);
}
}
@@ -303,7 +303,7 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
properties,
clusterId,
application.getJobId(),
- appParam.getSavePointed(),
+ appParam.getRestoreOrTriggerSavepoint(),
appParam.getDrain(),
customSavepoint,
appParam.getNativeFormat(),
@@ -333,8 +333,8 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
application.setState(FlinkAppStateEnum.FAILED.getValue());
updateById(application);
- if (appParam.getSavePointed()) {
- savePointService.expire(application.getId());
+ if (appParam.getRestoreOrTriggerSavepoint()) {
+ savepointService.expire(application.getId());
}
// re-tracking flink job on kubernetes and logging
exception
if (application.isKubernetesModeJob()) {
@@ -352,17 +352,17 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
// save log...
applicationLogService.save(applicationLog);
- if (cancelResponse != null && cancelResponse.savePointDir() !=
null) {
- String savePointDir = cancelResponse.savePointDir();
- log.info("savePoint path: {}", savePointDir);
- SavePoint savePoint = new SavePoint();
- savePoint.setPath(savePointDir);
- savePoint.setAppId(application.getId());
- savePoint.setLatest(true);
- savePoint.setType(CheckPointTypeEnum.SAVEPOINT.get());
- savePoint.setCreateTime(new Date());
- savePoint.setTriggerTime(triggerTime);
- savePointService.save(savePoint);
+ if (cancelResponse != null && cancelResponse.savepointDir() !=
null) {
+ String savepointDir = cancelResponse.savepointDir();
+ log.info("savepoint path: {}", savepointDir);
+ Savepoint savepoint = new Savepoint();
+ savepoint.setPath(savepointDir);
+ savepoint.setAppId(application.getId());
+ savepoint.setLatest(true);
+ savepoint.setType(CheckPointTypeEnum.SAVEPOINT.get());
+ savepoint.setCreateTime(new Date());
+ savepoint.setTriggerTime(triggerTime);
+ savepointService.save(savepoint);
}
if (application.isKubernetesModeJob()) {
@@ -403,7 +403,7 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
if (!application.isNeedRestartOnFailed()) {
return;
}
- appParam.setSavePointed(true);
+ appParam.setRestoreOrTriggerSavepoint(true);
application.setRestartCount(application.getRestartCount() + 1);
}
// 2) update app state to starting...
@@ -431,17 +431,22 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
}
// Get the args after placeholder replacement
- String applicationArgs =
variableService.replaceVariable(application.getTeamId(), application.getArgs());
+ String args = StringUtils.isBlank(appParam.getArgs()) ?
application.getArgs() : appParam.getArgs();
+ String applicationArgs =
variableService.replaceVariable(application.getTeamId(), args);
Tuple3<String, String, FlinkK8sRestExposedType> clusterIdNamespace =
getNamespaceClusterId(application);
String k8sNamespace = clusterIdNamespace.t1;
String k8sClusterId = clusterIdNamespace.t2;
FlinkK8sRestExposedType exposedType = clusterIdNamespace.t3;
+ String dynamicProperties =
+ StringUtils.isBlank(appParam.getDynamicProperties()) ?
application.getDynamicProperties()
+ : appParam.getDynamicProperties();
+
SubmitRequest submitRequest = new SubmitRequest(
flinkEnv.getFlinkVersion(),
FlinkExecutionMode.of(application.getExecutionMode()),
- getProperties(application),
+ getProperties(application, dynamicProperties),
flinkEnv.getFlinkConf(),
FlinkDevelopmentMode.of(application.getJobType()),
application.getId(),
@@ -449,7 +454,7 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
application.getJobName(),
appConf,
application.getApplicationType(),
- getSavePointed(appParam),
+ getSavepointPath(appParam),
FlinkRestoreMode.of(appParam.getRestoreMode()),
applicationArgs,
k8sClusterId,
@@ -713,7 +718,7 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
return Tuple2.of(flinkUserJar, appConf);
}
- private Map<String, Object> getProperties(Application application) {
+ private Map<String, Object> getProperties(Application application, String
runtimeProperties) {
Map<String, Object> properties = new
HashMap<>(application.getOptionMap());
if
(FlinkExecutionMode.isRemoteMode(application.getFlinkExecutionMode())) {
FlinkCluster cluster =
flinkClusterService.getById(application.getFlinkClusterId());
@@ -756,8 +761,7 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
properties.put(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(),
true);
}
- Map<String, String> dynamicProperties = PropertiesUtils
-
.extractDynamicPropertiesAsJava(application.getDynamicProperties());
+ Map<String, String> dynamicProperties =
PropertiesUtils.extractDynamicPropertiesAsJava(runtimeProperties);
properties.putAll(dynamicProperties);
ResolveOrder resolveOrder =
ResolveOrder.of(application.getResolveOrder());
if (resolveOrder != null) {
@@ -773,7 +777,7 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
application.setState(FlinkAppStateEnum.CANCELED.getValue());
application.setOptionTime(new Date());
updateById(application);
- savePointService.expire(application.getId());
+ savepointService.expire(application.getId());
// re-tracking flink job on kubernetes and logging exception
if (application.isKubernetesModeJob()) {
TrackId trackId = k8sWatcherWrapper.toTrackId(application);
@@ -797,15 +801,15 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
}
}
- private String getSavePointed(Application appParam) {
- if (appParam.getSavePointed()) {
- if (StringUtils.isBlank(appParam.getSavePoint())) {
- SavePoint savePoint =
savePointService.getLatest(appParam.getId());
- if (savePoint != null) {
- return savePoint.getPath();
+ private String getSavepointPath(Application appParam) {
+ if (appParam.getRestoreOrTriggerSavepoint()) {
+ if (StringUtils.isBlank(appParam.getSavepointPath())) {
+ Savepoint savepoint =
savepointService.getLatest(appParam.getId());
+ if (savepoint != null) {
+ return savepoint.getPath();
}
} else {
- return appParam.getSavePoint();
+ return appParam.getSavepointPath();
}
}
return null;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
index 6712318b6..4b87659f5 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
@@ -41,7 +41,7 @@ import
org.apache.streampark.console.core.metrics.flink.JobsOverview;
import org.apache.streampark.console.core.runner.EnvInitializer;
import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.FlinkEnvService;
-import org.apache.streampark.console.core.service.SavePointService;
+import org.apache.streampark.console.core.service.SavepointService;
import
org.apache.streampark.console.core.service.application.ApplicationInfoService;
import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
import org.apache.streampark.console.core.watcher.FlinkClusterWatcher;
@@ -102,7 +102,7 @@ public class ApplicationInfoServiceImpl extends
ServiceImpl<ApplicationMapper, A
private FlinkEnvService flinkEnvService;
@Autowired
- private SavePointService savePointService;
+ private SavepointService savepointService;
@Autowired
private EnvInitializer envInitializer;
@@ -495,9 +495,9 @@ public class ApplicationInfoServiceImpl extends
ServiceImpl<ApplicationMapper, A
@Override
public String checkSavepointPath(Application appParam) throws Exception {
- String savepointPath = appParam.getSavePoint();
+ String savepointPath = appParam.getSavepointPath();
if (StringUtils.isBlank(savepointPath)) {
- savepointPath = savePointService.getSavePointPath(appParam);
+ savepointPath = savepointService.getSavePointPath(appParam);
}
if (StringUtils.isNotBlank(savepointPath)) {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
index 1be8e5ea1..25cdf6344 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
@@ -49,7 +49,7 @@ import
org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.FlinkSqlService;
import org.apache.streampark.console.core.service.ProjectService;
import org.apache.streampark.console.core.service.ResourceService;
-import org.apache.streampark.console.core.service.SavePointService;
+import org.apache.streampark.console.core.service.SavepointService;
import org.apache.streampark.console.core.service.SettingService;
import org.apache.streampark.console.core.service.YarnQueueService;
import
org.apache.streampark.console.core.service.application.ApplicationManageService;
@@ -117,7 +117,7 @@ public class ApplicationManageServiceImpl extends
ServiceImpl<ApplicationMapper,
private FlinkSqlService flinkSqlService;
@Autowired
- private SavePointService savePointService;
+ private SavepointService savepointService;
@Autowired
private EffectiveService effectiveService;
@@ -208,7 +208,7 @@ public class ApplicationManageServiceImpl extends
ServiceImpl<ApplicationMapper,
backUpService.remove(application);
// 6) remove savepoint
- savePointService.remove(application);
+ savepointService.remove(application);
// 7) remove BuildPipeline
appBuildPipeService.removeByAppId(application.getId());
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationInfoServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationInfoServiceImpl.java
index 4d4aba24e..01300ad6f 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationInfoServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationInfoServiceImpl.java
@@ -305,9 +305,9 @@ public class SparkApplicationInfoServiceImpl
@Override
public String checkSavepointPath(SparkApplication appParam) throws
Exception {
- String savepointPath = appParam.getSavePoint();
+ String savepointPath = appParam.getSavepointPath();
if (StringUtils.isBlank(savepointPath)) {
- // savepointPath = savePointService.getSavePointPath(appParam);
+ // savepointPath = savepointService.getSavePointPath(appParam);
}
if (StringUtils.isNotBlank(savepointPath)) {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
index 98a45b681..28c4bfc0b 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
@@ -179,7 +179,7 @@ public class SparkApplicationManageServiceImpl
// backUpService.remove(application);
// 6) remove savepoint
- // savePointService.remove(application);
+ // savepointService.remove(application);
// 7) remove BuildPipeline
appBuildPipeService.removeByAppId(application.getId());
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
similarity index 88%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
index ec86160b2..af0a3a36a 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
@@ -30,16 +30,16 @@ import
org.apache.streampark.console.core.entity.ApplicationConfig;
import org.apache.streampark.console.core.entity.ApplicationLog;
import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.entity.FlinkEnv;
-import org.apache.streampark.console.core.entity.SavePoint;
+import org.apache.streampark.console.core.entity.Savepoint;
import org.apache.streampark.console.core.enums.CheckPointTypeEnum;
import org.apache.streampark.console.core.enums.OperationEnum;
import org.apache.streampark.console.core.enums.OptionStateEnum;
-import org.apache.streampark.console.core.mapper.SavePointMapper;
+import org.apache.streampark.console.core.mapper.SavepointMapper;
import org.apache.streampark.console.core.service.ApplicationConfigService;
import org.apache.streampark.console.core.service.ApplicationLogService;
import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.FlinkEnvService;
-import org.apache.streampark.console.core.service.SavePointService;
+import org.apache.streampark.console.core.service.SavepointService;
import
org.apache.streampark.console.core.service.application.ApplicationManageService;
import org.apache.streampark.console.core.util.ServiceHelper;
import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
@@ -86,9 +86,9 @@ import static
org.apache.streampark.console.core.enums.CheckPointTypeEnum.CHECKP
@Slf4j
@Service
@Transactional(propagation = Propagation.SUPPORTS, readOnly = true,
rollbackFor = Exception.class)
-public class SavePointServiceImpl extends ServiceImpl<SavePointMapper,
SavePoint>
+public class SavepointServiceImpl extends ServiceImpl<SavepointMapper,
Savepoint>
implements
- SavePointService {
+ SavepointService {
@Autowired
private FlinkEnvService flinkEnvService;
@@ -114,24 +114,24 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
@Override
public void expire(Long appId) {
- SavePoint savePoint = new SavePoint();
- savePoint.setLatest(false);
- LambdaQueryWrapper<SavePoint> queryWrapper = new
LambdaQueryWrapper<SavePoint>().eq(SavePoint::getAppId, appId);
- this.update(savePoint, queryWrapper);
+ Savepoint savepoint = new Savepoint();
+ savepoint.setLatest(false);
+ LambdaQueryWrapper<Savepoint> queryWrapper = new
LambdaQueryWrapper<Savepoint>().eq(Savepoint::getAppId, appId);
+ this.update(savepoint, queryWrapper);
}
@Override
- public boolean save(SavePoint entity) {
+ public boolean save(Savepoint entity) {
this.expire(entity);
this.expire(entity.getAppId());
return super.save(entity);
}
@Override
- public SavePoint getLatest(Long id) {
- LambdaQueryWrapper<SavePoint> queryWrapper = new
LambdaQueryWrapper<SavePoint>()
- .eq(SavePoint::getAppId, id)
- .eq(SavePoint::getLatest, true);
+ public Savepoint getLatest(Long id) {
+ LambdaQueryWrapper<Savepoint> queryWrapper = new
LambdaQueryWrapper<Savepoint>()
+ .eq(Savepoint::getAppId, id)
+ .eq(Savepoint::getLatest, true);
return this.getOne(queryWrapper);
}
@@ -199,10 +199,10 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
@Override
public Boolean remove(Long id, Application appParam) throws
InternalException {
- SavePoint savePoint = getById(id);
+ Savepoint savepoint = getById(id);
try {
- if (StringUtils.isNotBlank(savePoint.getPath())) {
- appParam.getFsOperator().delete(savePoint.getPath());
+ if (StringUtils.isNotBlank(savepoint.getPath())) {
+ appParam.getFsOperator().delete(savepoint.getPath());
}
return removeById(id);
} catch (Exception e) {
@@ -211,11 +211,11 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
}
@Override
- public IPage<SavePoint> getPage(SavePoint savePoint, RestRequest request) {
+ public IPage<Savepoint> getPage(Savepoint savepoint, RestRequest request) {
request.setSortField("trigger_time");
- Page<SavePoint> page = MybatisPager.getPage(request);
- LambdaQueryWrapper<SavePoint> queryWrapper = new
LambdaQueryWrapper<SavePoint>().eq(SavePoint::getAppId,
- savePoint.getAppId());
+ Page<Savepoint> page = MybatisPager.getPage(request);
+ LambdaQueryWrapper<Savepoint> queryWrapper = new
LambdaQueryWrapper<Savepoint>().eq(Savepoint::getAppId,
+ savepoint.getAppId());
return this.page(page, queryWrapper);
}
@@ -223,7 +223,7 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
public void remove(Application appParam) {
Long appId = appParam.getId();
- LambdaQueryWrapper<SavePoint> queryWrapper = new
LambdaQueryWrapper<SavePoint>().eq(SavePoint::getAppId, appId);
+ LambdaQueryWrapper<Savepoint> queryWrapper = new
LambdaQueryWrapper<Savepoint>().eq(Savepoint::getAppId, appId);
this.remove(queryWrapper);
try {
@@ -246,10 +246,10 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
10L,
TimeUnit.MINUTES,
savepointResponse -> {
- if (savepointResponse != null &&
savepointResponse.savePointDir() != null) {
+ if (savepointResponse != null &&
savepointResponse.savepointDir() != null) {
applicationLog.setSuccess(true);
- String savePointDir = savepointResponse.savePointDir();
- log.info("Request savepoint successful, savepointDir: {}",
savePointDir);
+ String savepointDir = savepointResponse.savepointDir();
+ log.info("Request savepoint successful, savepointDir: {}",
savepointDir);
}
},
e -> {
@@ -437,7 +437,7 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
return MAX_RETAINED_CHECKPOINTS.defaultValue();
}
- private void expire(SavePoint entity) {
+ private void expire(Savepoint entity) {
FlinkEnv flinkEnv = flinkEnvService.getByAppId(entity.getAppId());
Application application =
applicationManageService.getById(entity.getAppId());
AssertUtils.notNull(flinkEnv);
@@ -448,29 +448,29 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
cpThreshold = CHECKPOINT == CheckPointTypeEnum.of(entity.getType()) ?
cpThreshold - 1 : cpThreshold;
if (cpThreshold == 0) {
- LambdaQueryWrapper<SavePoint> queryWrapper = new
LambdaQueryWrapper<SavePoint>()
- .eq(SavePoint::getAppId, entity.getAppId())
- .eq(SavePoint::getType, CHECKPOINT.get());
+ LambdaQueryWrapper<Savepoint> queryWrapper = new
LambdaQueryWrapper<Savepoint>()
+ .eq(Savepoint::getAppId, entity.getAppId())
+ .eq(Savepoint::getType, CHECKPOINT.get());
this.remove(queryWrapper);
return;
}
- LambdaQueryWrapper<SavePoint> queryWrapper = new
LambdaQueryWrapper<SavePoint>()
- .select(SavePoint::getTriggerTime)
- .eq(SavePoint::getAppId, entity.getAppId())
- .eq(SavePoint::getType, CHECKPOINT.get())
- .orderByDesc(SavePoint::getTriggerTime);
+ LambdaQueryWrapper<Savepoint> queryWrapper = new
LambdaQueryWrapper<Savepoint>()
+ .select(Savepoint::getTriggerTime)
+ .eq(Savepoint::getAppId, entity.getAppId())
+ .eq(Savepoint::getType, CHECKPOINT.get())
+ .orderByDesc(Savepoint::getTriggerTime);
- Page<SavePoint> savePointPage = this.baseMapper.selectPage(new
Page<>(1, cpThreshold + 1), queryWrapper);
- if (CollectionUtils.isEmpty(savePointPage.getRecords())
- || savePointPage.getRecords().size() <= cpThreshold) {
+ Page<Savepoint> savepointPage = this.baseMapper.selectPage(new
Page<>(1, cpThreshold + 1), queryWrapper);
+ if (CollectionUtils.isEmpty(savepointPage.getRecords())
+ || savepointPage.getRecords().size() <= cpThreshold) {
return;
}
- SavePoint savePoint = savePointPage.getRecords().get(cpThreshold - 1);
- LambdaQueryWrapper<SavePoint> lambdaQueryWrapper = new
LambdaQueryWrapper<SavePoint>()
- .eq(SavePoint::getAppId, entity.getAppId())
- .eq(SavePoint::getType, CHECKPOINT.get())
- .lt(SavePoint::getTriggerTime, savePoint.getTriggerTime());
+ Savepoint savepoint = savepointPage.getRecords().get(cpThreshold - 1);
+ LambdaQueryWrapper<Savepoint> lambdaQueryWrapper = new
LambdaQueryWrapper<Savepoint>()
+ .eq(Savepoint::getAppId, entity.getAppId())
+ .eq(Savepoint::getType, CHECKPOINT.get())
+ .lt(Savepoint::getTriggerTime, savepoint.getTriggerTime());
this.remove(lambdaQueryWrapper);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
index 9e7f9a941..60b213535 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
@@ -36,7 +36,7 @@ import
org.apache.streampark.console.core.metrics.flink.JobsOverview;
import org.apache.streampark.console.core.metrics.flink.Overview;
import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo;
import org.apache.streampark.console.core.service.FlinkClusterService;
-import org.apache.streampark.console.core.service.SavePointService;
+import org.apache.streampark.console.core.service.SavepointService;
import org.apache.streampark.console.core.service.alert.AlertService;
import
org.apache.streampark.console.core.service.application.ApplicationActionService;
import
org.apache.streampark.console.core.service.application.ApplicationInfoService;
@@ -96,7 +96,7 @@ public class FlinkAppHttpWatcher {
private FlinkClusterService flinkClusterService;
@Autowired
- private SavePointService savePointService;
+ private SavepointService savepointService;
// track interval every 5 seconds
public static final Duration WATCHING_INTERVAL = Duration.ofSeconds(5);
@@ -248,13 +248,13 @@ public class FlinkAppHttpWatcher {
// non-mapping
if (application.getState() !=
FlinkAppStateEnum.MAPPING.getValue()) {
log.error(
- "[StreamPark][FlinkAppHttpWatcher] getFromFlinkRestApi and
getFromYarnRestApi error,job failed,savePoint expired!");
+ "[StreamPark][FlinkAppHttpWatcher] getFromFlinkRestApi and
getFromYarnRestApi error,job failed,savepoint expired!");
if (StopFromEnum.NONE.equals(stopFrom)) {
Date lostTime =
LOST_CACHE.getIfPresent(application.getId());
if (lostTime == null) {
LOST_CACHE.put(application.getId(), new Date());
} else if (DateUtils.toSecondDuration(lostTime, new
Date()) >= 30) {
- savePointService.expire(application.getId());
+ savepointService.expire(application.getId());
application.setState(FlinkAppStateEnum.LOST.getValue());
WATCHING_APPS.remove(application.getId());
LOST_CACHE.invalidate(application.getId());
@@ -329,7 +329,7 @@ public class FlinkAppHttpWatcher {
} catch (Exception e) {
log.error("get flink jobOverview error: {}",
e.getMessage(), e);
}
- // 3) savePoint obsolete check and NEED_START check
+ // 3) savepoint obsolete check and NEED_START check
OptionStateEnum optionState =
OPTIONING.get(application.getId());
if (currentState.equals(FlinkAppStateEnum.RUNNING)) {
handleRunningState(application, optionState, currentState);
@@ -424,7 +424,7 @@ public class FlinkAppHttpWatcher {
}
}
- // The current state is running, and there is a current task in the
savePointCache,
+ // The current state is running, and there is a current task in the
savepointCache,
// indicating that the task is doing savepoint
if (SAVEPOINT_CACHE.getIfPresent(appId) != null) {
application.setOptionState(OptionStateEnum.SAVEPOINTING.getValue());
@@ -491,8 +491,8 @@ public class FlinkAppHttpWatcher {
if (StopFromEnum.NONE.equals(stopFrom) ||
applicationInfoService.checkAlter(application)) {
if (StopFromEnum.NONE.equals(stopFrom)) {
log.info(
- "[StreamPark][FlinkAppHttpWatcher]
getFromFlinkRestApi, job cancel is not form StreamPark,savePoint expired!");
- savePointService.expire(application.getId());
+ "[StreamPark][FlinkAppHttpWatcher]
getFromFlinkRestApi, job cancel is not form StreamPark,savepoint expired!");
+ savepointService.expire(application.getId());
}
stopCanceledJob(application.getId());
doAlert(application, FlinkAppStateEnum.CANCELED);
@@ -551,8 +551,8 @@ public class FlinkAppHttpWatcher {
} finally {
if (StopFromEnum.NONE.equals(stopFrom)) {
log.error(
- "[StreamPark][FlinkAppHttpWatcher] query previous
state was canceling and stopFrom NotFound,savePoint expired!");
- savePointService.expire(application.getId());
+ "[StreamPark][FlinkAppHttpWatcher] query previous
state was canceling and stopFrom NotFound,savepoint expired!");
+ savepointService.expire(application.getId());
if (flinkAppState == FlinkAppStateEnum.KILLED
|| flinkAppState == FlinkAppStateEnum.FAILED) {
doAlert(application, flinkAppState);
@@ -582,8 +582,8 @@ public class FlinkAppHttpWatcher {
if (FlinkAppStateEnum.KILLED.equals(flinkAppState)) {
if (StopFromEnum.NONE.equals(stopFrom)) {
log.error(
- "[StreamPark][FlinkAppHttpWatcher]
getFromYarnRestApi,job was killed and stopFrom NotFound,savePoint expired!");
- savePointService.expire(application.getId());
+ "[StreamPark][FlinkAppHttpWatcher]
getFromYarnRestApi,job was killed and stopFrom NotFound,savepoint expired!");
+ savepointService.expire(application.getId());
}
flinkAppState = FlinkAppStateEnum.CANCELED;
cleanSavepoint(application);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTFilter.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTFilter.java
index 99b8b3cee..d7e81e573 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTFilter.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTFilter.java
@@ -17,8 +17,7 @@
package org.apache.streampark.console.system.authentication;
-import org.apache.streampark.console.base.util.WebUtils;
-import org.apache.streampark.console.core.enums.AuthenticationType;
+import org.apache.streampark.console.base.util.EncryptUtils;
import org.apache.shiro.authz.UnauthorizedException;
import org.apache.shiro.web.filter.authc.BasicHttpAuthenticationFilter;
@@ -58,20 +57,14 @@ public class JWTFilter extends
BasicHttpAuthenticationFilter {
protected boolean executeLogin(ServletRequest request, ServletResponse
response) {
HttpServletRequest httpServletRequest = (HttpServletRequest) request;
String token = httpServletRequest.getHeader(TOKEN);
- AuthenticationType type =
JWTUtil.getAuthType(WebUtils.decryptToken(token));
- if (type == null) {
+ try {
+ token = EncryptUtils.decrypt(token);
+ JWTToken jwtToken = new JWTToken(token);
+ getSubject(request, response).login(jwtToken);
+ return true;
+ } catch (Exception e) {
return false;
}
- if (type == AuthenticationType.OPENAPI) {
- JWTToken jwtToken = new JWTToken(WebUtils.decryptToken(token));
- try {
- getSubject(request, response).login(jwtToken);
- return true;
- } catch (Exception e) {
- return false;
- }
- }
- return true;
}
/** cross-domain support */
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTToken.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTToken.java
index 15d13fe38..dada26528 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTToken.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTToken.java
@@ -33,16 +33,13 @@ public class JWTToken implements AuthenticationToken {
private String expireAt;
- private int signType;
-
public JWTToken(String token) {
this.token = token;
}
- public JWTToken(String token, String expireAt, int signType) {
+ public JWTToken(String token, String expireAt) {
this.token = token;
this.expireAt = expireAt;
- this.signType = signType;
}
@Override
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTUtil.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTUtil.java
index 93186ccde..f51c276f3 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTUtil.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTUtil.java
@@ -17,9 +17,12 @@
package org.apache.streampark.console.system.authentication;
+import org.apache.streampark.console.base.util.EncryptUtils;
import org.apache.streampark.console.core.enums.AuthenticationType;
+import org.apache.streampark.console.system.entity.User;
import com.auth0.jwt.JWT;
+import com.auth0.jwt.JWTCreator;
import com.auth0.jwt.JWTVerifier;
import com.auth0.jwt.algorithms.Algorithm;
import com.auth0.jwt.interfaces.DecodedJWT;
@@ -33,6 +36,11 @@ public class JWTUtil {
private static Long ttlOfSecond;
+ private static final String JWT_USERID = "userId";
+ private static final String JWT_USERNAME = "userName";
+ private static final String JWT_TYPE = "type";
+ private static final String JWT_TIMESTAMP = "timestamp";
+
/**
* verify token
*
@@ -42,7 +50,7 @@ public class JWTUtil {
public static boolean verify(String token, String username, String secret)
{
try {
Algorithm algorithm = Algorithm.HMAC256(secret);
- JWTVerifier verifier =
JWT.require(algorithm).withClaim("userName", username).build();
+ JWTVerifier verifier =
JWT.require(algorithm).withClaim(JWT_USERNAME, username).build();
verifier.verify(token);
return true;
} catch (Exception ignored) {
@@ -54,7 +62,7 @@ public class JWTUtil {
public static String getUserName(String token) {
try {
DecodedJWT jwt = JWT.decode(token);
- return jwt.getClaim("userName").asString();
+ return jwt.getClaim(JWT_USERNAME).asString();
} catch (Exception ignored) {
return null;
}
@@ -63,16 +71,33 @@ public class JWTUtil {
public static Long getUserId(String token) {
try {
DecodedJWT jwt = JWT.decode(token);
- return jwt.getClaim("userId").asLong();
+ return jwt.getClaim(JWT_USERID).asLong();
} catch (Exception ignored) {
return null;
}
}
+ /**
+ * @param token
+ * @return
+ */
+ public static Long getTimestamp(String token) {
+ try {
+ DecodedJWT jwt = JWT.decode(token);
+ return jwt.getClaim(JWT_TIMESTAMP).asLong();
+ } catch (Exception ignored) {
+ return 0L;
+ }
+ }
+
+ /**
+ * @param token
+ * @return
+ */
public static AuthenticationType getAuthType(String token) {
try {
DecodedJWT jwt = JWT.decode(token);
- int type = jwt.getClaim("type").asInt();
+ int type = jwt.getClaim(JWT_TYPE).asInt();
return AuthenticationType.of(type);
} catch (Exception ignored) {
return null;
@@ -80,44 +105,47 @@ public class JWTUtil {
}
/**
- * generate token
- *
- * @param userId
- * @param userName
+ * @param user
+ * @param authType
* @return
+ * @throws Exception
*/
- public static String sign(
- Long userId, String userName, String secret,
AuthenticationType authType) {
- Long second = getTTLOfSecond() * 1000;
+ public static String sign(User user, AuthenticationType authType) throws
Exception {
+ long second = getTTLOfSecond() * 1000;
Long ttl = System.currentTimeMillis() + second;
- return sign(userId, userName, secret, authType, ttl);
+ return sign(user, authType, ttl);
}
/**
- * generate token
- *
- * @param userId
- * @param userName
+ * @param user
+ * @param authType
* @param expireTime
* @return
+ * @throws Exception
*/
- public static String sign(
- Long userId, String userName, String secret,
AuthenticationType authType,
- Long expireTime) {
+ public static String sign(User user, AuthenticationType authType, Long
expireTime) throws Exception {
Date date = new Date(expireTime);
- Algorithm algorithm = Algorithm.HMAC256(secret);
- return JWT.create()
- .withClaim("userId", userId)
- .withClaim("userName", userName)
- .withClaim("type", authType.get())
- .withExpiresAt(date)
- .sign(algorithm);
+ Algorithm algorithm = Algorithm.HMAC256(user.getPassword());
+
+ JWTCreator.Builder builder =
+ JWT.create()
+ .withClaim(JWT_USERID, user.getUserId())
+ .withClaim(JWT_USERNAME, user.getUsername())
+ .withClaim(JWT_TYPE, authType.get())
+ .withExpiresAt(date);
+
+ if (authType == AuthenticationType.SIGN) {
+ builder.withClaim(JWT_TIMESTAMP, System.currentTimeMillis());
+ }
+
+ String token = builder.sign(algorithm);
+ return EncryptUtils.encrypt(token);
}
public static Long getTTLOfSecond() {
if (ttlOfSecond == null) {
String ttl = System.getProperty("server.session.ttl",
"24h").trim();
- String regexp = "^\\d+(s|m|h|d)$";
+ String regexp = "^\\d+([smhd])$";
Pattern pattern = Pattern.compile(regexp);
if (!pattern.matcher(ttl).matches()) {
throw new IllegalArgumentException(
@@ -125,7 +153,7 @@ public class JWTUtil {
}
String unit = ttl.substring(ttl.length() - 1);
String time = ttl.substring(0, ttl.length() - 1);
- Long second = Long.parseLong(time);
+ long second = Long.parseLong(time);
switch (unit) {
case "m":
return ttlOfSecond = second * 60;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/ShiroConfig.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/ShiroConfig.java
index 6418281e0..2d82a0e47 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/ShiroConfig.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/ShiroConfig.java
@@ -74,6 +74,7 @@ public class ShiroConfig {
filterChainDefinitionMap.put("/*.less", "anon");
filterChainDefinitionMap.put("/*.ico", "anon");
filterChainDefinitionMap.put("/", "anon");
+ filterChainDefinitionMap.put("/proxy/**", "anon");
filterChainDefinitionMap.put("/**", "jwt");
shiroFilterFactoryBean.setFilterChainDefinitionMap(filterChainDefinitionMap);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/ShiroRealm.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/ShiroRealm.java
index b9d6afb0a..6ea9d88a4 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/ShiroRealm.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/ShiroRealm.java
@@ -17,14 +17,14 @@
package org.apache.streampark.console.system.authentication;
-import org.apache.streampark.console.base.util.WebUtils;
+import org.apache.streampark.common.util.SystemPropertyUtils;
+import org.apache.streampark.console.base.util.EncryptUtils;
import org.apache.streampark.console.core.enums.AuthenticationType;
import org.apache.streampark.console.system.entity.AccessToken;
import org.apache.streampark.console.system.entity.User;
import org.apache.streampark.console.system.service.AccessTokenService;
import org.apache.streampark.console.system.service.UserService;
-import org.apache.commons.lang3.StringUtils;
import org.apache.shiro.SecurityUtils;
import org.apache.shiro.authc.AuthenticationException;
import org.apache.shiro.authc.AuthenticationInfo;
@@ -83,38 +83,54 @@ public class ShiroRealm extends AuthorizingRealm {
// The token here is passed from the executeLogin method of JWTFilter
and has been decrypted
String credential = (String) authenticationToken.getCredentials();
String username = JWTUtil.getUserName(credential);
+ Long userId = JWTUtil.getUserId(credential);
+ AuthenticationType authType = JWTUtil.getAuthType(credential);
- if (StringUtils.isBlank(username)) {
+ if (username == null || userId == null || authType == null) {
throw new AuthenticationException("the authorization token is
invalid");
}
+
+ switch (authType) {
+ case SIGN:
+ Long timestamp = JWTUtil.getTimestamp(credential);
+ Long startTime =
SystemPropertyUtils.getLong("streampark.start.timestamp", 0);
+ if (timestamp < startTime) {
+ throw new AuthenticationException("the authorization token
is expired");
+ }
+ break;
+ case OPENAPI:
+ // Check whether the token belongs to the api and whether the
permission is valid
+ AccessToken accessToken =
accessTokenService.getByUserId(userId);
+ try {
+ String encryptToken = EncryptUtils.encrypt(credential);
+ if (accessToken == null ||
!accessToken.getToken().equals(encryptToken)) {
+ throw new AuthenticationException("the openapi
authorization token is invalid");
+ }
+ } catch (Exception e) {
+ throw new AuthenticationException(e);
+ }
+
+ if
(AccessToken.STATUS_DISABLE.equals(accessToken.getStatus())) {
+ throw new AuthenticationException(
+ "the openapi authorization token is disabled, please
contact the administrator");
+ }
+
+ if (User.STATUS_LOCK.equals(accessToken.getUserStatus())) {
+ throw new AuthenticationException(
+ "the user [" + username + "] has been locked, please
contact the administrator");
+ }
+
SecurityUtils.getSubject().getSession().setAttribute(AccessToken.IS_API_TOKEN,
true);
+ break;
+ default:
+ break;
+ }
+
// Query user information by username
User user = userService.getByUsername(username);
-
- if (user == null || !JWTUtil.verify(credential, username,
user.getSalt())) {
+ if (user == null || !JWTUtil.verify(credential, username,
user.getPassword())) {
throw new AuthenticationException("the authorization token
verification failed.");
}
- AuthenticationType authType = JWTUtil.getAuthType(credential);
- if (authType == AuthenticationType.OPENAPI) {
- // Check whether the token belongs to the api and whether the
permission is valid
- AccessToken accessToken =
accessTokenService.getByUserId(user.getUserId());
- if (accessToken == null
- ||
!accessToken.getToken().equals(WebUtils.encryptToken(credential))) {
- throw new AuthenticationException("the openapi authorization
token is invalid");
- }
-
- if (AccessToken.STATUS_DISABLE.equals(accessToken.getStatus())) {
- throw new AuthenticationException(
- "the openapi authorization token is disabled, please
contact the administrator");
- }
-
- if (User.STATUS_LOCK.equals(accessToken.getUserStatus())) {
- throw new AuthenticationException(
- "the user [" + username + "] has been locked, please
contact the administrator");
- }
-
-
SecurityUtils.getSubject().getSession().setAttribute(AccessToken.IS_API_TOKEN,
true);
- }
return new SimpleAuthenticationInfo(credential, credential,
"streampark_shiro_realm");
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/AccessTokenController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/AccessTokenController.java
index 0fdb8f54c..604bbcc5b 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/AccessTokenController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/AccessTokenController.java
@@ -19,7 +19,6 @@ package org.apache.streampark.console.system.controller;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.domain.RestResponse;
-import org.apache.streampark.console.base.exception.InternalException;
import org.apache.streampark.console.core.enums.AccessTokenStateEnum;
import org.apache.streampark.console.core.util.ServiceHelper;
import org.apache.streampark.console.system.entity.AccessToken;
@@ -49,7 +48,7 @@ public class AccessTokenController {
@RequiresPermissions("token:add")
public RestResponse createToken(
@NotNull(message = "{required}") Long
userId,
- @RequestParam(required = false) String
description) throws InternalException {
+ @RequestParam(required = false) String
description) throws Exception {
return accessTokenService.create(userId, description);
}
@@ -78,7 +77,7 @@ public class AccessTokenController {
@PostMapping("toggle")
@RequiresPermissions("token:add")
public RestResponse toggleToken(@NotNull(message = "{required}") Long
tokenId) {
- return accessTokenService.toggleToken(tokenId);
+ return accessTokenService.toggle(tokenId);
}
@DeleteMapping(value = "delete")
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/PassportController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/PassportController.java
index 01256fef4..732d9edac 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/PassportController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/PassportController.java
@@ -19,7 +19,6 @@ package org.apache.streampark.console.system.controller;
import org.apache.streampark.common.util.DateUtils;
import org.apache.streampark.console.base.domain.RestResponse;
-import org.apache.streampark.console.base.util.WebUtils;
import org.apache.streampark.console.core.enums.AuthenticationType;
import org.apache.streampark.console.core.enums.LoginTypeEnum;
import org.apache.streampark.console.system.authentication.JWTToken;
@@ -40,8 +39,6 @@ import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
-import javax.validation.constraints.NotBlank;
-
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
@@ -79,16 +76,14 @@ public class PassportController {
}
@PostMapping("signin")
- public RestResponse signin(
- @NotBlank(message = "{required}") String
username,
- @NotBlank(message = "{required}") String
password,
- @NotBlank(message = "{required}") String
loginType) throws Exception {
+ public RestResponse signin(User loginUser) throws Exception {
- if (StringUtils.isEmpty(username)) {
+ if (StringUtils.isEmpty(loginUser.getUsername())) {
return RestResponse.success().put("code", 0);
}
- User user = authenticator.authenticate(username, password, loginType);
+ User user =
+ authenticator.authenticate(loginUser.getUsername(),
loginUser.getPassword(), loginUser.getLoginType());
if (user == null) {
return RestResponse.success().put("code", 0);
@@ -98,23 +93,17 @@ public class PassportController {
return RestResponse.success().put("code", 1);
}
- this.userService.updateLoginTime(username);
- String sign = JWTUtil.sign(user.getUserId(), username, user.getSalt(),
AuthenticationType.SIGN);
+ this.userService.updateLoginTime(loginUser.getUsername());
+ String token = JWTUtil.sign(user, AuthenticationType.SIGN);
LocalDateTime expireTime =
LocalDateTime.now().plusSeconds(JWTUtil.getTTLOfSecond());
String ttl = DateUtils.formatFullTime(expireTime);
- // shiro login
- JWTToken loginToken = new JWTToken(sign, ttl,
AuthenticationType.SIGN.get());
- SecurityUtils.getSubject().login(loginToken);
-
// generate UserInfo
- String token = WebUtils.encryptToken(sign);
- JWTToken jwtToken = new JWTToken(token, ttl,
AuthenticationType.SIGN.get());
String userId = RandomStringUtils.randomAlphanumeric(20);
user.setId(userId);
- Map<String, Object> userInfo =
- userService.generateFrontendUserInfo(user, user.getLastTeamId(),
jwtToken);
+ JWTToken jwtToken = new JWTToken(token, ttl);
+ Map<String, Object> userInfo =
userService.generateFrontendUserInfo(user, jwtToken);
return new RestResponse().data(userInfo);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/SsoController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/SsoController.java
index 808e5568d..4d9b501a7 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/SsoController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/SsoController.java
@@ -89,7 +89,7 @@ public class SsoController {
ApiAlertException.throwIfNull(
principal.getName(), "Please configure the correct Principal Name
Attribute");
- User user = authenticator.authenticate(principal.getName(), null,
LoginTypeEnum.SSO.toString());
+ User user = authenticator.authenticate(principal.getName(), null,
LoginTypeEnum.SSO);
return userService.getLoginUserInfo(user);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/UserController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/UserController.java
index d91d6abdd..262534daf 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/UserController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/UserController.java
@@ -136,8 +136,9 @@ public class UserController {
// 2) get latest userInfo
user.dataMasking();
+ user.setLastTeamId(teamId);
- Map<String, Object> infoMap =
userService.generateFrontendUserInfo(user, teamId, null);
+ Map<String, Object> infoMap =
userService.generateFrontendUserInfo(user, null);
return new RestResponse().data(infoMap);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/AccessToken.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/AccessToken.java
index b17f24223..d17871111 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/AccessToken.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/AccessToken.java
@@ -35,7 +35,6 @@ import java.util.Date;
@TableName("t_access_token")
public class AccessToken extends BaseEntity {
- public static final String DEFAULT_EXPIRE_TIME = "9999-01-01 00:00:00";
public static final String IS_API_TOKEN = "is_api_token";
public static final Integer STATUS_ENABLE = 1;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java
index 5ab8850f8..6eeabc815 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java
@@ -54,6 +54,7 @@ public class StartedUpRunner implements ApplicationRunner {
System.out.println(" Info : streampark-console start
successful ");
System.out.println(" Local : http://localhost:" + port);
System.out.println(" Time : " + LocalDateTime.now() +
"\n\n");
+ System.setProperty("streampark.start.timestamp",
System.currentTimeMillis() + "");
}
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/security/Authenticator.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/security/Authenticator.java
index f2414c2ae..78bd5deab 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/security/Authenticator.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/security/Authenticator.java
@@ -17,6 +17,7 @@
package org.apache.streampark.console.system.security;
+import org.apache.streampark.console.core.enums.LoginTypeEnum;
import org.apache.streampark.console.system.entity.User;
public interface Authenticator {
@@ -28,5 +29,5 @@ public interface Authenticator {
* @param password user password
* @return result object
*/
- User authenticate(String username, String password, String loginType)
throws Exception;
+ User authenticate(String username, String password, LoginTypeEnum
loginType) throws Exception;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/security/impl/AuthenticatorImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/security/impl/AuthenticatorImpl.java
index 5d783d835..b315014ad 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/security/impl/AuthenticatorImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/security/impl/AuthenticatorImpl.java
@@ -39,13 +39,11 @@ public class AuthenticatorImpl implements Authenticator {
private LdapService ldapService;
@Override
- public User authenticate(String username, String password, String
loginType) throws Exception {
- LoginTypeEnum loginTypeEnum = LoginTypeEnum.of(loginType);
-
+ public User authenticate(String username, String password, LoginTypeEnum
loginType) throws Exception {
ApiAlertException.throwIfNull(
- loginTypeEnum, String.format("the login type [%s] is not
supported.", loginType));
+ loginType, "the login type is null");
- switch (loginTypeEnum) {
+ switch (loginType) {
case PASSWORD:
return passwordAuthenticate(username, password);
case LDAP:
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/AccessTokenService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/AccessTokenService.java
index 60c7bae22..61e9aa566 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/AccessTokenService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/AccessTokenService.java
@@ -36,7 +36,7 @@ public interface AccessTokenService extends
IService<AccessToken> {
* @return RestResponse
* @throws InternalException
*/
- RestResponse create(Long userId, String description) throws
InternalException;
+ RestResponse create(Long userId, String description) throws Exception;
/**
* Retrieves a page of {@link AccessToken} objects based on the provided
parameters.
@@ -53,7 +53,7 @@ public interface AccessTokenService extends
IService<AccessToken> {
* @param tokenId AccessToken id
* @return RestResponse
*/
- RestResponse toggleToken(Long tokenId);
+ RestResponse toggle(Long tokenId);
/**
* Get the corresponding AccessToken based on the user ID
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/UserService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/UserService.java
index cbf40603b..9c907f7cd 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/UserService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/UserService.java
@@ -138,11 +138,10 @@ public interface UserService extends IService<User> {
* Generate user information for the front end
*
* @param user User
- * @param teamId team id
* @param token JWTToken
* @return
*/
- Map<String, Object> generateFrontendUserInfo(User user, Long teamId,
JWTToken token);
+ Map<String, Object> generateFrontendUserInfo(User user, JWTToken token);
/**
* transfer user resources to specified users
@@ -158,7 +157,7 @@ public interface UserService extends IService<User> {
* @param user User
* @return RestResponse
*/
- RestResponse getLoginUserInfo(User user);
+ RestResponse getLoginUserInfo(User user) throws Exception;
void deleteUser(Long userId);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/AccessTokenServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/AccessTokenServiceImpl.java
index 8afebcf70..b18a069fc 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/AccessTokenServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/AccessTokenServiceImpl.java
@@ -21,9 +21,7 @@ import org.apache.streampark.console.base.domain.ResponseCode;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
-import org.apache.streampark.console.base.util.WebUtils;
import org.apache.streampark.console.core.enums.AuthenticationType;
-import org.apache.streampark.console.system.authentication.JWTToken;
import org.apache.streampark.console.system.authentication.JWTUtil;
import org.apache.streampark.console.system.entity.AccessToken;
import org.apache.streampark.console.system.entity.User;
@@ -53,24 +51,21 @@ public class AccessTokenServiceImpl extends
ServiceImpl<AccessTokenMapper, Acces
private UserService userService;
@Override
- public RestResponse create(Long userId, String description) {
+ public RestResponse create(Long userId, String description) throws
Exception {
User user = userService.getById(userId);
if (user == null) {
return RestResponse.success().put("code", 0).message("user not
available");
}
+
AccessToken existAccessToken =
baseMapper.selectByUserId(user.getUserId());
if (existAccessToken != null) {
return RestResponse.success().put("code", 0)
.message(String.format("user %s already has a token",
user.getUsername()));
}
- String token = WebUtils.encryptToken(
- JWTUtil.sign(
- user.getUserId(), user.getUsername(), user.getSalt(),
- AuthenticationType.OPENAPI));
- JWTToken jwtToken = new JWTToken(token,
AccessToken.DEFAULT_EXPIRE_TIME, AuthenticationType.SIGN.get());
+ String token = JWTUtil.sign(user, AuthenticationType.OPENAPI,
Long.MAX_VALUE);
AccessToken accessToken = new AccessToken();
- accessToken.setToken(jwtToken.getToken());
+ accessToken.setToken(token);
accessToken.setUserId(user.getUserId());
accessToken.setDescription(description);
@@ -90,7 +85,7 @@ public class AccessTokenServiceImpl extends
ServiceImpl<AccessTokenMapper, Acces
}
@Override
- public RestResponse toggleToken(Long tokenId) {
+ public RestResponse toggle(Long tokenId) {
AccessToken tokenInfo = baseMapper.selectById(tokenId);
if (tokenInfo == null) {
return RestResponse.fail(ResponseCode.CODE_FAIL_ALERT,
"accessToken could not be found!");
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java
index 08381e2f0..3e03e23d4 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java
@@ -24,7 +24,6 @@ import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
import org.apache.streampark.console.base.util.ShaHashUtils;
-import org.apache.streampark.console.base.util.WebUtils;
import org.apache.streampark.console.core.enums.AuthenticationType;
import org.apache.streampark.console.core.enums.LoginTypeEnum;
import org.apache.streampark.console.core.service.ResourceService;
@@ -228,7 +227,7 @@ public class UserServiceImpl extends
ServiceImpl<UserMapper, User> implements Us
}
@Override
- public RestResponse getLoginUserInfo(User user) {
+ public RestResponse getLoginUserInfo(User user) throws Exception {
if (user == null) {
return RestResponse.success().put(RestResponse.CODE_KEY, 0);
}
@@ -237,16 +236,18 @@ public class UserServiceImpl extends
ServiceImpl<UserMapper, User> implements Us
return RestResponse.success().put(RestResponse.CODE_KEY, 1);
}
- updateLoginTime(user.getUsername());
- String token = WebUtils.encryptToken(
- JWTUtil.sign(
- user.getUserId(), user.getUsername(), user.getSalt(),
AuthenticationType.SIGN));
+ this.updateLoginTime(user.getUsername());
+ String token = JWTUtil.sign(user, AuthenticationType.SIGN);
+
LocalDateTime expireTime =
LocalDateTime.now().plusSeconds(JWTUtil.getTTLOfSecond());
- String expireTimeStr = DateUtils.formatFullTime(expireTime);
- JWTToken jwtToken = new JWTToken(token, expireTimeStr,
AuthenticationType.SIGN.get());
+ String ttl = DateUtils.formatFullTime(expireTime);
+
+ // generate UserInfo
String userId = RandomStringUtils.randomAlphanumeric(20);
user.setId(userId);
- Map<String, Object> userInfo = generateFrontendUserInfo(user,
user.getLastTeamId(), jwtToken);
+ JWTToken jwtToken = new JWTToken(token, ttl);
+ Map<String, Object> userInfo = generateFrontendUserInfo(user,
jwtToken);
+
return RestResponse.success(userInfo);
}
@@ -265,7 +266,7 @@ public class UserServiceImpl extends
ServiceImpl<UserMapper, User> implements Us
* @return UserInfo
*/
@Override
- public Map<String, Object> generateFrontendUserInfo(User user, Long
teamId, JWTToken token) {
+ public Map<String, Object> generateFrontendUserInfo(User user, JWTToken
token) {
Map<String, Object> userInfo = new HashMap<>(8);
// 1) token & expire
@@ -279,7 +280,7 @@ public class UserServiceImpl extends
ServiceImpl<UserMapper, User> implements Us
userInfo.put("user", user);
// 3) permissions
- Set<String> permissions = this.listPermissions(user.getUserId(),
teamId);
+ Set<String> permissions = this.listPermissions(user.getUserId(),
user.getLastTeamId());
userInfo.put("permissions", permissions);
return userInfo;
diff --git
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/SavePointMapper.xml
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/SavepointMapper.xml
similarity index 97%
rename from
streampark-console/streampark-console-service/src/main/resources/mapper/core/SavePointMapper.xml
rename to
streampark-console/streampark-console-service/src/main/resources/mapper/core/SavepointMapper.xml
index cc1b72e57..99ff6b427 100644
---
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/SavePointMapper.xml
+++
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/SavepointMapper.xml
@@ -16,8 +16,8 @@
~ limitations under the License.
-->
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
-<mapper namespace="org.apache.streampark.console.core.mapper.SavePointMapper">
- <resultMap id="BaseResultMap"
type="org.apache.streampark.console.core.entity.SavePoint">
+<mapper namespace="org.apache.streampark.console.core.mapper.SavepointMapper">
+ <resultMap id="BaseResultMap"
type="org.apache.streampark.console.core.entity.Savepoint">
<id column="id" jdbcType="BIGINT" property="id"/>
<result column="app_id" jdbcType="VARCHAR" property="appId"/>
<result column="latest" jdbcType="BOOLEAN" property="latest"/>
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/AccessTokenServiceTest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/AccessTokenServiceTest.java
index b1cfb89e8..c5de0d6a2 100644
---
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/AccessTokenServiceTest.java
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/AccessTokenServiceTest.java
@@ -20,7 +20,7 @@ package org.apache.streampark.console.core.service;
import org.apache.streampark.console.SpringUnitTestBase;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.domain.RestResponse;
-import org.apache.streampark.console.base.util.WebUtils;
+import org.apache.streampark.console.base.util.EncryptUtils;
import org.apache.streampark.console.system.authentication.JWTToken;
import org.apache.streampark.console.system.authentication.JWTUtil;
import org.apache.streampark.console.system.entity.AccessToken;
@@ -44,22 +44,21 @@ public class AccessTokenServiceTest extends
SpringUnitTestBase {
@Test
void testCrudToken() throws Exception {
Long mockUserId = 100000L;
- String expireTime = "9999-01-01 00:00:00";
RestResponse restResponse = accessTokenService.create(mockUserId, "");
Assertions.assertNotNull(restResponse);
- Assertions.assertInstanceOf(AccessToken.class,
restResponse.get(RestResponse.DATA_KEY));
+ Assertions.assertInstanceOf(AccessToken.class,
restResponse.get("data"));
// verify
- AccessToken accessToken = (AccessToken)
restResponse.get(RestResponse.DATA_KEY);
+ AccessToken accessToken = (AccessToken) restResponse.get("data");
LOG.info(accessToken.getToken());
- JWTToken jwtToken = new
JWTToken(WebUtils.decryptToken(accessToken.getToken()));
+ JWTToken jwtToken = new
JWTToken(EncryptUtils.decrypt(accessToken.getToken()));
LOG.info(jwtToken.getToken());
String username = JWTUtil.getUserName(jwtToken.getToken());
Assertions.assertNotNull(username);
Assertions.assertEquals("admin", username);
User user = userService.getByUsername(username);
Assertions.assertNotNull(user);
- Assertions.assertTrue(JWTUtil.verify(jwtToken.getToken(), username,
user.getSalt()));
+ Assertions.assertTrue(JWTUtil.verify(jwtToken.getToken(), username,
user.getPassword()));
// list
AccessToken mockToken1 = new AccessToken();
@@ -73,9 +72,9 @@ public class AccessTokenServiceTest extends
SpringUnitTestBase {
// toggle
Long tokenId = accessToken.getId();
- RestResponse toggleTokenResp = accessTokenService.toggleToken(tokenId);
+ RestResponse toggleTokenResp = accessTokenService.toggle(tokenId);
Assertions.assertNotNull(toggleTokenResp);
- Assertions.assertTrue((Boolean)
toggleTokenResp.get(RestResponse.DATA_KEY));
+ Assertions.assertTrue((Boolean) toggleTokenResp.get("data"));
// get
AccessToken afterToggle = accessTokenService.getByUserId(mockUserId);
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceTest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceTest.java
index 693248dee..f089d81b1 100644
---
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceTest.java
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceTest.java
@@ -82,7 +82,7 @@ class ApplicationManageServiceTest extends SpringUnitTestBase
{
app.setK8sHadoopIntegration(false);
app.setBackUp(false);
app.setRestart(false);
- app.setSavePointed(false);
+ app.setRestoreOrTriggerSavepoint(false);
app.setDrain(false);
app.setAllowNonRestored(false);
@@ -95,7 +95,7 @@ class ApplicationManageServiceTest extends SpringUnitTestBase
{
Application application = new Application();
application.setId(1304056220683497473L);
application.setRestart(false);
- application.setSavePointed(false);
+ application.setRestoreOrTriggerSavepoint(false);
application.setAllowNonRestored(false);
applicationActionService.start(application, false);
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavepointServiceTest.java
similarity index 83%
rename from
streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java
rename to
streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavepointServiceTest.java
index 5801e08fc..2a3bc205f 100644
---
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavepointServiceTest.java
@@ -29,7 +29,7 @@ import org.apache.streampark.console.core.entity.FlinkEnv;
import org.apache.streampark.console.core.enums.ConfigFileTypeEnum;
import org.apache.streampark.console.core.enums.EffectiveTypeEnum;
import
org.apache.streampark.console.core.service.application.ApplicationManageService;
-import org.apache.streampark.console.core.service.impl.SavePointServiceImpl;
+import org.apache.streampark.console.core.service.impl.SavepointServiceImpl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -44,13 +44,13 @@ import static
org.assertj.core.api.Assertions.assertThatThrownBy;
/**
* Test class for the implementation {@link
- * org.apache.streampark.console.core.service.impl.SavePointServiceImpl} of
{@link
- * SavePointService}.
+ * SavepointServiceImpl} of {@link
+ * SavepointService}.
*/
-class SavePointServiceTest extends SpringUnitTestBase {
+class SavepointServiceTest extends SpringUnitTestBase {
@Autowired
- private SavePointService savePointService;
+ private SavepointService savepointService;
@Autowired
private ApplicationConfigService configService;
@@ -67,7 +67,7 @@ class SavePointServiceTest extends SpringUnitTestBase {
@AfterEach
void cleanTestRecordsInDatabase() {
- savePointService.remove(new QueryWrapper<>());
+ savepointService.remove(new QueryWrapper<>());
configService.remove(new QueryWrapper<>());
effectiveService.remove(new QueryWrapper<>());
flinkEnvService.remove(new QueryWrapper<>());
@@ -83,17 +83,17 @@ class SavePointServiceTest extends SpringUnitTestBase {
void testGetSavepointFromDynamicProps() {
String propsWithEmptyTargetValue = "-Dstate.savepoints.dir=";
String props = "-Dstate.savepoints.dir=hdfs:///test";
- SavePointServiceImpl savePointServiceImpl = (SavePointServiceImpl)
savePointService;
+ SavepointServiceImpl savepointServiceImpl = (SavepointServiceImpl)
savepointService;
-
assertThat(savePointServiceImpl.getSavepointFromDynamicProps(null)).isNull();
-
assertThat(savePointServiceImpl.getSavepointFromDynamicProps(props)).isEqualTo("hdfs:///test");
-
assertThat(savePointServiceImpl.getSavepointFromDynamicProps(propsWithEmptyTargetValue))
+
assertThat(savepointServiceImpl.getSavepointFromDynamicProps(null)).isNull();
+
assertThat(savepointServiceImpl.getSavepointFromDynamicProps(props)).isEqualTo("hdfs:///test");
+
assertThat(savepointServiceImpl.getSavepointFromDynamicProps(propsWithEmptyTargetValue))
.isEmpty();
}
@Test
void testGetSavepointFromAppCfgIfStreamParkOrSQLJob() {
- SavePointServiceImpl savePointServiceImpl = (SavePointServiceImpl)
savePointService;
+ SavepointServiceImpl savepointServiceImpl = (SavepointServiceImpl)
savepointService;
Application app = new Application();
Long appId = 1L;
Long appCfgId = 1L;
@@ -101,17 +101,17 @@ class SavePointServiceTest extends SpringUnitTestBase {
// Test for non-(StreamPark job Or FlinkSQL job)
app.setAppType(ApplicationType.APACHE_FLINK.getType());
- assertThat(savePointServiceImpl.getSavepointFromConfig(app)).isNull();
+ assertThat(savepointServiceImpl.getSavepointFromConfig(app)).isNull();
app.setAppType(ApplicationType.STREAMPARK_FLINK.getType());
app.setJobType(FlinkDevelopmentMode.CUSTOM_CODE.getMode());
- assertThat(savePointServiceImpl.getSavepointFromConfig(app)).isNull();
+ assertThat(savepointServiceImpl.getSavepointFromConfig(app)).isNull();
// Test for (StreamPark job Or FlinkSQL job) without application
config.
app.setAppType(ApplicationType.STREAMPARK_FLINK.getType());
- assertThat(savePointServiceImpl.getSavepointFromConfig(app)).isNull();
+ assertThat(savepointServiceImpl.getSavepointFromConfig(app)).isNull();
app.setAppType(ApplicationType.STREAMPARK_FLINK.getType());
app.setJobType(FlinkDevelopmentMode.CUSTOM_CODE.getMode());
- assertThat(savePointServiceImpl.getSavepointFromConfig(app)).isNull();
+ assertThat(savepointServiceImpl.getSavepointFromConfig(app)).isNull();
// Test for (StreamPark job Or FlinkSQL job) with application config
just disabled checkpoint.
ApplicationConfig appCfg = new ApplicationConfig();
@@ -120,7 +120,7 @@ class SavePointServiceTest extends SpringUnitTestBase {
appCfg.setContent("state.savepoints.dir=hdfs:///test");
appCfg.setFormat(ConfigFileTypeEnum.PROPERTIES.getValue());
configService.save(appCfg);
- assertThat(savePointServiceImpl.getSavepointFromConfig(app)).isNull();
+ assertThat(savepointServiceImpl.getSavepointFromConfig(app)).isNull();
// Test for (StreamPark job or FlinkSQL job) with application config
and enabled checkpoint and
// configured value.
@@ -128,7 +128,7 @@ class SavePointServiceTest extends SpringUnitTestBase {
// Test for non-value for CHECKPOINTING_INTERVAL
appCfg.setContent("");
configService.updateById(appCfg);
- assertThat(savePointServiceImpl.getSavepointFromConfig(app)).isNull();
+ assertThat(savepointServiceImpl.getSavepointFromConfig(app)).isNull();
// Test for configured CHECKPOINTING_INTERVAL
appCfg.setContent(
@@ -142,12 +142,12 @@ class SavePointServiceTest extends SpringUnitTestBase {
effective.setAppId(appId);
effective.setTargetType(EffectiveTypeEnum.CONFIG.getType());
effectiveService.save(effective);
-
assertThat(savePointServiceImpl.getSavepointFromConfig(app)).isEqualTo("hdfs:///test");
+
assertThat(savepointServiceImpl.getSavepointFromConfig(app)).isEqualTo("hdfs:///test");
}
@Test
void testGetSavepointFromDeployLayer() throws JsonProcessingException {
- SavePointServiceImpl savePointServiceImpl = (SavePointServiceImpl)
savePointService;
+ SavepointServiceImpl savepointServiceImpl = (SavepointServiceImpl)
savepointService;
Long appId = 1L;
Long idOfFlinkEnv = 1L;
Long teamId = 1L;
@@ -168,7 +168,7 @@ class SavePointServiceTest extends SpringUnitTestBase {
flinkEnvService.save(flinkEnv);
// Test for non-remote mode
-
assertThat(savePointServiceImpl.getSavepointFromDeployLayer(application))
+
assertThat(savepointServiceImpl.getSavepointFromDeployLayer(application))
.isEqualTo("hdfs:///test");
// Start the test lines for remote mode
@@ -177,7 +177,7 @@ class SavePointServiceTest extends SpringUnitTestBase {
// Test for it without cluster.
application.setExecutionMode(FlinkExecutionMode.REMOTE.getMode());
application.setFlinkClusterId(clusterId);
- assertThatThrownBy(() ->
savePointServiceImpl.getSavepointFromDeployLayer(application))
+ assertThatThrownBy(() ->
savepointServiceImpl.getSavepointFromDeployLayer(application))
.isInstanceOf(NullPointerException.class);
// Ignored.
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/system/authentication/JWTTest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/system/authentication/JWTTest.java
index 2167fc14f..ff3ea0bb1 100644
---
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/system/authentication/JWTTest.java
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/system/authentication/JWTTest.java
@@ -19,8 +19,9 @@ package org.apache.streampark.console.system.authentication;
import org.apache.streampark.common.util.DateUtils;
import org.apache.streampark.console.SpringUnitTestBase;
+import org.apache.streampark.console.base.util.EncryptUtils;
import org.apache.streampark.console.core.enums.AuthenticationType;
-import org.apache.streampark.console.system.entity.AccessToken;
+import org.apache.streampark.console.system.entity.User;
import com.auth0.jwt.JWT;
import org.junit.jupiter.api.Assertions;
@@ -32,19 +33,23 @@ import java.util.TimeZone;
class JWTTest extends SpringUnitTestBase {
@Test
- void testExpireTime() {
+ void testExpireTime() throws Exception {
String userName = "black";
- String expireTime = AccessToken.DEFAULT_EXPIRE_TIME;
- String token = JWTUtil.sign(
- 10000L,
- userName,
- "streampark",
- AuthenticationType.SIGN,
- DateUtils.getTime(expireTime, DateUtils.fullFormat(),
TimeZone.getDefault()));
+ String ttl = "2022-09-01 00:00:00";
+ User user = new User();
+ user.setUserId(10000L);
+ user.setUsername(userName);
+ user.setPassword("streampark");
+ String token =
+ JWTUtil.sign(
+ user,
+ AuthenticationType.SIGN,
+ DateUtils.getTime(ttl, DateUtils.fullFormat(),
TimeZone.getDefault()));
assert token != null;
- Date expiresAt = JWT.decode(token).getExpiresAt();
- String decodeExpireTime = DateUtils.format(expiresAt,
DateUtils.fullFormat(), TimeZone.getDefault());
- Assertions.assertEquals(expireTime, decodeExpireTime);
+ Date expiresAt =
JWT.decode(EncryptUtils.decrypt(token)).getExpiresAt();
+ String decodeExpireTime =
+ DateUtils.format(expiresAt, DateUtils.fullFormat(),
TimeZone.getDefault());
+ Assertions.assertEquals(ttl, decodeExpireTime);
}
}
diff --git a/streampark-console/streampark-console-webapp/src/api/flink/app.ts
b/streampark-console/streampark-console-webapp/src/api/flink/app.ts
index da8f07f7f..1d034b3a3 100644
--- a/streampark-console/streampark-console-webapp/src/api/flink/app.ts
+++ b/streampark-console/streampark-console-webapp/src/api/flink/app.ts
@@ -205,7 +205,7 @@ export function fetchK8sStartLog(data):
Promise<AxiosResponse<any>> {
*/
export function fetchCheckSavepointPath(data: {
id?: string;
- savePoint?: string;
+ savepointPath?: string;
}): Promise<AxiosResponse<Result>> {
return defHttp.post(
{ url: APP_API.CHECK_SAVEPOINT_PATH, data },
diff --git
a/streampark-console/streampark-console-webapp/src/api/flink/app.type.ts
b/streampark-console/streampark-console-webapp/src/api/flink/app.type.ts
index 7fdf36c29..cd89292d4 100644
--- a/streampark-console/streampark-console-webapp/src/api/flink/app.type.ts
+++ b/streampark-console/streampark-console-webapp/src/api/flink/app.type.ts
@@ -116,8 +116,8 @@ export interface AppListRecord {
flinkVersion: string;
confPath?: any;
format?: any;
- savePoint?: any;
- savePointed: boolean;
+ savepointPath?: any;
+ restoreOrTriggerSavepoint: boolean;
drain: boolean;
nativeFormat: boolean;
allowNonRestored: boolean;
@@ -149,10 +149,10 @@ interface AppControl {
/* cancel params */
export interface CancelParam {
id: string;
- savePointed: boolean;
+ restoreOrTriggerSavepoint: boolean;
drain: boolean;
nativeFormat: boolean;
- savePoint: string;
+ savepointPath: string;
}
// create Params
export interface CreateParams {
diff --git
a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
index 445f1e082..6e0c82bb8 100644
---
a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
+++
b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
@@ -84,6 +84,7 @@ export default {
created: 'CREATED',
starting: 'STARTING',
restarting: 'RESTARTING',
+ savepoint: 'SAVEPOINTING',
running: 'RUNNING',
failing: 'FAILING',
failed: 'FAILED',
diff --git
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
index 7324c5f1f..55ace86b8 100644
---
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
+++
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
@@ -82,6 +82,7 @@ export default {
created: '创建中',
starting: '启动中',
restarting: '重启中',
+ savepoint: '快照中',
running: '运行中',
failing: '失败中',
failed: '作业失败',
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
index fc64533ef..8020ab3e4 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
@@ -50,7 +50,7 @@
Object.assign(receiveData, data);
resetFields();
setFieldsValue({
- startSavePoint: receiveData.selected?.path,
+ savepointPath: receiveData.selected?.path,
});
}
});
@@ -67,7 +67,7 @@
labelWidth: 120,
schemas: [
{
- field: 'startSavePointed',
+ field: 'restoreSavepoint',
label: t('flink.app.view.fromSavepoint'),
component: 'Switch',
componentProps: {
@@ -78,7 +78,7 @@
afterItem: () => h('span', { class: 'conf-switch' },
t('flink.app.view.savepointTip')),
},
{
- field: 'startSavePoint',
+ field: 'savepointPath',
label: 'Savepoint',
component:
receiveData.historySavePoint && receiveData.historySavePoint.length
> 0
@@ -87,7 +87,7 @@
afterItem: () =>
h('span', { class: 'conf-switch' },
handleSavePointTip(receiveData.historySavePoint)),
slot: 'savepoint',
- ifShow: ({ values }) => values.startSavePointed,
+ ifShow: ({ values }) => values.restoreSavepoint,
required: true,
},
{
@@ -100,7 +100,7 @@
},
afterItem: () => h('span', { class: 'conf-switch' },
t('flink.app.view.ignoreRestoredTip')),
defaultValue: false,
- ifShow: ({ values }) => values.startSavePointed,
+ ifShow: ({ values }) => values.restoreSavepoint,
},
],
colon: true,
@@ -132,13 +132,13 @@
async function handleDoSubmit() {
try {
const formValue = (await validate()) as Recordable;
- const savePointed = formValue.startSavePointed;
- const savePointPath = savePointed ? formValue['startSavePoint'] : null;
+ const restoreOrTriggerSavepoint = formValue.restoreSavepoint;
+ const savepointPath = restoreOrTriggerSavepoint ?
formValue['savepointPath'] : null;
handleReset();
const { data } = await fetchStart({
id: receiveData.application.id,
- savePointed,
- savePoint: savePointPath,
+ restoreOrTriggerSavepoint,
+ savepointPath: savepointPath,
allowNonRestored: formValue.allowNonRestoredState || false,
});
if (data.data) {
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue
index 58e68959b..f3d5fc90e 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StopApplicationModal.vue
@@ -46,7 +46,7 @@
labelWidth: 120,
schemas: [
{
- field: 'stopSavePointed',
+ field: 'triggerSavepoint',
label: t('flink.app.operation.triggerSavePoint'),
component: 'Switch',
componentProps: {
@@ -65,7 +65,7 @@
placeholder: t('flink.app.operation.customSavepoint'),
allowClear: true,
},
- ifShow: ({ values }) => !!values.stopSavePointed,
+ ifShow: ({ values }) => !!values.triggerSavepoint,
},
{
field: 'drain',
@@ -76,7 +76,7 @@
unCheckedChildren: 'OFF',
},
defaultValue: false,
- ifShow: ({ values }) => !!values.stopSavePointed,
+ ifShow: ({ values }) => !!values.triggerSavepoint,
afterItem: () => h('span', { class: 'conf-switch' },
t('flink.app.operation.enableDrain')),
},
],
@@ -90,18 +90,18 @@
/* submit */
async function handleSubmit() {
try {
- const { stopSavePointed, customSavepoint, drain } = (await validate())
as Recordable;
+ const { triggerSavepoint, customSavepoint, drain } = (await validate())
as Recordable;
const stopReq = {
id: app.id,
- savePointed: stopSavePointed,
- savePoint: customSavepoint,
+ restoreOrTriggerSavepoint: triggerSavepoint,
+ savepointPath: customSavepoint,
drain: drain,
};
- if (stopSavePointed) {
+ if (triggerSavepoint) {
if (customSavepoint) {
const { data } = await fetchCheckSavepointPath({
- savePoint: customSavepoint,
+ savepointPath: customSavepoint,
});
if (data.data === false) {
await createErrorSwal(t('flink.app.operation.invalidSavePoint') +
data.message);
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx
index 7e0f11897..1ca2a62f8 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx
@@ -103,7 +103,7 @@ export const useSavepoint = (updateOption: Fn) => {
if (unref(customSavepoint)) {
submitLoading.value = true;
const { data } = await fetchCheckSavepointPath({
- savePoint: unref(customSavepoint),
+ savepointPath: unref(customSavepoint),
});
if (data.data === false) {
createErrorSwal('custom savepoint path is invalid, ' +
data.message);
diff --git
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/ApplicationsPage.java
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/ApplicationsPage.java
index 57f105795..85383465d 100644
---
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/ApplicationsPage.java
+++
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/ApplicationsPage.java
@@ -180,7 +180,7 @@ public class ApplicationsPage extends NavBarPage implements
ApacheFlinkPage.Tab
PageFactory.initElements(driver, this);
}
- @FindBy(xpath =
"//button[@id='startApplicationModal_startSavePointed']//span[contains(text(),
'ON')]")
+ @FindBy(xpath =
"//button[@id='startApplicationModal_restoreSavepoint']//span[contains(text(),
'ON')]")
private WebElement radioFromSavepoint;
@FindBy(xpath = "//div[contains(.,'Start
Job')]//button[contains(@class, 'ant-btn')]//span[contains(., 'Apply')]")
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelResponse.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelResponse.scala
index 20f11bd46..668a3af89 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelResponse.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelResponse.scala
@@ -17,4 +17,4 @@
package org.apache.streampark.flink.client.bean
-case class CancelResponse(savePointDir: String)
+case class CancelResponse(savepointDir: String)
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SavepointResponse.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SavepointResponse.scala
index ca1b4b29a..6127f11e2 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SavepointResponse.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SavepointResponse.scala
@@ -18,4 +18,4 @@
package org.apache.streampark.flink.client.bean
/** Result class of trigger savepoint presents savepoint path. */
-case class SavepointResponse(savePointDir: String)
+case class SavepointResponse(savepointDir: String)