This is an automated email from the ASF dual-hosted git repository.

panyuepeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 392613280 [Hotfix] Add interface document and polish code (#3268)
392613280 is described below

commit 392613280ed996aa1783f8033f720e388d7e3ad3
Author: gongzhongqiang <[email protected]>
AuthorDate: Sun Oct 22 22:46:12 2023 +0800

    [Hotfix] Add interface document and polish code (#3268)
---
 .../console/core/aspect/ConsoleAspect.java         |  7 +-
 .../core/service/ApplicationBackUpService.java     | 42 +++++++++++
 .../core/service/ApplicationConfigService.java     | 86 ++++++++++++++++++++--
 .../console/core/service/FlinkEnvService.java      | 57 ++++++++------
 .../console/core/service/SavePointService.java     | 52 ++++++++++++-
 .../console/core/service/SettingService.java       | 32 ++++++++
 .../console/core/service/VariableService.java      | 64 ++++++++++++----
 .../core/service/impl/AppBuildPipeServiceImpl.java |  3 +-
 .../service/impl/ApplicationConfigServiceImpl.java | 54 +++++++-------
 .../core/service/impl/SavePointServiceImpl.java    | 12 +--
 .../console/core/watcher/FlinkAppHttpWatcher.java  |  2 +-
 .../console/core/watcher/FlinkAppLostWatcher.java  |  7 +-
 .../console/core/websocket/WebSocketEndpoint.java  | 23 +++---
 .../console/system/authentication/JWTUtil.java     |  2 +-
 .../doris/internal/DorisSinkFunction.java          |  3 +-
 15 files changed, 347 insertions(+), 99 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/ConsoleAspect.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/ConsoleAspect.java
index 303730bb9..c500c61d0 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/ConsoleAspect.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/ConsoleAspect.java
@@ -30,7 +30,6 @@ import 
org.apache.streampark.console.core.service.CommonService;
 import 
org.apache.streampark.console.core.service.application.ApplicationManageService;
 import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
 import org.apache.streampark.console.system.entity.AccessToken;
-import org.apache.streampark.console.system.entity.Member;
 import org.apache.streampark.console.system.entity.User;
 import org.apache.streampark.console.system.service.MemberService;
 
@@ -126,17 +125,15 @@ public class ConsoleAspect {
               "Permission denied, only user himself can access this 
permission");
           break;
         case TEAM:
-          Member member = memberService.findByUserName(paramId, 
currentUser.getUsername());
           ApiAlertException.throwIfTrue(
-              member == null,
+              memberService.findByUserName(paramId, currentUser.getUsername()) 
== null,
               "Permission denied, only user belongs to this team can access 
this permission");
           break;
         case APP:
           Application app = applicationManageService.getById(paramId);
           ApiAlertException.throwIfTrue(app == null, "Invalid operation, 
application is null");
-          member = memberService.findByUserName(app.getTeamId(), 
currentUser.getUsername());
           ApiAlertException.throwIfTrue(
-              member == null,
+              memberService.findByUserName(app.getTeamId(), 
currentUser.getUsername()) == null,
               "Permission denied, only user belongs to this team can access 
this permission");
           break;
         default:
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationBackUpService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationBackUpService.java
index 6f441e2f1..e4eed3c1f 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationBackUpService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationBackUpService.java
@@ -26,19 +26,61 @@ import org.apache.streampark.console.core.entity.FlinkSql;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.service.IService;
 
+/** Interface representing a service for application backup operations. */
 public interface ApplicationBackUpService extends IService<ApplicationBackUp> {
 
+  /**
+   * Deletes an object specified by the given ID.
+   *
+   * @param id The ID of the object to delete.
+   * @return true if the object was successfully deleted, false otherwise.
+   * @throws InternalException if an internal error occurs during the deletion 
process.
+   */
   Boolean delete(Long id) throws InternalException;
 
+  /**
+   * Performs a backup for the given application and Flink SQL parameters.
+   *
+   * @param appParam The application to back up.
+   * @param flinkSqlParam The Flink SQL to back up.
+   */
   void backup(Application appParam, FlinkSql flinkSqlParam);
 
+  /**
+   * Retrieves a page of {@link ApplicationBackUp} objects based on the 
provided parameters.
+   *
+   * @param bakParam The {@link ApplicationBackUp} object containing the 
search criteria.
+   * @param request The {@link RestRequest} object used for pagination and 
sorting.
+   * @return An {@link IPage} containing the retrieved {@link 
ApplicationBackUp} objects.
+   */
   IPage<ApplicationBackUp> page(ApplicationBackUp bakParam, RestRequest 
request);
 
+  /**
+   * Rolls back the changes made by the specified application backup.
+   *
+   * @param bakParam The ApplicationBackUp object representing the backup to 
roll back.
+   */
   void rollback(ApplicationBackUp bakParam);
 
+  /**
+   * Revoke the given application.
+   *
+   * @param appParam The application to be revoked.
+   */
   void revoke(Application appParam);
 
+  /**
+   * Removes the specified application.
+   *
+   * @param appParam the application to be removed
+   */
   void removeApp(Application appParam);
 
+  /**
+   * Rolls back a Flink SQL application to its previous state.
+   *
+   * @param appParam The application to rollback.
+   * @param flinkSqlParam The Flink SQL instance associated with the 
application.
+   */
   void rollbackFlinkSql(Application appParam, FlinkSql flinkSqlParam);
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationConfigService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationConfigService.java
index 1fa81cd57..d3531b67f 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationConfigService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationConfigService.java
@@ -26,27 +26,101 @@ import com.baomidou.mybatisplus.extension.service.IService;
 
 import java.util.List;
 
+/** This interface defines the methods to manage the application 
configuration. */
 public interface ApplicationConfigService extends IService<ApplicationConfig> {
 
-  void create(Application application, Boolean latest);
-
-  void update(Application application, Boolean latest);
-
+  /**
+   * Creates a new instance of an Application.
+   *
+   * @param appParam The Application object to create.
+   * @param latest If set to true, sets the created Application as the latest 
version.
+   */
+  void create(Application appParam, Boolean latest);
+
+  /**
+   * Updates the given application.
+   *
+   * @param appParam the application to be updated
+   * @param latest a boolean indicating whether to update to the latest version
+   */
+  void update(Application appParam, Boolean latest);
+
+  /**
+   * Sets the latest or effective flag for a given configuration and 
application. The latest flag
+   * determines whether the configuration is the latest version available. The 
effective flag
+   * determines whether the configuration is effective for the application.
+   *
+   * @param latest a boolean value indicating whether the configuration is the 
latest version (true)
+   *     or not (false)
+   * @param configId the ID of the configuration
+   * @param appId the ID of the application
+   */
   void setLatestOrEffective(Boolean latest, Long configId, Long appId);
 
+  /**
+   * Sets the configuration to effective for the given application and 
configuration ID.
+   *
+   * @param appId The ID of the application
+   * @param configId The ID of the configuration
+   */
   void toEffective(Long appId, Long configId);
 
+  /**
+   * Returns the latest version of the application configuration for the given 
application ID.
+   *
+   * @param appId The ID of the application
+   * @return The latest version of the application configuration
+   */
   ApplicationConfig getLatest(Long appId);
 
+  /**
+   * Retrieves the effective ApplicationConfig for the given appId.
+   *
+   * @param appId The identifier of the application.
+   * @return The effective ApplicationConfig.
+   */
   ApplicationConfig getEffective(Long appId);
 
+  /**
+   * Retrieves the ApplicationConfig for the specified ID.
+   *
+   * @param id the ID of the ApplicationConfig to retrieve
+   * @return the ApplicationConfig object corresponding to the specified ID, 
or null if no
+   *     ApplicationConfig is found
+   */
   ApplicationConfig get(Long id);
 
+  /**
+   * Retrieves a page of ApplicationConfig objects based on the specified 
ApplicationConfig and
+   * RestRequest.
+   *
+   * @param config the ApplicationConfig object to use as a filter for 
retrieving the page
+   * @param request the RestRequest object containing additional parameters 
and settings for
+   *     retrieving the page
+   * @return an IPage containing the ApplicationConfig objects that match the 
filter criteria
+   *     specified in the config object, limited by the settings in the 
request object
+   */
   IPage<ApplicationConfig> page(ApplicationConfig config, RestRequest request);
 
-  List<ApplicationConfig> history(Application application);
-
+  /**
+   * Retrieves the history of application configurations for a given 
application.
+   *
+   * @param appParam The application for which to retrieve the history.
+   * @return The list of application configurations representing the history.
+   */
+  List<ApplicationConfig> history(Application appParam);
+
+  /**
+   * Reads a template from a file or a database.
+   *
+   * @return the content of the template as a String
+   */
   String readTemplate();
 
+  /**
+   * Removes the app with the specified appId.
+   *
+   * @param appId The id of the app to be removed.
+   */
   void removeApp(Long appId);
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java
index c00f934b9..f47e53c95 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java
@@ -26,71 +26,84 @@ import java.io.IOException;
 public interface FlinkEnvService extends IService<FlinkEnv> {
 
   /**
-   * check exists
+   * Checks if a specific version of Flink exists.
    *
-   * @param version
-   * @return
+   * @param version The version of Flink to check.
+   * @return Returns an Integer value indicating the existence of the 
specified version: - 0 if the
+   *     version exists - 1 if the version does not exist - null if the 
version is invalid or an
+   *     error occurred during the check
    */
   Integer check(FlinkEnv version);
 
   /**
-   * create new
+   * Create a new instance.
    *
-   * @param version
-   * @throws IOException
+   * @param version The version of FlinkEnv to use.
+   * @throws Exception if an error occurs during the creation process.
+   * @return true if the instance is successfully created, false otherwise.
    */
   boolean create(FlinkEnv version) throws Exception;
 
   /**
-   * delete flink env
+   * Deletes a Flink environment with the provided ID.
    *
-   * @param id
+   * @param id the ID of the Flink environment to delete
    */
   void delete(Long id);
 
   /**
-   * update
+   * Updates the specified version of Flink environment.
    *
-   * @param version
-   * @throws IOException
+   * @param version the version of Flink environment to update
+   * @throws IOException if an I/O error occurs during the update process
    */
   void update(FlinkEnv version) throws IOException;
 
   /**
-   * get flink version by appid
+   * Get flink version by application id.
    *
-   * @param appId
-   * @return
+   * @param appId the ID of the application
+   * @return the FlinkEnv object representing the version of Flink associated 
with the given app ID
    */
   FlinkEnv getByAppId(Long appId);
 
   /**
-   * set a flink version as the default
+   * Sets the specified Flink version as the default.
    *
-   * @param id
+   * @param id The ID of the Flink version to set as the default.
    */
   void setDefault(Long id);
 
   /**
-   * get default version
+   * Retrieves the default version of FlinkEnv.
    *
-   * @return
+   * @return the default version of FlinkEnv
    */
   FlinkEnv getDefault();
 
   /**
-   * get flink version, if null, get default version
+   * Retrieves a Flink environment by ID, if available. If the ID is null or 
not found, the method
+   * returns the default Flink environment.
    *
-   * @return
+   * @param id The ID of the Flink environment to retrieve. If null, the 
default environment will be
+   *     retrieved.
+   * @return The Flink environment with the specified ID, or the default 
environment if the ID is
+   *     null or not found.
    */
   FlinkEnv getByIdOrDefault(Long id);
 
   /**
-   * sycn conf file
+   * Synchronizes the configuration file for the given id.
    *
-   * @param id
+   * @param id The id of the configuration file to be synchronized.
+   * @throws IOException If an I/O error occurs while synchronizing the 
configuration file.
    */
   void syncConf(Long id) throws IOException;
 
+  /**
+   * Checks the validity of the given ID.
+   *
+   * @param id The ID to check for validity.
+   */
   void validity(Long id);
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavePointService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavePointService.java
index 684709dc7..20e772934 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavePointService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavePointService.java
@@ -29,17 +29,63 @@ import javax.annotation.Nullable;
 
 public interface SavePointService extends IService<SavePoint> {
 
+  /**
+   * Expires all savepoints for the specified application.
+   *
+   * @param appId the ID of the application to expire
+   */
   void expire(Long appId);
 
+  /**
+   * Retrieves the latest savepoint based on the given id.
+   *
+   * @param id the unique identifier of the SavePoint
+   * @return the latest SavePoint object, or null if not found
+   */
   SavePoint getLatest(Long id);
 
+  /**
+   * Triggers a savepoint for the specified application.
+   *
+   * @param appId the ID of the application to trigger the savepoint for
+   * @param savepointPath the path where the savepoint will be stored, or null 
if the default path
+   *     should be used
+   * @param nativeFormat true to store the savepoint in native format, false 
otherwise
+   */
   void trigger(Long appId, @Nullable String savepointPath, @Nullable Boolean 
nativeFormat);
 
-  Boolean delete(Long id, Application application) throws InternalException;
+  /**
+   * Deletes an application with the specified ID.
+   *
+   * @param id the ID of the application to be deleted
+   * @param appParam the application object representing the application to be 
deleted
+   * @return true if the application is successfully deleted, false otherwise
+   * @throws InternalException if there is an internal error during the 
deletion process
+   */
+  Boolean delete(Long id, Application appParam) throws InternalException;
 
+  /**
+   * Retrieves a page of savepoint objects based on the specified parameters.
+   *
+   * @param savePoint The SavePoint object to be used for filtering the page 
results.
+   * @param request The RestRequest object containing additional request 
parameters.
+   * @return An instance of IPage<SavePoint> representing the page of 
SavePoint objects.
+   */
   IPage<SavePoint> page(SavePoint savePoint, RestRequest request);
 
-  void removeApp(Application application);
+  /**
+   * Removes all savepoints for the specified application.
+   *
+   * @param appParam the application to be removed
+   */
+  void removeApp(Application appParam);
 
-  String getSavePointPath(Application app) throws Exception;
+  /**
+   * Returns the savepoint path for the given application.
+   *
+   * @param appParam the application for which to get the save point path
+   * @return the save point path for the given application
+   * @throws Exception if an error occurs while getting the save point path
+   */
+  String getSavePointPath(Application appParam) throws Exception;
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SettingService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SettingService.java
index cb8cfb007..8dafd32a6 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SettingService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SettingService.java
@@ -48,15 +48,47 @@ public interface SettingService extends IService<Setting> {
 
   String KEY_INGRESS_MODE_DEFAULT = "ingress.mode.default";
 
+  /**
+   * Retrieves the value of the setting associated with the specified key.
+   *
+   * @param key the key of the setting to retrieve
+   * @return the value of the setting if found, null otherwise
+   */
   Setting get(String key);
 
+  /**
+   * Updates the specified Setting.
+   *
+   * @param setting the Setting object to update
+   * @return true if the update is successful, false otherwise
+   */
   boolean update(Setting setting);
 
+  /**
+   * Retrieves the Maven configuration settings.
+   *
+   * @return The MavenConfig object containing the Maven configuration 
settings.
+   */
   MavenConfig getMavenConfig();
 
+  /**
+   * Retrieves the Docker configuration settings.
+   *
+   * @return The DockerConfig object representing the configuration for Docker.
+   */
   DockerConfig getDockerConfig();
 
+  /**
+   * Retrieves the StreamPark address.
+   *
+   * @return a String representing the StreamPark address.
+   */
   String getStreamParkAddress();
 
+  /**
+   * Retrieves the default ingress mode.
+   *
+   * @return The default ingress mode.
+   */
   String getIngressModeDefault();
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/VariableService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/VariableService.java
index c255b8254..82a329151 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/VariableService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/VariableService.java
@@ -29,47 +29,85 @@ import java.util.List;
 public interface VariableService extends IService<Variable> {
 
   /**
-   * find variable
+   * Find variables based on the given variable and query request.
    *
-   * @param variable variable
-   * @param restRequest queryRequest
-   * @return IPage
+   * @param variable The variable to search for.
+   * @param restRequest The query request containing search filters and 
pagination options.
+   * @return An IPage object containing the found Variable objects matching 
the search criteria.
    */
   IPage<Variable> page(Variable variable, RestRequest restRequest);
 
   /**
-   * get variables through team
+   * Retrieves a list of variables based on the team ID.
    *
-   * @param teamId
-   * @return
+   * @param teamId The ID of the team to filter the variables by.
+   * @return A list of variables that belong to the specified team.
    */
   List<Variable> findByTeamId(Long teamId);
 
   /**
-   * Get variables through team and search keywords.
+   * Retrieve a list of variables based on the team ID and search keywords.
    *
-   * @param teamId
-   * @param keyword Fuzzy search keywords through variable code or 
description, Nullable.
-   * @return
+   * @param teamId The ID of the team for which to retrieve the variables.
+   * @param keyword The fuzzy search keywords used to filter the variables. 
This parameter is
+   *     nullable.
+   * @return A List of Variable objects that match the specified team ID and 
search keywords.
    */
   List<Variable> findByTeamId(Long teamId, String keyword);
 
+  /**
+   * Check if a team exists by teamId.
+   *
+   * @param teamId the id of the team to check.
+   * @return true if a team exists with the given teamId, false otherwise.
+   */
   boolean existsByTeamId(Long teamId);
 
   /**
-   * create variable
+   * Create a variable.
    *
-   * @param variable variable
+   * @param variable The variable to be created.
    */
   void createVariable(Variable variable);
 
+  /**
+   * Deletes a Variable.
+   *
+   * @param variable the Variable object to be deleted
+   */
   void deleteVariable(Variable variable);
 
+  /**
+   * Find a Variable by its code and team ID.
+   *
+   * @param teamId The ID of the team to search within.
+   * @param variableCode The code of the variable to find.
+   * @return The Variable found, or null if no match is found.
+   */
   Variable findByVariableCode(Long teamId, String variableCode);
 
+  /**
+   * Replaces a specified variable in the given string with the corresponding 
variable value.
+   *
+   * @param teamId The identifier of the team.
+   * @param mixed The string that may contain variables to be replaced.
+   * @return The modified string after replacing the variables.
+   */
   String replaceVariable(Long teamId, String mixed);
 
+  /**
+   * Retrieves a page of dependent applications based on the given variable 
and request.
+   *
+   * @param variable The variable to use for retrieving dependent applications.
+   * @param request The REST request containing additional parameters for 
retrieving the page.
+   * @return An instance of IPage<Application> containing the dependent 
applications.
+   */
   IPage<Application> dependAppsPage(Variable variable, RestRequest request);
 
+  /**
+   * Updates the given variable.
+   *
+   * @param variable the variable to be updated
+   */
   void updateVariable(Variable variable);
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 539b196d3..1c9984752 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -94,6 +94,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import lombok.extern.slf4j.Slf4j;
+import org.jetbrains.annotations.NotNull;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Propagation;
@@ -175,7 +176,7 @@ public class AppBuildPipeServiceImpl
    * @return Whether the pipeline was successfully started
    */
   @Override
-  public boolean buildApplication(Long appId, boolean forceBuild) {
+  public boolean buildApplication(@NotNull Long appId, boolean forceBuild) {
     // check the build environment
     checkBuildEnv(appId, forceBuild);
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java
index 2428b409b..4c471c7b1 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java
@@ -64,15 +64,15 @@ public class ApplicationConfigServiceImpl
   @Autowired private EffectiveService effectiveService;
 
   @Override
-  public synchronized void create(Application application, Boolean latest) {
-    String decode = new 
String(Base64.getDecoder().decode(application.getConfig()));
+  public synchronized void create(Application appParam, Boolean latest) {
+    String decode = new 
String(Base64.getDecoder().decode(appParam.getConfig()));
     String config = DeflaterUtils.zipString(decode.trim());
 
     ApplicationConfig applicationConfig = new ApplicationConfig();
-    applicationConfig.setAppId(application.getId());
+    applicationConfig.setAppId(appParam.getId());
 
-    if (application.getFormat() != null) {
-      ConfigFileTypeEnum fileType = 
ConfigFileTypeEnum.of(application.getFormat());
+    if (appParam.getFormat() != null) {
+      ConfigFileTypeEnum fileType = 
ConfigFileTypeEnum.of(appParam.getFormat());
       ApiAlertException.throwIfTrue(
           fileType == null || ConfigFileTypeEnum.UNKNOWN == fileType,
           "application' config error. must be (.properties|.yaml|.yml 
|.conf)");
@@ -82,10 +82,10 @@ public class ApplicationConfigServiceImpl
 
     applicationConfig.setContent(config);
     applicationConfig.setCreateTime(new Date());
-    Integer version = this.baseMapper.getLastVersion(application.getId());
+    Integer version = this.baseMapper.getLastVersion(appParam.getId());
     applicationConfig.setVersion(version == null ? 1 : version + 1);
     save(applicationConfig);
-    this.setLatestOrEffective(latest, applicationConfig.getId(), 
application.getId());
+    this.setLatestOrEffective(latest, applicationConfig.getId(), 
appParam.getId());
   }
 
   public void setLatest(Long appId, Long configId) {
@@ -99,15 +99,15 @@ public class ApplicationConfigServiceImpl
   }
 
   @Override
-  public synchronized void update(Application application, Boolean latest) {
+  public synchronized void update(Application appParam, Boolean latest) {
     // flink sql job
-    ApplicationConfig latestConfig = getLatest(application.getId());
-    if (application.isFlinkSqlJob()) {
+    ApplicationConfig latestConfig = getLatest(appParam.getId());
+    if (appParam.isFlinkSqlJob()) {
       // get effect config
-      ApplicationConfig effectiveConfig = getEffective(application.getId());
-      if (Utils.isEmpty(application.getConfig())) {
+      ApplicationConfig effectiveConfig = getEffective(appParam.getId());
+      if (Utils.isEmpty(appParam.getConfig())) {
         if (effectiveConfig != null) {
-          effectiveService.delete(application.getId(), 
EffectiveTypeEnum.CONFIG);
+          effectiveService.delete(appParam.getId(), EffectiveTypeEnum.CONFIG);
         }
       } else {
         // there was no configuration before, is a new configuration
@@ -115,48 +115,48 @@ public class ApplicationConfigServiceImpl
           if (latestConfig != null) {
             removeById(latestConfig.getId());
           }
-          this.create(application, latest);
+          this.create(appParam, latest);
         } else {
-          String decode = new 
String(Base64.getDecoder().decode(application.getConfig()));
+          String decode = new 
String(Base64.getDecoder().decode(appParam.getConfig()));
           String encode = DeflaterUtils.zipString(decode.trim());
           // need to diff the two configs are consistent
           if (!effectiveConfig.getContent().equals(encode)) {
             if (latestConfig != null) {
               removeById(latestConfig.getId());
             }
-            this.create(application, latest);
+            this.create(appParam, latest);
           }
         }
       }
     } else {
       // may be re-selected a config file (without config id), or may be based 
on an original edit
       // (with config Id).
-      Long configId = application.getConfigId();
+      Long configId = appParam.getConfigId();
       // an original edit
       if (configId != null) {
         ApplicationConfig config = this.getById(configId);
-        String decode = new 
String(Base64.getDecoder().decode(application.getConfig()));
+        String decode = new 
String(Base64.getDecoder().decode(appParam.getConfig()));
         String encode = DeflaterUtils.zipString(decode.trim());
         // create...
         if (!config.getContent().equals(encode)) {
           if (latestConfig != null) {
             removeById(latestConfig.getId());
           }
-          this.create(application, latest);
+          this.create(appParam, latest);
         } else {
-          this.setLatestOrEffective(latest, configId, application.getId());
+          this.setLatestOrEffective(latest, configId, appParam.getId());
         }
       } else {
-        ApplicationConfig config = getEffective(application.getId());
+        ApplicationConfig config = getEffective(appParam.getId());
         if (config != null) {
-          String decode = new 
String(Base64.getDecoder().decode(application.getConfig()));
+          String decode = new 
String(Base64.getDecoder().decode(appParam.getConfig()));
           String encode = DeflaterUtils.zipString(decode.trim());
           // create...
           if (!config.getContent().equals(encode)) {
-            this.create(application, latest);
+            this.create(appParam, latest);
           }
         } else {
-          this.create(application, latest);
+          this.create(appParam, latest);
         }
       }
     }
@@ -211,14 +211,14 @@ public class ApplicationConfigServiceImpl
   }
 
   @Override
-  public List<ApplicationConfig> history(Application application) {
+  public List<ApplicationConfig> history(Application appParam) {
     LambdaQueryWrapper<ApplicationConfig> queryWrapper =
         new LambdaQueryWrapper<ApplicationConfig>()
-            .eq(ApplicationConfig::getAppId, application.getId())
+            .eq(ApplicationConfig::getAppId, appParam.getId())
             .orderByDesc(ApplicationConfig::getVersion);
 
     List<ApplicationConfig> configList = 
this.baseMapper.selectList(queryWrapper);
-    fillEffectiveField(application.getId(), configList);
+    fillEffectiveField(appParam.getId(), configList);
     return configList;
   }
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 9092e7bae..60babca27 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -199,11 +199,11 @@ public class SavePointServiceImpl extends 
ServiceImpl<SavePointMapper, SavePoint
   }
 
   @Override
-  public Boolean delete(Long id, Application application) throws 
InternalException {
+  public Boolean delete(Long id, Application appParam) throws 
InternalException {
     SavePoint savePoint = getById(id);
     try {
       if (StringUtils.isNotEmpty(savePoint.getPath())) {
-        application.getFsOperator().delete(savePoint.getPath());
+        appParam.getFsOperator().delete(savePoint.getPath());
       }
       return removeById(id);
     } catch (Exception e) {
@@ -221,17 +221,17 @@ public class SavePointServiceImpl extends 
ServiceImpl<SavePointMapper, SavePoint
   }
 
   @Override
-  public void removeApp(Application application) {
-    Long appId = application.getId();
+  public void removeApp(Application appParam) {
+    Long appId = appParam.getId();
 
     LambdaQueryWrapper<SavePoint> queryWrapper =
         new LambdaQueryWrapper<SavePoint>().eq(SavePoint::getAppId, appId);
     this.remove(queryWrapper);
 
     try {
-      application
+      appParam
           .getFsOperator()
-          
.delete(application.getWorkspace().APP_SAVEPOINTS().concat("/").concat(appId.toString()));
+          
.delete(appParam.getWorkspace().APP_SAVEPOINTS().concat("/").concat(appId.toString()));
     } catch (Exception e) {
       log.error(e.getMessage(), e);
     }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
index 8726415db..52e9de29a 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
@@ -253,7 +253,7 @@ public class FlinkAppHttpWatcher {
               ? jobsOverview.getJobs().stream()
                   .filter(a -> StringUtils.equals(application.getJobId(), 
a.getId()))
                   .findFirst()
-              : jobsOverview.getJobs().stream().findFirst();
+              : Optional.empty();
     } else {
       optional =
           jobsOverview.getJobs().stream()
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppLostWatcher.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppLostWatcher.java
index 30e196a3b..1ce68b735 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppLostWatcher.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppLostWatcher.java
@@ -98,7 +98,7 @@ public class FlinkAppLostWatcher {
             .filter(application -> 
FlinkAppStateEnum.isLost(application.getState()))
             .collect(Collectors.toList());
     updateState(probeApplication);
-    probeApplication.stream().forEach(this::monitorApplication);
+    probeApplication.forEach(this::monitorApplication);
   }
 
   private void updateState(List<Application> applications) {
@@ -118,7 +118,7 @@ public class FlinkAppLostWatcher {
       watch(probeApps);
     } else {
       List<AlertProbeMsg> alertProbeMsgs = generateProbeResults(probeApps);
-      alertProbeMsgs.stream().forEach(this::alert);
+      alertProbeMsgs.forEach(this::alert);
       reset(probeApps);
     }
   }
@@ -135,7 +135,8 @@ public class FlinkAppLostWatcher {
   }
 
   private void alert(AlertProbeMsg alertProbeMsg) {
-    alertProbeMsg.getAlertId().stream()
+    alertProbeMsg
+        .getAlertId()
         .forEach((alterId) -> alertService.alert(alterId, 
AlertTemplate.of(alertProbeMsg)));
   }
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/websocket/WebSocketEndpoint.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/websocket/WebSocketEndpoint.java
index 3872b9051..49f58dba7 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/websocket/WebSocketEndpoint.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/websocket/WebSocketEndpoint.java
@@ -34,6 +34,7 @@ import javax.websocket.server.ServerEndpoint;
 import java.io.IOException;
 import java.util.Map;
 
+@Getter
 @Slf4j
 @Component
 @ServerEndpoint(value = "/websocket/{id}")
@@ -41,27 +42,29 @@ public class WebSocketEndpoint {
 
   private static final Map<String, Session> SOCKET_SESSIONS = new 
CopyOnWriteMap<>();
 
-  @Getter private String id;
+  private String id;
 
-  @Getter private Session session;
+  private Session session;
 
   @OnOpen
   public void onOpen(Session session, @PathParam("id") String id) {
-    if (log.isDebugEnabled()) {
-      log.debug("websocket onOpen....");
-    }
+    log.debug("Websocket onOpen....");
     this.id = id;
     this.session = session;
     SOCKET_SESSIONS.put(id, session);
   }
 
   @OnClose
-  public void onClose() throws IOException {
-    if (log.isDebugEnabled()) {
-      log.debug("websocket onClose....");
+  public void onClose() {
+    if (SOCKET_SESSIONS.containsKey(this.id)) {
+      try (Session remove = SOCKET_SESSIONS.remove(this.id)) {
+        if (remove != null) {
+          log.debug("Websocket onClose id: {}", this.id);
+        }
+      } catch (IOException e) {
+        log.error("WebSocket onClose error: {}", e.getMessage(), e);
+      }
     }
-    this.session.close();
-    SOCKET_SESSIONS.remove(this.id);
   }
 
   @OnError
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTUtil.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTUtil.java
index b3c25aa0d..9b44eeed8 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTUtil.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTUtil.java
@@ -114,7 +114,7 @@ public class JWTUtil {
           .withExpiresAt(date)
           .sign(algorithm);
     } catch (Exception e) {
-      log.error("error:{}", e);
+      log.error("error:{}", e.getMessage());
       return null;
     }
   }
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkFunction.java
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkFunction.java
index 210e92e38..c0ee6ee67 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkFunction.java
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkFunction.java
@@ -39,6 +39,7 @@ import 
org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.Map;
 import java.util.Properties;
 
@@ -84,7 +85,7 @@ public class DorisSinkFunction<T> extends RichSinkFunction<T> 
implements Checkpo
         LOGGER.warn(
             String.format(
                 " row data not fulfilled. {database: %s, table: %s, dataRows: 
%s}",
-                data.getDatabase(), data.getTable(), data.getDataRows()));
+                data.getDatabase(), data.getTable(), 
Arrays.toString(data.getDataRows())));
         return;
       }
       dorisSinkWriter.writeRecords(data.getDatabase(), data.getTable(), 
data.getDataRows());


Reply via email to