This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch dev-2.1.3 in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit 4d7db2c3ab64521a35ae95a46468266a3b425a65 Author: benjobs <[email protected]> AuthorDate: Sat Dec 23 09:20:06 2023 +0800 [Improve] flink job on yarn exists check improvement --- .../core/service/impl/ApplicationServiceImpl.java | 70 +++++++++++----------- .../components/AppView/StartApplicationModal.vue | 18 +++++- .../src/views/flink/app/hooks/useApp.tsx | 36 +---------- 3 files changed, 53 insertions(+), 71 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index 162ed4896..9e2c679e7 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -1325,11 +1325,17 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli cancelFuture.whenComplete( (cancelResponse, throwable) -> { cancelFutureMap.remove(application.getId()); + if (throwable != null) { + String exception = Utils.stringifyException(throwable); + applicationLog.setException(exception); + applicationLog.setSuccess(false); + applicationLogService.save(applicationLog); + if (throwable instanceof CancellationException) { doStopped(application); } else { - log.error("stop flink job fail.", throwable); + log.error("stop flink job failed.", throwable); application.setOptionState(OptionState.NONE.getValue()); application.setState(FlinkAppState.FAILED.getValue()); updateById(application); @@ -1345,31 +1351,30 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli } else { FlinkRESTAPIWatcher.unWatching(application.getId()); } - - String exception = Utils.stringifyException(throwable); - applicationLog.setException(exception); - applicationLog.setSuccess(false); - } - } else { - applicationLog.setSuccess(true); - if (cancelResponse != null && cancelResponse.savePointDir() != null) { - String savePointDir = cancelResponse.savePointDir(); - log.info("savePoint path: {}", savePointDir); - SavePoint savePoint = new SavePoint(); - savePoint.setPath(savePointDir); - savePoint.setAppId(application.getId()); - savePoint.setLatest(true); - savePoint.setType(CheckPointType.SAVEPOINT.get()); - savePoint.setCreateTime(new Date()); - savePoint.setTriggerTime(triggerTime); - savePointService.save(savePoint); - } - if (isKubernetesApp(application)) { - k8SFlinkTrackMonitor.unWatching(toTrackId(application)); } + return; } + + applicationLog.setSuccess(true); // save log... applicationLogService.save(applicationLog); + + if (cancelResponse != null && cancelResponse.savePointDir() != null) { + String savePointDir = cancelResponse.savePointDir(); + log.info("savePoint path: {}", savePointDir); + SavePoint savePoint = new SavePoint(); + savePoint.setPath(savePointDir); + savePoint.setAppId(application.getId()); + savePoint.setLatest(true); + savePoint.setType(CheckPointType.SAVEPOINT.get()); + savePoint.setCreateTime(new Date()); + savePoint.setTriggerTime(triggerTime); + savePointService.save(savePoint); + } + + if (isKubernetesApp(application)) { + k8SFlinkTrackMonitor.unWatching(toTrackId(application)); + } }); } @@ -1596,9 +1601,12 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli // 2) exception if (throwable != null) { + String exception = Utils.stringifyException(throwable); + applicationLog.setException(exception); + applicationLog.setSuccess(false); + applicationLogService.save(applicationLog); if (throwable instanceof CancellationException) { doStopped(application); - return; } else { Application app = getById(appParam.getId()); app.setState(FlinkAppState.FAILED.getValue()); @@ -1610,12 +1618,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli FlinkRESTAPIWatcher.unWatching(appParam.getId()); } } - String exception = Utils.stringifyException(throwable); - applicationLog.setException(exception); - applicationLog.setSuccess(false); - applicationLogService.save(applicationLog); - // set savepoint to expire - savePointService.expire(application.getId()); return; } @@ -1736,8 +1738,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli return properties; } - private void doStopped(Application app) { - Application application = getById(app); + private void doStopped(Application appParam) { + Application application = getById(appParam); application.setOptionState(OptionState.NONE.getValue()); application.setState(FlinkAppState.CANCELED.getValue()); application.setOptionTime(new Date()); @@ -1752,7 +1754,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli FlinkRESTAPIWatcher.unWatching(application.getId()); } // kill application - if (ExecutionMode.isYarnMode(app.getExecutionModeEnum())) { + if (ExecutionMode.isYarnMode(application.getExecutionModeEnum())) { try { List<ApplicationReport> applications = getApplicationReports(application.getJobName()); if (!applications.isEmpty()) { @@ -1851,10 +1853,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli YarnApplicationState.RUNNING); Set<String> yarnTag = Sets.newHashSet("streampark"); List<ApplicationReport> applications = yarnClient.getApplications(types, states, yarnTag); - // Compatible with historical versions. - if (applications.isEmpty()) { - applications = yarnClient.getApplications(types, states); - } return applications.stream() .filter(report -> report.getName().equals(jobName)) .collect(Collectors.toList()); diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue index ad2db10d4..205b6173f 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue @@ -31,7 +31,8 @@ import { BasicModal, useModalInner } from '/@/components/Modal'; import { useMessage } from '/@/hooks/web/useMessage'; import { useRouter } from 'vue-router'; - import { fetchStart } from '/@/api/flink/app/app'; + import { fetchCheckStart, fetchForcedStop, fetchStart } from '/@/api/flink/app/app'; + import { AppExistsEnum } from '/@/enums/flinkEnum'; const SelectOption = Select.Option; @@ -110,8 +111,21 @@ baseColProps: { span: 24 }, }); - /* submit */ async function handleSubmit() { + // when then app is building, show forced starting modal + const resp = await fetchCheckStart({ + id: receiveData.application.id, + }); + if (resp.data.data === AppExistsEnum.IN_YARN) { + await fetchForcedStop({ + id: receiveData.application.id, + }); + } + await handleDoSubmit(); + } + + /* submit */ + async function handleDoSubmit() { try { const formValue = (await validate()) as Recordable; const savePointed = formValue.startSavePointed; diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx index 9a0247897..00619db53 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx @@ -17,18 +17,12 @@ import { Alert, Form, Input, Tag } from 'ant-design-vue'; import { h, onMounted, reactive, ref, unref, VNode } from 'vue'; import { handleAppBuildStatueText } from '../utils'; -import { - fetchCheckName, - fetchCopy, - fetchCheckStart, - fetchForcedStop, - fetchMapping, -} from '/@/api/flink/app/app'; +import { fetchCheckName, fetchCopy, fetchForcedStop, fetchMapping } from '/@/api/flink/app/app'; import { fetchBuild, fetchBuildDetail } from '/@/api/flink/app/flinkBuild'; import { fetchSavePonitHistory } from '/@/api/flink/app/savepoint'; import { fetchAppOwners } from '/@/api/system/user'; import { SvgIcon } from '/@/components/Icon'; -import { AppExistsEnum, AppStateEnum, ExecModeEnum, OptionStateEnum } from '/@/enums/flinkEnum'; +import { AppStateEnum, ExecModeEnum, OptionStateEnum } from '/@/enums/flinkEnum'; import { useI18n } from '/@/hooks/web/useI18n'; import { useMessage } from '/@/hooks/web/useMessage'; @@ -85,31 +79,7 @@ export const useFlinkApplication = (openStartModal: Fn) => { } /* start application */ - async function handleAppCheckStart(app: Recordable) { - // when then app is building, show forced starting modal - const resp = await fetchCheckStart({ - id: app.id, - }); - if (resp.data === AppExistsEnum.IN_YARN) { - Swal.fire({ - title: 'Are you sure?', - text: `current job already exists on yarn, are you sure resubmit the job and managed by StreamPark?`, - icon: 'warning', - showCancelButton: true, - confirmButtonText: 'Yes, resubmit', - denyButtonText: `No, close`, - confirmButtonColor: '#d33', - cancelButtonColor: '#3085d6', - }).then(async (result) => { - if (result.isConfirmed) { - await fetchForcedStop({ - id: app.id, - }); - await handleStart(app); - return Promise.resolve(true); - } - }); - } + function handleAppCheckStart(app: Recordable) { // when then app is building, show forced starting modal if (app['appControl']['allowStart'] === false) { handleFetchBuildDetail(app);
