This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 24b0e540f [improve] code polish : log, logic, exception etc (#3806)
24b0e540f is described below
commit 24b0e540fe9687d384973b189513852af7e9431c
Author: gongzhongqiang <[email protected]>
AuthorDate: Fri Jun 28 15:24:07 2024 +0800
[improve] code polish : log, logic, exception etc (#3806)
* [improve] code polish : log, logic, exception etc
---------
Co-authored-by: benjobs <[email protected]>
---
.../console/base/domain/RestResponse.java | 4 +-
.../ApplicationBuildPipelineController.java | 10 +--
.../core/controller/ApplicationController.java | 21 ++----
.../core/controller/FlinkEnvController.java | 17 ++---
.../core/controller/FlinkSqlController.java | 2 +-
.../console/core/controller/SettingController.java | 11 ++--
.../console/core/service/FlinkEnvService.java | 6 +-
.../impl/ApplicationActionServiceImpl.java | 12 ++--
.../impl/ApplicationInfoServiceImpl.java | 75 ++++++----------------
.../core/service/impl/FlinkEnvServiceImpl.java | 5 +-
.../core/service/impl/FlinkSqlServiceImpl.java | 32 ++++-----
.../core/service/impl/ProjectServiceImpl.java | 18 ++----
.../core/service/impl/ResourceServiceImpl.java | 16 ++---
.../core/service/impl/SqlCompleteServiceImpl.java | 1 -
.../core/service/impl/VariableServiceImpl.java | 10 +--
.../src/views/flink/cluster/Add.vue | 2 +-
.../streampark/gateway/factories/FactoryUtil.java | 5 +-
.../streampark/spark/core/util/ParameterTool.java | 1 -
18 files changed, 87 insertions(+), 161 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/RestResponse.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/RestResponse.java
index e016c9aae..cfd91cda0 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/RestResponse.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/RestResponse.java
@@ -35,9 +35,7 @@ public class RestResponse extends HashMap<String, Object> {
private static final long serialVersionUID = 1L;
public static RestResponse success(Object data) {
- RestResponse resp = new RestResponse();
- resp.put(STATUS_KEY, STATUS_SUCCESS);
- resp.put(CODE_KEY, ResponseCode.CODE_SUCCESS);
+ RestResponse resp = success();
resp.put(DATA_KEY, data);
return resp;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java
index 9fb8453c4..d0c89fcf2 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java
@@ -49,13 +49,9 @@ public class ApplicationBuildPipelineController {
@PermissionScope(app = "#appId")
@PostMapping(value = "build")
@RequiresPermissions("app:create")
- public RestResponse buildApplication(Long appId, boolean forceBuild) {
- try {
- boolean actionResult = appBuildPipeService.buildApplication(appId,
forceBuild);
- return RestResponse.success(actionResult);
- } catch (Exception e) {
- return RestResponse.success(false).message(e.getMessage());
- }
+ public RestResponse buildApplication(Long appId, boolean forceBuild) throws
Exception {
+ boolean actionResult = appBuildPipeService.buildApplication(appId,
forceBuild);
+ return RestResponse.success(actionResult);
}
/**
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
index dbe3ebf97..350d1d128 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
@@ -197,13 +197,9 @@ public class ApplicationController {
@PermissionScope(app = "#app.id", team = "#app.teamId")
@PostMapping(value = "start")
@RequiresPermissions("app:start")
- public RestResponse start(@Parameter(hidden = true) Application app) {
- try {
- applicationActionService.start(app, false);
- return RestResponse.success(true);
- } catch (Exception e) {
- return RestResponse.success(false).message(e.getMessage());
- }
+ public RestResponse start(@Parameter(hidden = true) Application app) throws
Exception {
+ applicationActionService.start(app, false);
+ return RestResponse.success(true);
}
@Operation(
@@ -336,14 +332,9 @@ public class ApplicationController {
}
@PostMapping("checkjar")
- public RestResponse checkjar(String jar) {
- File file = new File(jar);
- try {
- Utils.requireCheckJarFile(file.toURI().toURL());
- return RestResponse.success(true);
- } catch (IOException e) {
- return RestResponse.success(file).message(e.getLocalizedMessage());
- }
+ public RestResponse checkjar(String jar) throws IOException {
+ Utils.requireCheckJarFile(new File(jar).toURI().toURL());
+ return RestResponse.success(true);
}
@PostMapping("upload")
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkEnvController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkEnvController.java
index e7c0e51b5..09dcaca4c 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkEnvController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkEnvController.java
@@ -18,7 +18,6 @@
package org.apache.streampark.console.core.controller;
import org.apache.streampark.console.base.domain.RestResponse;
-import org.apache.streampark.console.base.exception.ApiDetailException;
import org.apache.streampark.console.core.entity.FlinkEnv;
import org.apache.streampark.console.core.enums.FlinkEnvCheckEnum;
import org.apache.streampark.console.core.service.FlinkEnvService;
@@ -53,12 +52,8 @@ public class FlinkEnvController {
}
@PostMapping("create")
- public RestResponse create(FlinkEnv version) {
- try {
- flinkEnvService.create(version);
- } catch (Exception e) {
- throw new ApiDetailException(e);
- }
+ public RestResponse create(FlinkEnv version) throws Exception {
+ flinkEnvService.create(version);
return RestResponse.success(true);
}
@@ -76,12 +71,8 @@ public class FlinkEnvController {
}
@PostMapping("update")
- public RestResponse update(FlinkEnv version) throws Exception {
- try {
- flinkEnvService.update(version);
- } catch (Exception e) {
- throw new ApiDetailException(e);
- }
+ public RestResponse update(FlinkEnv version) {
+ flinkEnvService.update(version);
return RestResponse.success(true);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkSqlController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkSqlController.java
index b78e5084b..12dc7df2c 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkSqlController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkSqlController.java
@@ -117,7 +117,7 @@ public class FlinkSqlController {
@PostMapping("history")
@PermissionScope(app = "#app.id", team = "#app.teamId")
- public RestResponse sqlhistory(Application app) {
+ public RestResponse history(Application app) {
List<FlinkSql> sqlList = flinkSqlService.listFlinkSqlHistory(app.getId());
return RestResponse.success(sqlList);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SettingController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SettingController.java
index 2dfc876f7..988f46d62 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SettingController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SettingController.java
@@ -35,6 +35,7 @@ import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
+import java.io.IOException;
import java.util.List;
@Slf4j
@@ -110,12 +111,8 @@ public class SettingController {
}
@PostMapping("check/hadoop")
- public RestResponse checkHadoop() {
- try {
- HadoopUtils.hdfs().getStatus();
- return RestResponse.success(true);
- } catch (Exception e) {
- return RestResponse.success(false).message(e.getMessage());
- }
+ public RestResponse checkHadoop() throws IOException {
+ HadoopUtils.hdfs().getStatus();
+ return RestResponse.success(true);
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java
index 0c20689ff..2f5e28931 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java
@@ -38,10 +38,9 @@ public interface FlinkEnvService extends IService<FlinkEnv> {
* Create a new instance.
*
* @param version The version of FlinkEnv to use.
- * @throws Exception if an error occurs during the creation process.
* @return true if the instance is successfully created, false otherwise.
*/
- boolean create(FlinkEnv version) throws Exception;
+ boolean create(FlinkEnv version);
/**
* Deletes a Flink environment with the provided ID.
@@ -54,9 +53,8 @@ public interface FlinkEnvService extends IService<FlinkEnv> {
* Updates the specified version of Flink environment.
*
* @param version the version of Flink environment to update
- * @throws IOException if an I/O error occurs during the update process
*/
- void update(FlinkEnv version) throws IOException;
+ void update(FlinkEnv version);
/**
* Get flink version by application id.
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 2266310b1..0ce35b2e1 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -574,8 +574,8 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
/**
* Check whether a job with the same name is running in the yarn queue
*
- * @param jobName
- * @return
+ * @param jobName job name
+ * @return true if the job is running, false otherwise
*/
private boolean checkAppRepeatInYarn(String jobName) {
try {
@@ -749,12 +749,7 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
}
if
(FlinkExecutionMode.isKubernetesApplicationMode(application.getExecutionMode()))
{
- try {
- HadoopUtils.yarnClient();
- properties.put(JobManagerOptions.ARCHIVE_DIR.key(),
Workspace.ARCHIVES_FILE_PATH());
- } catch (Exception e) {
- // skip
- }
+ properties.put(JobManagerOptions.ARCHIVE_DIR.key(),
Workspace.ARCHIVES_FILE_PATH());
}
if (application.getAllowNonRestored()) {
@@ -797,6 +792,7 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
yarnClient.killApplication(applications.get(0).getApplicationId());
}
} catch (Exception exception) {
+ log.error("Kill yarn application failed.", exception);
}
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
index 14c708f4f..504b69115 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
@@ -443,59 +443,34 @@ public class ApplicationInfoServiceImpl extends
ServiceImpl<ApplicationMapper, A
@Override
public AppExistsStateEnum checkExists(Application appParam) {
- if (!checkJobName(appParam.getJobName())) {
+ String jobName = appParam.getJobName();
+ Long appParamId = appParam.getId();
+
+ if (StringUtils.isBlank(jobName)
+ || !JOB_NAME_PATTERN.matcher(jobName.trim()).matches()
+ || !SINGLE_SPACE_PATTERN.matcher(jobName.trim()).matches()) {
return AppExistsStateEnum.INVALID;
}
- boolean existsByJobName = this.existsByJobName(appParam.getJobName());
-
- if (appParam.getId() != null) {
- Application app = getById(appParam.getId());
- if (app.getJobName().equals(appParam.getJobName())) {
- return AppExistsStateEnum.NO;
- }
-
- if (existsByJobName) {
- return AppExistsStateEnum.IN_DB;
- }
+ Application application =
+ baseMapper.selectOne(
+ new LambdaQueryWrapper<Application>().eq(Application::getJobName,
jobName));
+ if (application != null && !application.getId().equals(appParamId)) {
+ return AppExistsStateEnum.IN_DB;
+ }
- // has stopped status
- if (FlinkAppStateEnum.isEndState(app.getState())) {
- // check whether jobName exists on yarn
- if (FlinkExecutionMode.isYarnMode(appParam.getExecutionMode())
- && YarnUtils.isContains(appParam.getJobName())) {
- return AppExistsStateEnum.IN_YARN;
- }
- // check whether clusterId, namespace, jobId on kubernetes
- if (appParam.isKubernetesModeJob()
- && k8SFlinkTrackMonitor.checkIsInRemoteCluster(
- flinkK8sWatcherWrapper.toTrackId(appParam))) {
- return AppExistsStateEnum.IN_KUBERNETES;
- }
- }
- } else {
- if (existsByJobName) {
- return AppExistsStateEnum.IN_DB;
- }
+ if (FlinkExecutionMode.isYarnMode(appParam.getExecutionMode())
+ && YarnUtils.isContains(jobName)) {
+ return AppExistsStateEnum.IN_YARN;
+ }
- // check whether jobName exists on yarn
- if (FlinkExecutionMode.isYarnMode(appParam.getExecutionMode())
- && YarnUtils.isContains(appParam.getJobName())) {
- return AppExistsStateEnum.IN_YARN;
- }
- // check whether clusterId, namespace, jobId on kubernetes
- if (appParam.isKubernetesModeJob()
- && k8SFlinkTrackMonitor.checkIsInRemoteCluster(
- flinkK8sWatcherWrapper.toTrackId(appParam))) {
- return AppExistsStateEnum.IN_KUBERNETES;
- }
+ if (appParam.isKubernetesModeJob()
+ && k8SFlinkTrackMonitor.checkIsInRemoteCluster(
+ flinkK8sWatcherWrapper.toTrackId(appParam))) {
+ return AppExistsStateEnum.IN_KUBERNETES;
}
- return AppExistsStateEnum.NO;
- }
- private boolean existsByJobName(String jobName) {
- return baseMapper.exists(
- new LambdaQueryWrapper<Application>().eq(Application::getJobName,
jobName));
+ return AppExistsStateEnum.NO;
}
@Override
@@ -553,12 +528,4 @@ public class ApplicationInfoServiceImpl extends
ServiceImpl<ApplicationMapper, A
return "When custom savepoint is not set, state.savepoints.dir needs to
be set in properties or flink-conf.yaml of application";
}
}
-
- private Boolean checkJobName(String jobName) {
- if (!StringUtils.isBlank(jobName.trim())) {
- return JOB_NAME_PATTERN.matcher(jobName).matches()
- && SINGLE_SPACE_PATTERN.matcher(jobName).matches();
- }
- return false;
- }
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
index 17f41f5a5..6a1d871b9 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
@@ -34,7 +34,6 @@ import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import java.io.File;
-import java.io.IOException;
import java.util.Date;
@Slf4j
@@ -84,7 +83,7 @@ public class FlinkEnvServiceImpl extends
ServiceImpl<FlinkEnvMapper, FlinkEnv>
}
@Override
- public boolean create(FlinkEnv version) throws Exception {
+ public boolean create(FlinkEnv version) {
long count = this.baseMapper.selectCount(null);
version.setIsDefault(count == 0);
version.setCreateTime(new Date());
@@ -106,7 +105,7 @@ public class FlinkEnvServiceImpl extends
ServiceImpl<FlinkEnvMapper, FlinkEnv>
}
@Override
- public void update(FlinkEnv version) throws IOException {
+ public void update(FlinkEnv version) {
FlinkEnv flinkEnv = getById(version.getId());
checkOrElseAlert(flinkEnv);
flinkEnv.setDescription(version.getDescription());
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
index 3599ed8d0..dcc602fc1 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
@@ -48,6 +48,7 @@ import
org.springframework.transaction.annotation.Transactional;
import java.lang.reflect.Method;
import java.util.List;
+import java.util.Optional;
@Slf4j
@Service
@@ -83,14 +84,17 @@ public class FlinkSqlServiceImpl extends
ServiceImpl<FlinkSqlMapper, FlinkSql>
.orderByDesc(FlinkSql::getVersion);
Page<FlinkSql> flinkSqlPage = baseMapper.selectPage(page, queryWrapper);
- if (!flinkSqlPage.getRecords().isEmpty()) {
- FlinkSql flinkSql = flinkSqlPage.getRecords().get(0);
- if (decode) {
- flinkSql.setSql(DeflaterUtils.unzipString(flinkSql.getSql()));
- }
- return flinkSql;
- }
- return null;
+ return Optional.ofNullable(flinkSqlPage.getRecords())
+ .filter(records -> !records.isEmpty())
+ .map(records -> records.get(0))
+ .map(
+ flinkSql -> {
+ if (decode) {
+ flinkSql.setSql(DeflaterUtils.unzipString(flinkSql.getSql()));
+ }
+ return flinkSql;
+ })
+ .orElse(null);
}
@Override
@@ -125,13 +129,11 @@ public class FlinkSqlServiceImpl extends
ServiceImpl<FlinkSqlMapper, FlinkSql>
List<FlinkSql> sqlList = this.baseMapper.selectList(queryWrapper);
FlinkSql effective = getEffective(appId, false);
- if (effective != null && !sqlList.isEmpty()) {
- for (FlinkSql sql : sqlList) {
- if (sql.getId().equals(effective.getId())) {
- sql.setEffective(true);
- break;
- }
- }
+ if (effective != null) {
+ sqlList.stream()
+ .filter(sql -> sql.getId().equals(effective.getId()))
+ .findFirst()
+ .ifPresent(sql -> sql.setEffective(true));
}
return sqlList;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index aebf67033..78b22424e 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -288,16 +288,13 @@ public class ProjectServiceImpl extends
ServiceImpl<ProjectMapper, Project>
@Override
public List<String> listJars(Project project) {
- List<String> jarList = new ArrayList<>(0);
ApiAlertException.throwIfNull(
project.getModule(), "Project module can't be null, please check.");
- File apps = new File(project.getDistHome(), project.getModule());
- for (File file : Objects.requireNonNull(apps.listFiles())) {
- if (file.getName().endsWith(Constant.JAR_SUFFIX)) {
- jarList.add(file.getName());
- }
- }
- return jarList;
+ File projectModuleDir = new File(project.getDistHome(),
project.getModule());
+ return Arrays.stream(Objects.requireNonNull(projectModuleDir.listFiles()))
+ .map(File::getName)
+ .filter(name -> name.endsWith(Constant.JAR_SUFFIX))
+ .collect(Collectors.toList());
}
@Override
@@ -324,11 +321,10 @@ public class ProjectServiceImpl extends
ServiceImpl<ProjectMapper, Project>
return false;
}
}
- LambdaQueryWrapper<Project> queryWrapper =
+ return this.baseMapper.exists(
new LambdaQueryWrapper<Project>()
.eq(Project::getName, project.getName())
- .eq(Project::getTeamId, project.getTeamId());
- return this.baseMapper.selectCount(queryWrapper) > 0;
+ .eq(Project::getTeamId, project.getTeamId()));
}
@Override
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
index c8a23df7b..886af7b77 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
@@ -391,10 +391,9 @@ public class ResourceServiceImpl extends
ServiceImpl<ResourceMapper, Resource>
connectorResource.setFactoryIdentifier(factory.factoryIdentifier());
} catch (Exception e) {
log.error(
- "Failed to set class name or factory identifier for connector
resource. Class name: "
- + factoryClassName
- + ", Factory identifier: "
- + factory.factoryIdentifier(),
+ "Failed to set class name or factory identifier for connector
resource. Class name: {}, Factory identifier: {}",
+ factoryClassName,
+ factory.factoryIdentifier(),
e);
}
@@ -405,7 +404,7 @@ public class ResourceServiceImpl extends
ServiceImpl<ResourceMapper, Resource>
.forEach(x -> requiredOptions.put(x.key(),
getOptionDefaultValue(x)));
connectorResource.setRequiredOptions(requiredOptions);
} catch (Exception e) {
- log.error("Failed to set required options for connector resource.
" + e);
+ log.error("Failed to set required options for connector
resource.", e);
}
try {
@@ -415,14 +414,14 @@ public class ResourceServiceImpl extends
ServiceImpl<ResourceMapper, Resource>
.forEach(x -> optionalOptions.put(x.key(),
getOptionDefaultValue(x)));
connectorResource.setOptionalOptions(optionalOptions);
} catch (Exception e) {
- log.error("Fail to set optional options for connector resource. "
+ e);
+ log.error("Fail to set optional options for connector resource.",
e);
}
return connectorResource;
}
}
return null;
} catch (Exception e) {
- log.error("getConnectorResource failed. " + e);
+ log.error("getConnectorResource failed.", e);
}
return null;
}
@@ -467,8 +466,7 @@ public class ResourceServiceImpl extends
ServiceImpl<ResourceMapper, Resource>
ApiAlertException.throwIfNull(resource, "The resource does not exist.");
ApiAlertException.throwIfTrue(
- isDependByApplications(resource),
- "Sorry, the resource is still in use, cannot be removed.");
+ isDependByApplications(resource), "The resource is still in use,
cannot be removed.");
}
private boolean isDependByApplications(Resource resource) {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java
index 080df5599..6445a331d 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java
@@ -206,7 +206,6 @@ public class SqlCompleteServiceImpl implements
SqlCompleteService {
Character nowChar = word.charAt(loc);
if (!nowStep.containsKey(nowChar)) {
// maybe wrong typing
- breakLoc.loc = loc;
break;
}
nowStep = nowStep.get(nowChar).getNext();
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java
index a5c3662b7..ca290f266 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java
@@ -76,7 +76,7 @@ public class VariableServiceImpl extends
ServiceImpl<VariableMapper, Variable>
ApiAlertException.throwIfTrue(
this.findByVariableCode(variable.getTeamId(),
variable.getVariableCode()) != null,
- "Sorry, the variable code already exists.");
+ "The variable code already exists.");
variable.setCreatorId(serviceHelper.getUserId());
this.save(variable);
@@ -85,7 +85,7 @@ public class VariableServiceImpl extends
ServiceImpl<VariableMapper, Variable>
@Override
public void remove(Variable variable) {
ApiAlertException.throwIfTrue(
- isDependByApplications(variable), "Sorry, the variable is actually
used.");
+ isDependByApplications(variable), "The variable is actually used.");
this.removeById(variable);
}
@@ -119,12 +119,12 @@ public class VariableServiceImpl extends
ServiceImpl<VariableMapper, Variable>
@Override
public void updateVariable(Variable variable) {
// region update variable
- ApiAlertException.throwIfNull(variable.getId(), "Sorry, the variable id
cannot be null.");
+ ApiAlertException.throwIfNull(variable.getId(), "The variable id cannot be
null.");
Variable findVariable = this.baseMapper.selectById(variable.getId());
- ApiAlertException.throwIfNull(findVariable, "Sorry, the variable does not
exist.");
+ ApiAlertException.throwIfNull(findVariable, "The variable does not
exist.");
ApiAlertException.throwIfFalse(
findVariable.getVariableCode().equals(variable.getVariableCode()),
- "Sorry, the variable code cannot be updated.");
+ "The variable code cannot be updated.");
this.baseMapper.updateById(variable);
// endregion
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/cluster/Add.vue
b/streampark-console/streampark-console-webapp/src/views/flink/cluster/Add.vue
index 5966277f2..a17cc09f3 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/cluster/Add.vue
+++
b/streampark-console/streampark-console-webapp/src/views/flink/cluster/Add.vue
@@ -53,7 +53,7 @@
const status = parseInt(res.status);
if (status === 0) {
const resp = await fetchCreateCluster(params);
- if (resp) {
+ if (resp.data.code == 200) {
Swal.fire({
icon: 'success',
title: values.clusterName.concat(
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/FactoryUtil.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/FactoryUtil.java
index 6a34fa492..1878d6d30 100644
---
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/FactoryUtil.java
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/FactoryUtil.java
@@ -100,9 +100,8 @@ public class FactoryUtil {
if (loadResult.hasFailed()) {
if (loadResult.getError() instanceof NoClassDefFoundError) {
LOG.debug(
- "NoClassDefFoundError when loading a "
- + Factory.class
- + ". This is expected when trying to load a format
dependency but load failed.",
+ "NoClassDefFoundError when loading a {}. This is
expected when trying to load a format dependency but load failed.",
+ Factory.class,
loadResult.getError());
// After logging, we just ignore this failure
return;
diff --git
a/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/util/ParameterTool.java
b/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/util/ParameterTool.java
index 0a2bf27ab..f8e79ef89 100644
---
a/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/util/ParameterTool.java
+++
b/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/util/ParameterTool.java
@@ -23,7 +23,6 @@ import java.util.*;
public class ParameterTool {
- private static final long serialVersionUID = 15345346345L;
protected static final String NO_VALUE_KEY = "__NO_VALUE_KEY";