This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 6b00a0cee [Refactor] Remove LogClientService and minor code refactor (#2979)
6b00a0cee is described below
commit 6b00a0cee96a879f4ccff4dc0168b8f4f004619b
Author: gongzhongqiang <[email protected]>
AuthorDate: Sat Aug 26 15:49:34 2023 +0800
[Refactor] Remove LogClientService and minor code refactor (#2979)
* [Refactor] Remove LogClientService and minor code refactor
---
README.md | 2 +-
.../streampark/console/base/util/FileUtils.java | 51 +++++++++++---
.../console/core/mapper/ApplicationMapper.java | 12 +---
.../console/core/service/ApplicationService.java | 10 +--
.../console/core/service/LogClientService.java | 52 ---------------
.../core/service/impl/ApplicationServiceImpl.java | 77 +++++++++++-----------
.../core/service/impl/FlinkClusterServiceImpl.java | 4 +-
.../core/service/impl/FlinkEnvServiceImpl.java | 2 +-
.../core/service/impl/SettingServiceImpl.java | 22 +++----
.../console/core/task/FlinkClusterWatcher.java | 4 +-
.../resources/mapper/core/ApplicationMapper.xml | 50 +-------------
11 files changed, 102 insertions(+), 184 deletions(-)
diff --git a/README.md b/README.md
index 1ddf15e6c..13f66b271 100644
--- a/README.md
+++ b/README.md
@@ -53,7 +53,7 @@
* Apache Flink & Spark stream processing application development framework
* Out-of-the-box connectors
* Support multiple versions of Flink & Spark
-* Scala 2.11 / 2.12 support
+* Scala 2.12 support
* One-stop stream processing management platform
* Support catalog、olap、streaming-warehouse etc.
* ...
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/FileUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/FileUtils.java
index 7bca56d27..4aaa624f2 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/FileUtils.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/FileUtils.java
@@ -17,9 +17,15 @@
package org.apache.streampark.console.base.util;
+import org.apache.streampark.console.base.exception.ApiDetailException;
+
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
/** The file utils. */
public class FileUtils {
@@ -27,12 +33,12 @@ public class FileUtils {
private FileUtils() {}
/**
- * Read the end of the file.
+ * Reads the last portion of a file as a byte array.
*
- * @param file The file
- * @param maxSize Maximum size of read file
- * @return The file content
- * @throws IOException
+ * @param file the file to read
+ * @param maxSize the maximum number of bytes to read from the end of the file
+ * @return the byte array containing the content read from the file
+ * @throws IOException if an I/O error occurs
*/
public static byte[] readEndOfFile(File file, long maxSize) throws IOException {
long readSize = maxSize;
@@ -50,12 +56,14 @@ public class FileUtils {
}
/**
- * Read the end of the file.
+ * Read the content of a file from a specified offset.
*
- * @param file The file
- * @param maxSize Maximum size of read file
- * @return The file content
- * @throws IOException
+ * @param file The file to read from
+ * @param startOffset The offset from where to start reading the file
+ * @param maxSize The maximum size of the file to read
+ * @return The content of the file as a byte array
+ * @throws IOException if an I/O error occurs while reading the file
+ * @throws IllegalArgumentException if the startOffset is greater than the file length
*/
public static byte[] readFileFromOffset(File file, long startOffset, long maxSize)
throws IOException {
@@ -73,4 +81,27 @@ public class FileUtils {
}
return fileContent;
}
+
+ /**
+ * Roll View Log.
+ *
+ * @param path The file path.
+ * @param offset The offset.
+ * @param limit The limit.
+ * @return The content of the file.
+ * @throws ApiDetailException if there's an error rolling the view log.
+ */
+ public static String rollViewLog(String path, int offset, int limit) {
+ try {
+ File file = new File(path);
+ if (file.exists() && file.isFile()) {
+ try (Stream<String> stream = Files.lines(Paths.get(path))) {
+ return stream.skip(offset).limit(limit).collect(Collectors.joining("\r\n"));
+ }
+ }
+ return null;
+ } catch (Exception e) {
+ throw new ApiDetailException("roll view log error: " + e);
+ }
+ }
}
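
For context, a minimal caller sketch of the new static helper (illustrative only, not part of the commit; the log path and the 0/100 line window are made-up values, while the rollViewLog signature comes from the hunk above):

    import org.apache.streampark.console.base.util.FileUtils;

    public class RollViewLogExample {
      public static void main(String[] args) {
        // Reads up to 100 lines starting at line 0 of an assumed log file via the new
        // static helper, which replaces the Spring-managed LogClientService bean.
        String tail = FileUtils.rollViewLog("/tmp/streampark/job-123.log", 0, 100);
        System.out.println(tail == null ? "log file not found" : tail);
      }
    }
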
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
index 033f7f106..3744efa84 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
@@ -54,20 +54,10 @@ public interface ApplicationMapper extends BaseMapper<Application> {
void resetOptionState();
- Boolean existsByTeamId(@Param("teamId") Long teamId);
-
- Boolean existsByJobName(@Param("jobName") String jobName);
-
- Boolean existsByUserId(@Param("userId") Long userId);
-
List<Application> getByProjectId(@Param("projectId") Long id);
boolean existsRunningJobByClusterId(@Param("clusterId") Long clusterId);
- boolean existsJobByClusterId(@Param("clusterId") Long clusterId);
-
- Integer countJobsByClusterId(@Param("clusterId") Long clusterId);
-
- Integer countAffectedJobsByClusterId(
+ Integer countAffectedByClusterId(
@Param("clusterId") Long clusterId, @Param("dbType") String dbType);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
index 97f9b0bcc..a9c3ddbf1 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
@@ -98,15 +98,15 @@ public interface ApplicationService extends IService<Application> {
void forcedStop(Application app);
- boolean existsRunningJobByClusterId(Long clusterId);
+ boolean existsRunningByClusterId(Long clusterId);
- boolean existsJobByClusterId(Long clusterId);
+ boolean existsByClusterId(Long clusterId);
- Integer countJobsByClusterId(Long clusterId);
+ Integer countByClusterId(Long clusterId);
- Integer countAffectedJobsByClusterId(Long clusterId, String dbType);
+ Integer countAffectedByClusterId(Long clusterId, String dbType);
- boolean existsJobByFlinkEnvId(Long flinkEnvId);
+ boolean existsByFlinkEnvId(Long flinkEnvId);
List<String> getRecentK8sNamespace();
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/LogClientService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/LogClientService.java
deleted file mode 100644
index e60a5115c..000000000
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/LogClientService.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.console.core.service;
-
-import org.apache.streampark.console.base.exception.ApiDetailException;
-
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
-
-import java.io.File;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/** log client */
-@Slf4j
-@Component
-public class LogClientService {
- public String rollViewLog(String path, int offset, int limit) {
- try {
- File file = new File(path);
- if (file.exists() && file.isFile()) {
- try (Stream<String> stream = Files.lines(Paths.get(path))) {
- List<String> lines = stream.skip(offset).limit(limit).collect(Collectors.toList());
- StringBuilder builder = new StringBuilder();
- lines.forEach(line -> builder.append(line).append("\r\n"));
- return builder.toString();
- }
- }
- return null;
- } catch (Exception e) {
- throw new ApiDetailException("roll view log error: " + e);
- }
- }
-}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index ded9b40a8..a94879640 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -74,7 +74,6 @@ import org.apache.streampark.console.core.service.EffectiveService;
import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.FlinkEnvService;
import org.apache.streampark.console.core.service.FlinkSqlService;
-import org.apache.streampark.console.core.service.LogClientService;
import org.apache.streampark.console.core.service.ProjectService;
import org.apache.streampark.console.core.service.ResourceService;
import org.apache.streampark.console.core.service.SavePointService;
@@ -214,8 +213,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
@Autowired private VariableService variableService;
- @Autowired private LogClientService logClient;
-
@Autowired private YarnQueueService yarnQueueService;
@Autowired private FlinkClusterWatcher flinkClusterWatcher;
@@ -504,49 +501,49 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
@Override
public boolean existsByTeamId(Long teamId) {
- return baseMapper.existsByTeamId(teamId);
+ return baseMapper.exists(
+ new LambdaQueryWrapper<Application>().eq(Application::getTeamId, teamId));
}
@Override
public boolean existsByUserId(Long userId) {
- return baseMapper.existsByUserId(userId);
+ return baseMapper.exists(
+ new LambdaQueryWrapper<Application>().eq(Application::getUserId, userId));
}
@Override
- public boolean existsRunningJobByClusterId(Long clusterId) {
- boolean exists = baseMapper.existsRunningJobByClusterId(clusterId);
- if (exists) {
- return true;
- }
- for (Application application : FlinkHttpWatcher.getWatchingApps()) {
- if (clusterId.equals(application.getFlinkClusterId())
- && FlinkAppState.RUNNING.equals(application.getFlinkAppStateEnum())) {
- return true;
- }
- }
- return false;
+ public boolean existsRunningByClusterId(Long clusterId) {
+ return baseMapper.existsRunningJobByClusterId(clusterId)
+ || FlinkHttpWatcher.getWatchingApps().stream()
+ .anyMatch(
+ application ->
+ clusterId.equals(application.getFlinkClusterId())
+ && FlinkAppState.RUNNING.equals(application.getFlinkAppStateEnum()));
}
@Override
- public boolean existsJobByClusterId(Long clusterId) {
- return baseMapper.existsJobByClusterId(clusterId);
+ public boolean existsByClusterId(Long clusterId) {
+ return baseMapper.exists(
+ new LambdaQueryWrapper<Application>().eq(Application::getFlinkClusterId, clusterId));
}
@Override
- public Integer countJobsByClusterId(Long clusterId) {
- return baseMapper.countJobsByClusterId(clusterId);
+ public Integer countByClusterId(Long clusterId) {
+ return baseMapper
+ .selectCount(
+ new LambdaQueryWrapper<Application>().eq(Application::getFlinkClusterId, clusterId))
+ .intValue();
}
@Override
- public Integer countAffectedJobsByClusterId(Long clusterId, String dbType) {
- return baseMapper.countAffectedJobsByClusterId(clusterId, dbType);
+ public Integer countAffectedByClusterId(Long clusterId, String dbType) {
+ return baseMapper.countAffectedByClusterId(clusterId, dbType);
}
@Override
- public boolean existsJobByFlinkEnvId(Long flinkEnvId) {
- LambdaQueryWrapper<Application> lambdaQueryWrapper =
- new LambdaQueryWrapper<Application>().eq(Application::getVersionId, flinkEnvId);
- return getBaseMapper().exists(lambdaQueryWrapper);
+ public boolean existsByFlinkEnvId(Long flinkEnvId) {
+ return baseMapper.exists(
+ new LambdaQueryWrapper<Application>().eq(Application::getVersionId, flinkEnvId));
}
@Override
@@ -623,7 +620,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
future.cancel(true);
}
if (path != null) {
- return logClient.rollViewLog(path, offset, limit);
+ return org.apache.streampark.console.base.util.FileUtils.rollViewLog(
+ path, offset, limit);
}
return null;
})
@@ -652,7 +650,13 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
return ParameterCli.read(args);
}
- /** Check if the current jobName and other key identifiers already exist in db and yarn/k8s */
+ /**
+ * Check if the current jobName and other key identifiers already exist in the database and
+ * yarn/k8s.
+ *
+ * @param appParam The application to check for existence.
+ * @return The state of the application's existence.
+ */
@Override
public AppExistsState checkExists(Application appParam) {
@@ -672,14 +676,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
return AppExistsState.IN_DB;
}
- FlinkAppState state = FlinkAppState.of(app.getState());
// has stopped status
- if (state.equals(FlinkAppState.ADDED)
- || state.equals(FlinkAppState.CREATED)
- || state.equals(FlinkAppState.FAILED)
- || state.equals(FlinkAppState.CANCELED)
- || state.equals(FlinkAppState.LOST)
- || state.equals(FlinkAppState.KILLED)) {
+ if (FlinkAppState.isEndState(app.getState())) {
// check whether jobName exists on yarn
if (ExecutionMode.isYarnMode(appParam.getExecutionMode())
&& YarnUtils.isContains(appParam.getJobName())) {
@@ -758,7 +756,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
}
private boolean existsByJobName(String jobName) {
- return this.baseMapper.existsByJobName(jobName);
+ return baseMapper.exists(
+ new LambdaQueryWrapper<Application>().eq(Application::getJobName, jobName));
}
@SuppressWarnings("checkstyle:WhitespaceAround")
@@ -879,7 +878,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
}
}
- // 2) k8s podTemplate changed..
+ // 2) k8s podTemplate changed.
if (application.getBuild() && ExecutionMode.isKubernetesMode(appParam.getExecutionMode())) {
if (ObjectUtils.trimNoEquals(
application.getK8sRestExposedType(), appParam.getK8sRestExposedType())
@@ -1386,7 +1385,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
"This state.savepoints.dir value "
+ savepointPath
+ " path part to store the checkpoint data in is null. Please
specify a directory path for the checkpoint data.";
- } else if (pathPart.length() == 0 || "/".equals(pathPart)) {
+ } else if (pathPart.isEmpty() || "/".equals(pathPart)) {
error =
"This state.savepoints.dir value "
+ savepointPath
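
For context, a sketch of the MyBatis-Plus wrapper pattern this file now standardizes on, mirroring the hunks above (illustrative fragment, not part of the commit; baseMapper is the BaseMapper<Application> provided by ServiceImpl):

    // exists(...) replaces the hand-written existsBy* <select> statements that are
    // removed from ApplicationMapper.xml further down in this patch.
    boolean existsByTeamId(Long teamId) {
      return baseMapper.exists(
          new LambdaQueryWrapper<Application>().eq(Application::getTeamId, teamId));
    }

    // selectCount(...) returns a Long, hence the intValue() conversion kept in the diff.
    Integer countByClusterId(Long clusterId) {
      return baseMapper
          .selectCount(
              new LambdaQueryWrapper<Application>().eq(Application::getFlinkClusterId, clusterId))
          .intValue();
    }
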
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index f5780124f..3f6f3dd55 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -255,7 +255,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
checkActiveIfNeeded(flinkCluster);
// 3) check job if running on cluster
- boolean existsRunningJob = applicationService.existsRunningJobByClusterId(flinkCluster.getId());
+ boolean existsRunningJob = applicationService.existsRunningByClusterId(flinkCluster.getId());
ApiAlertException.throwIfTrue(
existsRunningJob, "Some app is running on this cluster, the cluster
cannot be shutdown");
@@ -330,7 +330,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
}
ApiAlertException.throwIfTrue(
- applicationService.existsJobByClusterId(id),
+ applicationService.existsByClusterId(id),
"Some app on this cluster, the cluster cannot be delete, please
check.");
removeById(id);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
index 2e638c7d3..bdba7300c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
@@ -164,7 +164,7 @@ public class FlinkEnvServiceImpl extends ServiceImpl<FlinkEnvMapper, FlinkEnv>
// 3.check if it is being used by any application
ApiAlertException.throwIfTrue(
- applicationService.existsJobByFlinkEnvId(flinkEnv.getId()),
+ applicationService.existsByFlinkEnvId(flinkEnv.getId()),
"The flink home is still in use by some application, please check.");
}
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SettingServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SettingServiceImpl.java
index a2c47f3e4..bba0b91e3 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SettingServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SettingServiceImpl.java
@@ -28,20 +28,26 @@ import org.apache.commons.lang3.StringUtils;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.context.ApplicationListener;
-import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
-import java.util.List;
+import javax.annotation.PostConstruct;
+
import java.util.Optional;
@Slf4j
@Service
@Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class)
public class SettingServiceImpl extends ServiceImpl<SettingMapper, Setting>
- implements SettingService, ApplicationListener<ContextRefreshedEvent> {
+ implements SettingService {
+
+ private final Setting emptySetting = new Setting();
+
+ @PostConstruct
+ public void loadSettings() {
+ list().forEach(x -> SETTINGS.put(x.getSettingKey(), x));
+ }
@Override
public Setting get(String key) {
@@ -50,14 +56,6 @@ public class SettingServiceImpl extends ServiceImpl<SettingMapper, Setting>
return this.getOne(queryWrapper);
}
- private final Setting emptySetting = new Setting();
-
- @Override
- public void onApplicationEvent(ContextRefreshedEvent event) {
- List<Setting> settingList = super.list();
- settingList.forEach(x -> SETTINGS.put(x.getSettingKey(), x));
- }
-
@Override
public boolean update(Setting setting) {
try {
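
For context, a minimal illustration (not StreamPark code) of the lifecycle change above: a @PostConstruct method runs once after the bean's dependencies are injected, so the settings cache can be primed without registering an ApplicationListener<ContextRefreshedEvent>:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    import javax.annotation.PostConstruct;

    import org.springframework.stereotype.Service;

    @Service
    class SettingsCacheExample {
      // Stand-in for the static SETTINGS map used by SettingServiceImpl.
      private final Map<String, String> cache = new ConcurrentHashMap<>();

      @PostConstruct
      void loadSettings() {
        // Stand-in for list().forEach(x -> SETTINGS.put(x.getSettingKey(), x)).
        cache.put("exampleKey", "exampleValue");
      }
    }

Note that @PostConstruct fires per bean right after injection, earlier in startup than ContextRefreshedEvent, which fires once the whole application context has been refreshed.
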
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
index 936a38111..090e75e75 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
@@ -132,9 +132,9 @@ public class FlinkClusterWatcher {
private void alert(FlinkCluster cluster, ClusterState state) {
if (cluster.getAlertId() != null) {
- cluster.setAllJobs(applicationService.countJobsByClusterId(cluster.getId()));
+ cluster.setAllJobs(applicationService.countByClusterId(cluster.getId()));
cluster.setAffectedJobs(
- applicationService.countAffectedJobsByClusterId(
+ applicationService.countAffectedByClusterId(
cluster.getId(),
InternalConfigHolder.get(CommonConfig.SPRING_PROFILES_ACTIVE())));
cluster.setClusterState(state.getValue());
cluster.setEndTime(new Date());
diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
index d8fa3f7dc..1e4cdb585 100644
--- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
+++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
@@ -82,46 +82,6 @@
set option_state = 0
</update>
- <select id="existsByTeamId" resultType="java.lang.Boolean"
parameterType="java.lang.Long">
- select
- CASE
- WHEN count(1) > 0 THEN true ELSE false
- END
- from t_flink_app
- where team_id = #{teamId}
- limit 1
- </select>
-
- <select id="existsByJobName" resultType="java.lang.Boolean"
parameterType="java.lang.String">
- select
- CASE
- WHEN count(1) > 0 THEN true ELSE false
- END
- from t_flink_app
- where job_name = #{jobName}
- limit 1
- </select>
-
- <select id="existsByUserId" resultType="java.lang.Boolean"
parameterType="java.lang.Long">
- select
- CASE
- WHEN count(1) > 0 THEN true ELSE false
- END
- from t_flink_app
- where user_id = #{userId}
- limit 1
- </select>
-
- <select id="existsJobByClusterId" resultType="java.lang.Boolean"
parameterType="java.lang.Long">
- SELECT
- CASE
- WHEN count(1) > 0 THEN true ELSE false
- END
- FROM t_flink_app
- WHERE flink_cluster_id = #{clusterId}
- limit 1
- </select>
-
<select id="existsRunningJobByClusterId" resultType="java.lang.Boolean"
parameterType="java.lang.Long">
select
CASE
@@ -133,15 +93,7 @@
limit 1
</select>
- <select id="countJobsByClusterId" resultType="java.lang.Integer"
parameterType="java.lang.Long">
- select
- count(1)
- from t_flink_app
- where flink_cluster_id = #{clusterId}
- limit 1
- </select>
-
- <select id="countAffectedJobsByClusterId" resultType="java.lang.Integer"
parameterType="java.lang.Long">
+ <select id="countAffectedByClusterId" resultType="java.lang.Integer"
parameterType="java.lang.Long">
select
count(1)
from t_flink_app