This is an automated email from the ASF dual-hosted git repository.
panyuepeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 392613280 [Hotfix] Add interface document and polish code (#3268)
392613280 is described below
commit 392613280ed996aa1783f8033f720e388d7e3ad3
Author: gongzhongqiang <[email protected]>
AuthorDate: Sun Oct 22 22:46:12 2023 +0800
[Hotfix] Add interface document and polish code (#3268)
---
.../console/core/aspect/ConsoleAspect.java | 7 +-
.../core/service/ApplicationBackUpService.java | 42 +++++++++++
.../core/service/ApplicationConfigService.java | 86 ++++++++++++++++++++--
.../console/core/service/FlinkEnvService.java | 57 ++++++++------
.../console/core/service/SavePointService.java | 52 ++++++++++++-
.../console/core/service/SettingService.java | 32 ++++++++
.../console/core/service/VariableService.java | 64 ++++++++++++----
.../core/service/impl/AppBuildPipeServiceImpl.java | 3 +-
.../service/impl/ApplicationConfigServiceImpl.java | 54 +++++++-------
.../core/service/impl/SavePointServiceImpl.java | 12 +--
.../console/core/watcher/FlinkAppHttpWatcher.java | 2 +-
.../console/core/watcher/FlinkAppLostWatcher.java | 7 +-
.../console/core/websocket/WebSocketEndpoint.java | 23 +++---
.../console/system/authentication/JWTUtil.java | 2 +-
.../doris/internal/DorisSinkFunction.java | 3 +-
15 files changed, 347 insertions(+), 99 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/ConsoleAspect.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/ConsoleAspect.java
index 303730bb9..c500c61d0 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/ConsoleAspect.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/ConsoleAspect.java
@@ -30,7 +30,6 @@ import
org.apache.streampark.console.core.service.CommonService;
import
org.apache.streampark.console.core.service.application.ApplicationManageService;
import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
import org.apache.streampark.console.system.entity.AccessToken;
-import org.apache.streampark.console.system.entity.Member;
import org.apache.streampark.console.system.entity.User;
import org.apache.streampark.console.system.service.MemberService;
@@ -126,17 +125,15 @@ public class ConsoleAspect {
"Permission denied, only user himself can access this
permission");
break;
case TEAM:
- Member member = memberService.findByUserName(paramId,
currentUser.getUsername());
ApiAlertException.throwIfTrue(
- member == null,
+ memberService.findByUserName(paramId, currentUser.getUsername())
== null,
"Permission denied, only user belongs to this team can access
this permission");
break;
case APP:
Application app = applicationManageService.getById(paramId);
ApiAlertException.throwIfTrue(app == null, "Invalid operation,
application is null");
- member = memberService.findByUserName(app.getTeamId(),
currentUser.getUsername());
ApiAlertException.throwIfTrue(
- member == null,
+ memberService.findByUserName(app.getTeamId(),
currentUser.getUsername()) == null,
"Permission denied, only user belongs to this team can access
this permission");
break;
default:
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationBackUpService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationBackUpService.java
index 6f441e2f1..e4eed3c1f 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationBackUpService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationBackUpService.java
@@ -26,19 +26,61 @@ import org.apache.streampark.console.core.entity.FlinkSql;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.IService;
+/** Interface representing a service for application backup operations. */
public interface ApplicationBackUpService extends IService<ApplicationBackUp> {
+ /**
+ * Deletes an object specified by the given ID.
+ *
+ * @param id The ID of the object to delete.
+ * @return true if the object was successfully deleted, false otherwise.
+ * @throws InternalException if an internal error occurs during the deletion
process.
+ */
Boolean delete(Long id) throws InternalException;
+ /**
+ * Performs a backup for the given application and Flink SQL parameters.
+ *
+ * @param appParam The application to back up.
+ * @param flinkSqlParam The Flink SQL to back up.
+ */
void backup(Application appParam, FlinkSql flinkSqlParam);
+ /**
+ * Retrieves a page of {@link ApplicationBackUp} objects based on the
provided parameters.
+ *
+ * @param bakParam The {@link ApplicationBackUp} object containing the
search criteria.
+ * @param request The {@link RestRequest} object used for pagination and
sorting.
+ * @return An {@link IPage} containing the retrieved {@link
ApplicationBackUp} objects.
+ */
IPage<ApplicationBackUp> page(ApplicationBackUp bakParam, RestRequest
request);
+ /**
+ * Rolls back the changes made by the specified application backup.
+ *
+ * @param bakParam The ApplicationBackUp object representing the backup to
roll back.
+ */
void rollback(ApplicationBackUp bakParam);
+ /**
+ * Revoke the given application.
+ *
+ * @param appParam The application to be revoked.
+ */
void revoke(Application appParam);
+ /**
+ * Removes the specified application.
+ *
+ * @param appParam the application to be removed
+ */
void removeApp(Application appParam);
+ /**
+ * Rolls back a Flink SQL application to its previous state.
+ *
+ * @param appParam The application to rollback.
+ * @param flinkSqlParam The Flink SQL instance associated with the
application.
+ */
void rollbackFlinkSql(Application appParam, FlinkSql flinkSqlParam);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationConfigService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationConfigService.java
index 1fa81cd57..d3531b67f 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationConfigService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationConfigService.java
@@ -26,27 +26,101 @@ import com.baomidou.mybatisplus.extension.service.IService;
import java.util.List;
+/** This interface defines the methods to manage the application
configuration. */
public interface ApplicationConfigService extends IService<ApplicationConfig> {
- void create(Application application, Boolean latest);
-
- void update(Application application, Boolean latest);
-
+ /**
+ * Creates and saves a new configuration version for the given application.
+ *
+ * @param appParam The application whose configuration is to be stored.
+ * @param latest Whether the created configuration is flagged as the latest
version.
+ */
+ void create(Application appParam, Boolean latest);
+
+ /**
+ * Updates the stored configuration of the given application.
+ *
+ * @param appParam the application whose configuration is to be updated
+ * @param latest whether the resulting configuration is flagged as the latest version
+ */
+ void update(Application appParam, Boolean latest);
+
+ /**
+ * Sets the latest or effective flag for a given configuration and
application. The latest flag
+ * determines whether the configuration is the latest version available. The
effective flag
+ * determines whether the configuration is effective for the application.
+ *
+ * @param latest a boolean value indicating whether the configuration is the
latest version (true)
+ * or not (false)
+ * @param configId the ID of the configuration
+ * @param appId the ID of the application
+ */
void setLatestOrEffective(Boolean latest, Long configId, Long appId);
+ /**
+ * Sets the configuration to effective for the given application and
configuration ID.
+ *
+ * @param appId The ID of the application
+ * @param configId The ID of the configuration
+ */
void toEffective(Long appId, Long configId);
+ /**
+ * Returns the latest version of the application configuration for the given
application ID.
+ *
+ * @param appId The ID of the application
+ * @return The latest version of the application configuration
+ */
ApplicationConfig getLatest(Long appId);
+ /**
+ * Retrieves the effective ApplicationConfig for the given appId.
+ *
+ * @param appId The identifier of the application.
+ * @return The effective ApplicationConfig.
+ */
ApplicationConfig getEffective(Long appId);
+ /**
+ * Retrieves the ApplicationConfig for the specified ID.
+ *
+ * @param id the ID of the ApplicationConfig to retrieve
+ * @return the ApplicationConfig object corresponding to the specified ID,
or null if no
+ * ApplicationConfig is found
+ */
ApplicationConfig get(Long id);
+ /**
+ * Retrieves a page of ApplicationConfig objects based on the specified
ApplicationConfig and
+ * RestRequest.
+ *
+ * @param config the ApplicationConfig object to use as a filter for
retrieving the page
+ * @param request the RestRequest object containing additional parameters
and settings for
+ * retrieving the page
+ * @return an IPage containing the ApplicationConfig objects that match the
filter criteria
+ * specified in the config object, limited by the settings in the
request object
+ */
IPage<ApplicationConfig> page(ApplicationConfig config, RestRequest request);
- List<ApplicationConfig> history(Application application);
-
+ /**
+ * Retrieves the history of application configurations for a given
application.
+ *
+ * @param appParam The application for which to retrieve the history.
+ * @return The list of application configurations representing the history.
+ */
+ List<ApplicationConfig> history(Application appParam);
+
+ /**
+ * Reads a template from a file or a database.
+ *
+ * @return the content of the template as a String
+ */
String readTemplate();
+ /**
+ * Removes the app with the specified appId.
+ *
+ * @param appId The id of the app to be removed.
+ */
void removeApp(Long appId);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java
index c00f934b9..f47e53c95 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java
@@ -26,71 +26,84 @@ import java.io.IOException;
public interface FlinkEnvService extends IService<FlinkEnv> {
/**
- * check exists
+ * Checks if a specific version of Flink exists.
*
- * @param version
- * @return
+ * @param version The version of Flink to check.
+ * @return Returns an Integer value indicating the existence of the
specified version: - 0 if the
+ * version exists - 1 if the version does not exist - null if the
version is invalid or an
+ * error occurred during the check
*/
Integer check(FlinkEnv version);
/**
- * create new
+ * Create a new instance.
*
- * @param version
- * @throws IOException
+ * @param version The version of FlinkEnv to use.
+ * @throws Exception if an error occurs during the creation process.
+ * @return true if the instance is successfully created, false otherwise.
*/
boolean create(FlinkEnv version) throws Exception;
/**
- * delete flink env
+ * Deletes a Flink environment with the provided ID.
*
- * @param id
+ * @param id the ID of the Flink environment to delete
*/
void delete(Long id);
/**
- * update
+ * Updates the specified version of Flink environment.
*
- * @param version
- * @throws IOException
+ * @param version the version of Flink environment to update
+ * @throws IOException if an I/O error occurs during the update process
*/
void update(FlinkEnv version) throws IOException;
/**
- * get flink version by appid
+ * Get flink version by application id.
*
- * @param appId
- * @return
+ * @param appId the ID of the application
+ * @return the FlinkEnv object representing the version of Flink associated
with the given app ID
*/
FlinkEnv getByAppId(Long appId);
/**
- * set a flink version as the default
+ * Sets the specified Flink version as the default.
*
- * @param id
+ * @param id The ID of the Flink version to set as the default.
*/
void setDefault(Long id);
/**
- * get default version
+ * Retrieves the default version of FlinkEnv.
*
- * @return
+ * @return the default version of FlinkEnv
*/
FlinkEnv getDefault();
/**
- * get flink version, if null, get default version
+ * Retrieves a Flink environment by ID, if available. If the ID is null or
not found, the method
+ * returns the default Flink environment.
*
- * @return
+ * @param id The ID of the Flink environment to retrieve. If null, the
default environment will be
+ * retrieved.
+ * @return The Flink environment with the specified ID, or the default
environment if the ID is
+ * null or not found.
*/
FlinkEnv getByIdOrDefault(Long id);
/**
- * sycn conf file
+ * Synchronizes the configuration file for the given id.
*
- * @param id
+ * @param id The id of the configuration file to be synchronized.
+ * @throws IOException If an I/O error occurs while synchronizing the
configuration file.
*/
void syncConf(Long id) throws IOException;
+ /**
+ * Checks the validity of the given ID.
+ *
+ * @param id The ID to check for validity.
+ */
void validity(Long id);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavePointService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavePointService.java
index 684709dc7..20e772934 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavePointService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavePointService.java
@@ -29,17 +29,63 @@ import javax.annotation.Nullable;
public interface SavePointService extends IService<SavePoint> {
+ /**
+ * Expires all savepoints for the specified application.
+ *
+ * @param appId the ID of the application whose savepoints are to be expired
+ */
void expire(Long appId);
+ /**
+ * Retrieves the latest savepoint based on the given id.
+ *
+ * @param id the unique identifier of the SavePoint
+ * @return the latest SavePoint object, or null if not found
+ */
SavePoint getLatest(Long id);
+ /**
+ * Triggers a savepoint for the specified application.
+ *
+ * @param appId the ID of the application to trigger the savepoint for
+ * @param savepointPath the path where the savepoint will be stored, or null
if the default path
+ * should be used
+ * @param nativeFormat true to store the savepoint in native format, false
otherwise
+ */
void trigger(Long appId, @Nullable String savepointPath, @Nullable Boolean
nativeFormat);
- Boolean delete(Long id, Application application) throws InternalException;
+ /**
+ * Deletes the savepoint with the specified ID.
+ *
+ * @param id the ID of the savepoint to be deleted
+ * @param appParam the application whose file-system operator is used to delete the
savepoint path
+ * @return true if the savepoint record is successfully removed, false otherwise
+ * @throws InternalException if there is an internal error during the
deletion process
+ */
+ Boolean delete(Long id, Application appParam) throws InternalException;
+ /**
+ * Retrieves a page of savepoint objects based on the specified parameters.
+ *
+ * @param savePoint The SavePoint object to be used for filtering the page
results.
+ * @param request The RestRequest object containing additional request
parameters.
+ * @return An instance of IPage<SavePoint> representing the page of
SavePoint objects.
+ */
IPage<SavePoint> page(SavePoint savePoint, RestRequest request);
- void removeApp(Application application);
+ /**
+ * Removes all savepoints for the specified application.
+ *
+ * @param appParam the application whose savepoints are to be removed
+ */
+ void removeApp(Application appParam);
- String getSavePointPath(Application app) throws Exception;
+ /**
+ * Returns the savepoint path for the given application.
+ *
+ * @param appParam the application for which to get the save point path
+ * @return the save point path for the given application
+ * @throws Exception if an error occurs while getting the save point path
+ */
+ String getSavePointPath(Application appParam) throws Exception;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SettingService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SettingService.java
index cb8cfb007..8dafd32a6 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SettingService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SettingService.java
@@ -48,15 +48,47 @@ public interface SettingService extends IService<Setting> {
String KEY_INGRESS_MODE_DEFAULT = "ingress.mode.default";
+ /**
+ * Retrieves the value of the setting associated with the specified key.
+ *
+ * @param key the key of the setting to retrieve
+ * @return the value of the setting if found, null otherwise
+ */
Setting get(String key);
+ /**
+ * Updates the specified Setting.
+ *
+ * @param setting the Setting object to update
+ * @return true if the update is successful, false otherwise
+ */
boolean update(Setting setting);
+ /**
+ * Retrieves the Maven configuration settings.
+ *
+ * @return The MavenConfig object containing the Maven configuration
settings.
+ */
MavenConfig getMavenConfig();
+ /**
+ * Retrieves the Docker configuration settings.
+ *
+ * @return The DockerConfig object representing the configuration for Docker.
+ */
DockerConfig getDockerConfig();
+ /**
+ * Retrieves the StreamPark address.
+ *
+ * @return a String representing the StreamPark address.
+ */
String getStreamParkAddress();
+ /**
+ * Retrieves the default ingress mode.
+ *
+ * @return The default ingress mode.
+ */
String getIngressModeDefault();
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/VariableService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/VariableService.java
index c255b8254..82a329151 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/VariableService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/VariableService.java
@@ -29,47 +29,85 @@ import java.util.List;
public interface VariableService extends IService<Variable> {
/**
- * find variable
+ * Find variables based on the given variable and query request.
*
- * @param variable variable
- * @param restRequest queryRequest
- * @return IPage
+ * @param variable The variable to search for.
+ * @param restRequest The query request containing search filters and
pagination options.
+ * @return An IPage object containing the found Variable objects matching
the search criteria.
*/
IPage<Variable> page(Variable variable, RestRequest restRequest);
/**
- * get variables through team
+ * Retrieves a list of variables based on the team ID.
*
- * @param teamId
- * @return
+ * @param teamId The ID of the team to filter the variables by.
+ * @return A list of variables that belong to the specified team.
*/
List<Variable> findByTeamId(Long teamId);
/**
- * Get variables through team and search keywords.
+ * Retrieve a list of variables based on the team ID and search keywords.
*
- * @param teamId
- * @param keyword Fuzzy search keywords through variable code or
description, Nullable.
- * @return
+ * @param teamId The ID of the team for which to retrieve the variables.
+ * @param keyword The fuzzy search keywords used to filter the variables.
This parameter is
+ * nullable.
+ * @return A List of Variable objects that match the specified team ID and
search keywords.
*/
List<Variable> findByTeamId(Long teamId, String keyword);
+ /**
+ * Check if a team exists by teamId.
+ *
+ * @param teamId the id of the team to check.
+ * @return true if a team exists with the given teamId, false otherwise.
+ */
boolean existsByTeamId(Long teamId);
/**
- * create variable
+ * Create a variable.
*
- * @param variable variable
+ * @param variable The variable to be created.
*/
void createVariable(Variable variable);
+ /**
+ * Deletes a Variable.
+ *
+ * @param variable the Variable object to be deleted
+ */
void deleteVariable(Variable variable);
+ /**
+ * Find a Variable by its code and team ID.
+ *
+ * @param teamId The ID of the team to search within.
+ * @param variableCode The code of the variable to find.
+ * @return The Variable found, or null if no match is found.
+ */
Variable findByVariableCode(Long teamId, String variableCode);
+ /**
+ * Replaces a specified variable in the given string with the corresponding
variable value.
+ *
+ * @param teamId The identifier of the team.
+ * @param mixed The string that may contain variables to be replaced.
+ * @return The modified string after replacing the variables.
+ */
String replaceVariable(Long teamId, String mixed);
+ /**
+ * Retrieves a page of dependent applications based on the given variable
and request.
+ *
+ * @param variable The variable to use for retrieving dependent applications.
+ * @param request The REST request containing additional parameters for
retrieving the page.
+ * @return An instance of IPage<Application> containing the dependent
applications.
+ */
IPage<Application> dependAppsPage(Variable variable, RestRequest request);
+ /**
+ * Updates the given variable.
+ *
+ * @param variable the variable to be updated
+ */
void updateVariable(Variable variable);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 539b196d3..1c9984752 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -94,6 +94,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import lombok.extern.slf4j.Slf4j;
+import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
@@ -175,7 +176,7 @@ public class AppBuildPipeServiceImpl
* @return Whether the pipeline was successfully started
*/
@Override
- public boolean buildApplication(Long appId, boolean forceBuild) {
+ public boolean buildApplication(@NotNull Long appId, boolean forceBuild) {
// check the build environment
checkBuildEnv(appId, forceBuild);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java
index 2428b409b..4c471c7b1 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java
@@ -64,15 +64,15 @@ public class ApplicationConfigServiceImpl
@Autowired private EffectiveService effectiveService;
@Override
- public synchronized void create(Application application, Boolean latest) {
- String decode = new
String(Base64.getDecoder().decode(application.getConfig()));
+ public synchronized void create(Application appParam, Boolean latest) {
+ String decode = new
String(Base64.getDecoder().decode(appParam.getConfig()));
String config = DeflaterUtils.zipString(decode.trim());
ApplicationConfig applicationConfig = new ApplicationConfig();
- applicationConfig.setAppId(application.getId());
+ applicationConfig.setAppId(appParam.getId());
- if (application.getFormat() != null) {
- ConfigFileTypeEnum fileType =
ConfigFileTypeEnum.of(application.getFormat());
+ if (appParam.getFormat() != null) {
+ ConfigFileTypeEnum fileType =
ConfigFileTypeEnum.of(appParam.getFormat());
ApiAlertException.throwIfTrue(
fileType == null || ConfigFileTypeEnum.UNKNOWN == fileType,
"application' config error. must be (.properties|.yaml|.yml
|.conf)");
@@ -82,10 +82,10 @@ public class ApplicationConfigServiceImpl
applicationConfig.setContent(config);
applicationConfig.setCreateTime(new Date());
- Integer version = this.baseMapper.getLastVersion(application.getId());
+ Integer version = this.baseMapper.getLastVersion(appParam.getId());
applicationConfig.setVersion(version == null ? 1 : version + 1);
save(applicationConfig);
- this.setLatestOrEffective(latest, applicationConfig.getId(),
application.getId());
+ this.setLatestOrEffective(latest, applicationConfig.getId(),
appParam.getId());
}
public void setLatest(Long appId, Long configId) {
@@ -99,15 +99,15 @@ public class ApplicationConfigServiceImpl
}
@Override
- public synchronized void update(Application application, Boolean latest) {
+ public synchronized void update(Application appParam, Boolean latest) {
// flink sql job
- ApplicationConfig latestConfig = getLatest(application.getId());
- if (application.isFlinkSqlJob()) {
+ ApplicationConfig latestConfig = getLatest(appParam.getId());
+ if (appParam.isFlinkSqlJob()) {
// get effect config
- ApplicationConfig effectiveConfig = getEffective(application.getId());
- if (Utils.isEmpty(application.getConfig())) {
+ ApplicationConfig effectiveConfig = getEffective(appParam.getId());
+ if (Utils.isEmpty(appParam.getConfig())) {
if (effectiveConfig != null) {
- effectiveService.delete(application.getId(),
EffectiveTypeEnum.CONFIG);
+ effectiveService.delete(appParam.getId(), EffectiveTypeEnum.CONFIG);
}
} else {
// there was no configuration before, is a new configuration
@@ -115,48 +115,48 @@ public class ApplicationConfigServiceImpl
if (latestConfig != null) {
removeById(latestConfig.getId());
}
- this.create(application, latest);
+ this.create(appParam, latest);
} else {
- String decode = new
String(Base64.getDecoder().decode(application.getConfig()));
+ String decode = new
String(Base64.getDecoder().decode(appParam.getConfig()));
String encode = DeflaterUtils.zipString(decode.trim());
// need to diff the two configs are consistent
if (!effectiveConfig.getContent().equals(encode)) {
if (latestConfig != null) {
removeById(latestConfig.getId());
}
- this.create(application, latest);
+ this.create(appParam, latest);
}
}
}
} else {
// may be re-selected a config file (without config id), or may be based
on an original edit
// (with config Id).
- Long configId = application.getConfigId();
+ Long configId = appParam.getConfigId();
// an original edit
if (configId != null) {
ApplicationConfig config = this.getById(configId);
- String decode = new
String(Base64.getDecoder().decode(application.getConfig()));
+ String decode = new
String(Base64.getDecoder().decode(appParam.getConfig()));
String encode = DeflaterUtils.zipString(decode.trim());
// create...
if (!config.getContent().equals(encode)) {
if (latestConfig != null) {
removeById(latestConfig.getId());
}
- this.create(application, latest);
+ this.create(appParam, latest);
} else {
- this.setLatestOrEffective(latest, configId, application.getId());
+ this.setLatestOrEffective(latest, configId, appParam.getId());
}
} else {
- ApplicationConfig config = getEffective(application.getId());
+ ApplicationConfig config = getEffective(appParam.getId());
if (config != null) {
- String decode = new
String(Base64.getDecoder().decode(application.getConfig()));
+ String decode = new
String(Base64.getDecoder().decode(appParam.getConfig()));
String encode = DeflaterUtils.zipString(decode.trim());
// create...
if (!config.getContent().equals(encode)) {
- this.create(application, latest);
+ this.create(appParam, latest);
}
} else {
- this.create(application, latest);
+ this.create(appParam, latest);
}
}
}
@@ -211,14 +211,14 @@ public class ApplicationConfigServiceImpl
}
@Override
- public List<ApplicationConfig> history(Application application) {
+ public List<ApplicationConfig> history(Application appParam) {
LambdaQueryWrapper<ApplicationConfig> queryWrapper =
new LambdaQueryWrapper<ApplicationConfig>()
- .eq(ApplicationConfig::getAppId, application.getId())
+ .eq(ApplicationConfig::getAppId, appParam.getId())
.orderByDesc(ApplicationConfig::getVersion);
List<ApplicationConfig> configList =
this.baseMapper.selectList(queryWrapper);
- fillEffectiveField(application.getId(), configList);
+ fillEffectiveField(appParam.getId(), configList);
return configList;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 9092e7bae..60babca27 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -199,11 +199,11 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
}
@Override
- public Boolean delete(Long id, Application application) throws
InternalException {
+ public Boolean delete(Long id, Application appParam) throws
InternalException {
SavePoint savePoint = getById(id);
try {
if (StringUtils.isNotEmpty(savePoint.getPath())) {
- application.getFsOperator().delete(savePoint.getPath());
+ appParam.getFsOperator().delete(savePoint.getPath());
}
return removeById(id);
} catch (Exception e) {
@@ -221,17 +221,17 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
}
@Override
- public void removeApp(Application application) {
- Long appId = application.getId();
+ public void removeApp(Application appParam) {
+ Long appId = appParam.getId();
LambdaQueryWrapper<SavePoint> queryWrapper =
new LambdaQueryWrapper<SavePoint>().eq(SavePoint::getAppId, appId);
this.remove(queryWrapper);
try {
- application
+ appParam
.getFsOperator()
-
.delete(application.getWorkspace().APP_SAVEPOINTS().concat("/").concat(appId.toString()));
+
.delete(appParam.getWorkspace().APP_SAVEPOINTS().concat("/").concat(appId.toString()));
} catch (Exception e) {
log.error(e.getMessage(), e);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
index 8726415db..52e9de29a 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
@@ -253,7 +253,7 @@ public class FlinkAppHttpWatcher {
? jobsOverview.getJobs().stream()
.filter(a -> StringUtils.equals(application.getJobId(),
a.getId()))
.findFirst()
- : jobsOverview.getJobs().stream().findFirst();
+ : Optional.empty();
} else {
optional =
jobsOverview.getJobs().stream()
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppLostWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppLostWatcher.java
index 30e196a3b..1ce68b735 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppLostWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppLostWatcher.java
@@ -98,7 +98,7 @@ public class FlinkAppLostWatcher {
.filter(application ->
FlinkAppStateEnum.isLost(application.getState()))
.collect(Collectors.toList());
updateState(probeApplication);
- probeApplication.stream().forEach(this::monitorApplication);
+ probeApplication.forEach(this::monitorApplication);
}
private void updateState(List<Application> applications) {
@@ -118,7 +118,7 @@ public class FlinkAppLostWatcher {
watch(probeApps);
} else {
List<AlertProbeMsg> alertProbeMsgs = generateProbeResults(probeApps);
- alertProbeMsgs.stream().forEach(this::alert);
+ alertProbeMsgs.forEach(this::alert);
reset(probeApps);
}
}
@@ -135,7 +135,8 @@ public class FlinkAppLostWatcher {
}
private void alert(AlertProbeMsg alertProbeMsg) {
- alertProbeMsg.getAlertId().stream()
+ alertProbeMsg
+ .getAlertId()
.forEach((alterId) -> alertService.alert(alterId,
AlertTemplate.of(alertProbeMsg)));
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/websocket/WebSocketEndpoint.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/websocket/WebSocketEndpoint.java
index 3872b9051..49f58dba7 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/websocket/WebSocketEndpoint.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/websocket/WebSocketEndpoint.java
@@ -34,6 +34,7 @@ import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
+@Getter
@Slf4j
@Component
@ServerEndpoint(value = "/websocket/{id}")
@@ -41,27 +42,29 @@ public class WebSocketEndpoint {
private static final Map<String, Session> SOCKET_SESSIONS = new
CopyOnWriteMap<>();
- @Getter private String id;
+ private String id;
- @Getter private Session session;
+ private Session session;
@OnOpen
public void onOpen(Session session, @PathParam("id") String id) {
- if (log.isDebugEnabled()) {
- log.debug("websocket onOpen....");
- }
+ log.debug("Websocket onOpen....");
this.id = id;
this.session = session;
SOCKET_SESSIONS.put(id, session);
}
@OnClose
- public void onClose() throws IOException {
- if (log.isDebugEnabled()) {
- log.debug("websocket onClose....");
+ public void onClose() {
+ if (SOCKET_SESSIONS.containsKey(this.id)) {
+ try (Session remove = SOCKET_SESSIONS.remove(this.id)) {
+ if (remove != null) {
+ log.debug("Websocket onClose id: {}", this.id);
+ }
+ } catch (IOException e) {
+ log.error("WebSocket onClose error: {}", e.getMessage(), e);
+ }
}
- this.session.close();
- SOCKET_SESSIONS.remove(this.id);
}
@OnError
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTUtil.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTUtil.java
index b3c25aa0d..9b44eeed8 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTUtil.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTUtil.java
@@ -114,7 +114,7 @@ public class JWTUtil {
.withExpiresAt(date)
.sign(algorithm);
} catch (Exception e) {
- log.error("error:{}", e);
+ log.error("error:{}", e.getMessage());
return null;
}
}
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkFunction.java
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkFunction.java
index 210e92e38..c0ee6ee67 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkFunction.java
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkFunction.java
@@ -39,6 +39,7 @@ import
org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
@@ -84,7 +85,7 @@ public class DorisSinkFunction<T> extends RichSinkFunction<T>
implements Checkpo
LOGGER.warn(
String.format(
" row data not fulfilled. {database: %s, table: %s, dataRows:
%s}",
- data.getDatabase(), data.getTable(), data.getDataRows()));
+ data.getDatabase(), data.getTable(),
Arrays.toString(data.getDataRows())));
return;
}
dorisSinkWriter.writeRecords(data.getDatabase(), data.getTable(),
data.getDataRows());