This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 89184ff18 [Improve] Flink Home check improvement (#2641)
89184ff18 is described below
commit 89184ff18a137e9886b1854eb6dd139eddb760fc
Author: benjobs <[email protected]>
AuthorDate: Sun Apr 16 22:18:34 2023 +0800
[Improve] Flink Home check improvement (#2641)
---
.../core/controller/FlinkEnvController.java | 7 ++++---
.../console/core/service/FlinkEnvService.java | 7 +------
.../core/service/impl/FlinkEnvServiceImpl.java | 23 ++++++++++++----------
.../src/api/flink/setting/flinkEnv.ts | 6 +++---
.../src/views/setting/FlinkHome/index.vue | 4 ++--
5 files changed, 23 insertions(+), 24 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkEnvController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkEnvController.java
index b5a1e8fd2..0cde10904 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkEnvController.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkEnvController.java
@@ -100,9 +100,10 @@ public class FlinkEnvController {
return RestResponse.success();
}
- @PostMapping("checkForUpdateOrDelete")
- public RestResponse checkForUpdateOrDelete(FlinkEnv version) {
- flinkEnvService.checkForUpdateOrDelete(version.getId());
+ @Operation(summary = "Check flink environment is valid, else throw exception")
+ @PostMapping("validity")
+ public RestResponse validity(FlinkEnv version) {
+ flinkEnvService.validity(version.getId());
return RestResponse.success(true);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java
index 1c480c0b9..c00f934b9 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java
@@ -92,10 +92,5 @@ public interface FlinkEnvService extends IService<FlinkEnv> {
*/
void syncConf(Long id) throws IOException;
- /**
- * check for update or delete operation
- *
- * @param id
- */
- FlinkEnv checkForUpdateOrDelete(Long id);
+ void validity(Long id);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
index 6e8402939..0c44fe776 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
@@ -90,8 +90,8 @@ public class FlinkEnvServiceImpl extends ServiceImpl<FlinkEnvMapper, FlinkEnv>
@Override
public void delete(Long id) {
- FlinkEnv flinkEnv = checkForUpdateOrDelete(id);
-
+ FlinkEnv flinkEnv = getById(id);
+ checkOrElseAlert(flinkEnv);
Long count = this.baseMapper.selectCount(null);
ApiAlertException.throwIfFalse(
!(count > 1 && flinkEnv.getIsDefault()),
@@ -102,8 +102,8 @@ public class FlinkEnvServiceImpl extends ServiceImpl<FlinkEnvMapper, FlinkEnv>
@Override
public void update(FlinkEnv version) throws IOException {
- FlinkEnv flinkEnv = checkForUpdateOrDelete(version.getId());
-
+ FlinkEnv flinkEnv = getById(version.getId());
+ checkOrElseAlert(flinkEnv);
flinkEnv.setDescription(version.getDescription());
flinkEnv.setFlinkName(version.getFlinkName());
if (!version.getFlinkHome().equals(flinkEnv.getFlinkHome())) {
@@ -140,28 +140,31 @@ public class FlinkEnvServiceImpl extends ServiceImpl<FlinkEnvMapper, FlinkEnv>
}
@Override
- public void syncConf(Long id) throws IOException {
+ public void syncConf(Long id) {
FlinkEnv flinkEnv = getById(id);
flinkEnv.doSetFlinkConf();
updateById(flinkEnv);
}
- public FlinkEnv checkForUpdateOrDelete(Long id) {
+ @Override
+ public void validity(Long id) {
FlinkEnv flinkEnv = getById(id);
+ checkOrElseAlert(flinkEnv);
+ }
+
+ private void checkOrElseAlert(FlinkEnv flinkEnv) {
// 1.check exists
ApiAlertException.throwIfNull(flinkEnv, "The flink home does not exist, please check.");
// 2.check if it is being used by any flink cluster
ApiAlertException.throwIfFalse(
- !flinkClusterService.existsByFlinkEnvId(id),
+ !flinkClusterService.existsByFlinkEnvId(flinkEnv.getId()),
"The flink home is still in use by some flink cluster, please check.");
// 3.check if it is being used by any application
ApiAlertException.throwIfFalse(
- !applicationService.existsJobByFlinkEnvId(id),
+ !applicationService.existsJobByFlinkEnvId(flinkEnv.getId()),
"The flink home is still in use by some application, please check.");
-
- return flinkEnv;
}
}
diff --git a/streampark-console/streampark-console-webapp/src/api/flink/setting/flinkEnv.ts b/streampark-console/streampark-console-webapp/src/api/flink/setting/flinkEnv.ts
index 7d051317c..44c56cfe4 100644
--- a/streampark-console/streampark-console-webapp/src/api/flink/setting/flinkEnv.ts
+++ b/streampark-console/streampark-console-webapp/src/api/flink/setting/flinkEnv.ts
@@ -28,7 +28,7 @@ enum FLINK_API {
SYNC = '/flink/env/sync',
UPDATE = '/flink/env/update',
DEFAULT = '/flink/env/default',
- CHECK_FOR_UPDATE_OR_DELETE = '/flink/env/checkForUpdateOrDelete',
+ VALIDITY = '/flink/env/validity',
}
/**
* flink environment data
@@ -93,9 +93,9 @@ export function fetchCheckEnv(data: {
* @param {String} id
* @returns {Promise<Boolean>}
*/
-export function fetchCheckForUpdateOrDelete(id: string): Promise<AxiosResponse<Result<boolean>>> {
+export function fetchValidity(id: string): Promise<AxiosResponse<Result<boolean>>> {
return defHttp.post({
- url: FLINK_API.CHECK_FOR_UPDATE_OR_DELETE,
+ url: FLINK_API.VALIDITY,
data: { id },
}, { isReturnNativeResponse: true });
}
diff --git a/streampark-console/streampark-console-webapp/src/views/setting/FlinkHome/index.vue b/streampark-console/streampark-console-webapp/src/views/setting/FlinkHome/index.vue
index 7cbf02a94..04f33fe34 100644
--- a/streampark-console/streampark-console-webapp/src/views/setting/FlinkHome/index.vue
+++ b/streampark-console/streampark-console-webapp/src/views/setting/FlinkHome/index.vue
@@ -42,7 +42,7 @@
} from '@ant-design/icons-vue';
import { FlinkEnvModal, FlinkEnvDrawer } from './components';
import {
- fetchCheckForUpdateOrDelete,
+ fetchValidity,
fetchDefaultSet,
fetchFlinkEnv,
fetchFlinkEnvRemove,
@@ -65,7 +65,7 @@
const [registerFlinkDraw, { openDrawer: openEnvDrawer }] = useDrawer();
/* Edit button */
async function handleEditFlink(item: FlinkEnv) {
- const resp = await fetchCheckForUpdateOrDelete(item.id);
+ const resp = await fetchValidity(item.id);
if (resp.data.code == 200) {
versionId.value = item.id;
openFlinkModal(true, {