This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
new 7ea275f78 [Improve] flink job on yarn check exists improvement
7ea275f78 is described below
commit 7ea275f7880295e1dcc37ac1f875ff80422f1c05
Author: benjobs <[email protected]>
AuthorDate: Fri Dec 22 15:19:36 2023 +0800
[Improve] flink job on yarn check exists improvement
---
.../core/controller/ApplicationController.java | 8 +++++
.../console/core/service/ApplicationService.java | 2 ++
.../core/service/impl/ApplicationServiceImpl.java | 14 +++++++++
.../src/api/flink/app/app.ts | 5 +++
.../src/enums/flinkEnum.ts | 8 +++++
.../src/views/flink/app/hooks/useApp.tsx | 36 ++++++++++++++++++++--
6 files changed, 70 insertions(+), 3 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
index 12486ef8a..61489b771 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
@@ -246,6 +246,14 @@ public class ApplicationController {
}
}
+ /**
+ * Resolves, ahead of a start request, the existence state of the application with the given id.
+ *
+ * @param id id of the application to check
+ * @return a successful response carrying the value of the resolved {@link AppExistsState}
+ */
+ // The permission expression has to reference a parameter of this method. The only parameter
+ // here is "id"; there is no "app" argument for "#app.id" to be resolved against.
+ @PermissionAction(id = "#id", type = PermissionType.APP)
+ @PostMapping(value = "check_start")
+ @RequiresPermissions("app:start")
+ public RestResponse checkStart(Long id) {
+ AppExistsState stateEnum = applicationService.checkStart(id);
+ return RestResponse.success(stateEnum.get());
+ }
+
@Operation(
summary = "Cancel application",
tags = {ApiDocConstant.FLINK_APP_OP_TAG})
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
index 1960540a1..8fb516b79 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
@@ -120,4 +120,6 @@ public interface ApplicationService extends
IService<Application> {
List<String> historyUploadJars();
String k8sStartLog(Long id, Integer offset, Integer limit) throws Exception;
+ /** Resolves the existence state of the application with the given id ahead of starting it. */
+ AppExistsState checkStart(Long id);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 2ed6346dc..162ed4896 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -638,6 +638,20 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
}
}
+ /**
+ * Resolves the existence state of an application ahead of starting it.
+ *
+ * @param id id of the application to look up
+ * @return {@code INVALID} when no application has that id; {@code IN_YARN} when the application
+ * runs in a YARN execution mode and at least one application report is found for its job
+ * name; {@code NO} in every other case
+ */
+ @Override
+ public AppExistsState checkStart(Long id) {
+ Application app = getById(id);
+ if (app == null) {
+ return AppExistsState.INVALID;
+ }
+ if (!ExecutionMode.isYarnMode(app.getExecutionMode())) {
+ // TODO: add the equivalent existence check for Kubernetes.
+ return AppExistsState.NO;
+ }
+ return getApplicationReports(app.getJobName()).isEmpty()
+ ? AppExistsState.NO
+ : AppExistsState.IN_YARN;
+ }
+
@Override
public String getYarnName(Application appParam) {
String[] args = new String[2];
diff --git
a/streampark-console/streampark-console-webapp/src/api/flink/app/app.ts
b/streampark-console/streampark-console-webapp/src/api/flink/app/app.ts
index 93f49226e..d094fdd23 100644
--- a/streampark-console/streampark-console-webapp/src/api/flink/app/app.ts
+++ b/streampark-console/streampark-console-webapp/src/api/flink/app/app.ts
@@ -40,6 +40,7 @@ enum APP_API {
DELETE = '/flink/app/delete',
DELETE_BAK = '/flink/app/deletebak',
CREATE = '/flink/app/create',
+ CHECK_START = '/flink/app/check_start',
START = '/flink/app/start',
CLEAN = '/flink/app/clean',
BACKUPS = '/flink/app/backups',
@@ -174,6 +175,10 @@ export function fetchForcedStop(data: { id: string }):
Promise<boolean> {
return defHttp.post({ url: APP_API.FORCED_STOP, data });
}
+/**
+ * Asks the backend, before a start request, whether the application already exists.
+ *
+ * The request is sent with `isReturnNativeResponse: true`, so the promise resolves to the
+ * raw HTTP response rather than an unwrapped payload.
+ * NOTE(review): `fetchStart`, sent with the same option, types its body as `Result`; confirm
+ * that the body here really is a bare number and not a wrapped result.
+ *
+ * @param data - request payload carrying the id of the application to check
+ * @returns the native response of the `check_start` endpoint
+ */
+export function fetchCheckStart(data: { id: string }): Promise<AxiosResponse<number>> {
+ return defHttp.post({ url: APP_API.CHECK_START, data }, { isReturnNativeResponse: true });
+}
+
export function fetchStart(data): Promise<AxiosResponse<Result>> {
return defHttp.post({ url: APP_API.START, data }, { isReturnNativeResponse:
true });
}
diff --git
a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
index c2e8a2bee..e1194e2b4 100644
--- a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
+++ b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
@@ -211,3 +211,11 @@ export enum FailoverStrategyEnum {
ALERT = 1,
RESTART = 2,
}
+/** Existence states compared against the fetchCheckStart result; presumably mirrors the backend AppExistsState codes — TODO confirm. */
+export enum AppExistsEnum {
+ NO = 0,
+ IN_DB = 1,
+ IN_YARN = 2,
+ IN_KUBERNETES = 3,
+ INVALID = 4,
+}
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
index 00619db53..9a0247897 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
@@ -17,12 +17,18 @@
import { Alert, Form, Input, Tag } from 'ant-design-vue';
import { h, onMounted, reactive, ref, unref, VNode } from 'vue';
import { handleAppBuildStatueText } from '../utils';
-import { fetchCheckName, fetchCopy, fetchForcedStop, fetchMapping } from
'/@/api/flink/app/app';
+import {
+ fetchCheckName,
+ fetchCopy,
+ fetchCheckStart,
+ fetchForcedStop,
+ fetchMapping,
+} from '/@/api/flink/app/app';
import { fetchBuild, fetchBuildDetail } from '/@/api/flink/app/flinkBuild';
import { fetchSavePonitHistory } from '/@/api/flink/app/savepoint';
import { fetchAppOwners } from '/@/api/system/user';
import { SvgIcon } from '/@/components/Icon';
-import { AppStateEnum, ExecModeEnum, OptionStateEnum } from
'/@/enums/flinkEnum';
+import { AppExistsEnum, AppStateEnum, ExecModeEnum, OptionStateEnum } from
'/@/enums/flinkEnum';
import { useI18n } from '/@/hooks/web/useI18n';
import { useMessage } from '/@/hooks/web/useMessage';
@@ -79,7 +85,31 @@ export const useFlinkApplication = (openStartModal: Fn) => {
}
/* start application */
- function handleAppCheckStart(app: Recordable) {
+ async function handleAppCheckStart(app: Recordable) {
+ // when then app is building, show forced starting modal
+ const resp = await fetchCheckStart({
+ id: app.id,
+ });
+ if (resp.data === AppExistsEnum.IN_YARN) {
+ Swal.fire({
+ title: 'Are you sure?',
+ text: `current job already exists on yarn, are you sure resubmit the
job and managed by StreamPark?`,
+ icon: 'warning',
+ showCancelButton: true,
+ confirmButtonText: 'Yes, resubmit',
+ denyButtonText: `No, close`,
+ confirmButtonColor: '#d33',
+ cancelButtonColor: '#3085d6',
+ }).then(async (result) => {
+ if (result.isConfirmed) {
+ await fetchForcedStop({
+ id: app.id,
+ });
+ await handleStart(app);
+ return Promise.resolve(true);
+ }
+ });
+ }
// when then app is building, show forced starting modal
if (app['appControl']['allowStart'] === false) {
handleFetchBuildDetail(app);