This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit 4d7db2c3ab64521a35ae95a46468266a3b425a65
Author: benjobs <[email protected]>
AuthorDate: Sat Dec 23 09:20:06 2023 +0800

    [Improve] flink job on yarn exists check improvement
---
 .../core/service/impl/ApplicationServiceImpl.java  | 70 +++++++++++-----------
 .../components/AppView/StartApplicationModal.vue   | 18 +++++-
 .../src/views/flink/app/hooks/useApp.tsx           | 36 +----------
 3 files changed, 53 insertions(+), 71 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 162ed4896..9e2c679e7 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1325,11 +1325,17 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     cancelFuture.whenComplete(
         (cancelResponse, throwable) -> {
           cancelFutureMap.remove(application.getId());
+
           if (throwable != null) {
+            String exception = Utils.stringifyException(throwable);
+            applicationLog.setException(exception);
+            applicationLog.setSuccess(false);
+            applicationLogService.save(applicationLog);
+
             if (throwable instanceof CancellationException) {
               doStopped(application);
             } else {
-              log.error("stop flink job fail.", throwable);
+              log.error("stop flink job failed.", throwable);
               application.setOptionState(OptionState.NONE.getValue());
               application.setState(FlinkAppState.FAILED.getValue());
               updateById(application);
@@ -1345,31 +1351,30 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
               } else {
                 FlinkRESTAPIWatcher.unWatching(application.getId());
               }
-
-              String exception = Utils.stringifyException(throwable);
-              applicationLog.setException(exception);
-              applicationLog.setSuccess(false);
-            }
-          } else {
-            applicationLog.setSuccess(true);
-            if (cancelResponse != null && cancelResponse.savePointDir() != 
null) {
-              String savePointDir = cancelResponse.savePointDir();
-              log.info("savePoint path: {}", savePointDir);
-              SavePoint savePoint = new SavePoint();
-              savePoint.setPath(savePointDir);
-              savePoint.setAppId(application.getId());
-              savePoint.setLatest(true);
-              savePoint.setType(CheckPointType.SAVEPOINT.get());
-              savePoint.setCreateTime(new Date());
-              savePoint.setTriggerTime(triggerTime);
-              savePointService.save(savePoint);
-            }
-            if (isKubernetesApp(application)) {
-              k8SFlinkTrackMonitor.unWatching(toTrackId(application));
             }
+            return;
           }
+
+          applicationLog.setSuccess(true);
           // save log...
           applicationLogService.save(applicationLog);
+
+          if (cancelResponse != null && cancelResponse.savePointDir() != null) 
{
+            String savePointDir = cancelResponse.savePointDir();
+            log.info("savePoint path: {}", savePointDir);
+            SavePoint savePoint = new SavePoint();
+            savePoint.setPath(savePointDir);
+            savePoint.setAppId(application.getId());
+            savePoint.setLatest(true);
+            savePoint.setType(CheckPointType.SAVEPOINT.get());
+            savePoint.setCreateTime(new Date());
+            savePoint.setTriggerTime(triggerTime);
+            savePointService.save(savePoint);
+          }
+
+          if (isKubernetesApp(application)) {
+            k8SFlinkTrackMonitor.unWatching(toTrackId(application));
+          }
         });
   }
 
@@ -1596,9 +1601,12 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
 
           // 2) exception
           if (throwable != null) {
+            String exception = Utils.stringifyException(throwable);
+            applicationLog.setException(exception);
+            applicationLog.setSuccess(false);
+            applicationLogService.save(applicationLog);
             if (throwable instanceof CancellationException) {
               doStopped(application);
-              return;
             } else {
               Application app = getById(appParam.getId());
               app.setState(FlinkAppState.FAILED.getValue());
@@ -1610,12 +1618,6 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
                 FlinkRESTAPIWatcher.unWatching(appParam.getId());
               }
             }
-            String exception = Utils.stringifyException(throwable);
-            applicationLog.setException(exception);
-            applicationLog.setSuccess(false);
-            applicationLogService.save(applicationLog);
-            // set savepoint to expire
-            savePointService.expire(application.getId());
             return;
           }
 
@@ -1736,8 +1738,8 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     return properties;
   }
 
-  private void doStopped(Application app) {
-    Application application = getById(app);
+  private void doStopped(Application appParam) {
+    Application application = getById(appParam);
     application.setOptionState(OptionState.NONE.getValue());
     application.setState(FlinkAppState.CANCELED.getValue());
     application.setOptionTime(new Date());
@@ -1752,7 +1754,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
       FlinkRESTAPIWatcher.unWatching(application.getId());
     }
     // kill application
-    if (ExecutionMode.isYarnMode(app.getExecutionModeEnum())) {
+    if (ExecutionMode.isYarnMode(application.getExecutionModeEnum())) {
       try {
         List<ApplicationReport> applications = 
getApplicationReports(application.getJobName());
         if (!applications.isEmpty()) {
@@ -1851,10 +1853,6 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
               YarnApplicationState.RUNNING);
       Set<String> yarnTag = Sets.newHashSet("streampark");
       List<ApplicationReport> applications = yarnClient.getApplications(types, 
states, yarnTag);
-      // Compatible with historical versions.
-      if (applications.isEmpty()) {
-        applications = yarnClient.getApplications(types, states);
-      }
       return applications.stream()
           .filter(report -> report.getName().equals(jobName))
           .collect(Collectors.toList());
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
index ad2db10d4..205b6173f 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
@@ -31,7 +31,8 @@
   import { BasicModal, useModalInner } from '/@/components/Modal';
   import { useMessage } from '/@/hooks/web/useMessage';
   import { useRouter } from 'vue-router';
-  import { fetchStart } from '/@/api/flink/app/app';
+  import { fetchCheckStart, fetchForcedStop, fetchStart } from 
'/@/api/flink/app/app';
+  import { AppExistsEnum } from '/@/enums/flinkEnum';
 
   const SelectOption = Select.Option;
 
@@ -110,8 +111,21 @@
     baseColProps: { span: 24 },
   });
 
-  /* submit */
   async function handleSubmit() {
+    // when then app is building, show forced starting modal
+    const resp = await fetchCheckStart({
+      id: receiveData.application.id,
+    });
+    if (resp.data.data === AppExistsEnum.IN_YARN) {
+      await fetchForcedStop({
+        id: receiveData.application.id,
+      });
+    }
+    await handleDoSubmit();
+  }
+
+  /* submit */
+  async function handleDoSubmit() {
     try {
       const formValue = (await validate()) as Recordable;
       const savePointed = formValue.startSavePointed;
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
index 9a0247897..00619db53 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
@@ -17,18 +17,12 @@
 import { Alert, Form, Input, Tag } from 'ant-design-vue';
 import { h, onMounted, reactive, ref, unref, VNode } from 'vue';
 import { handleAppBuildStatueText } from '../utils';
-import {
-  fetchCheckName,
-  fetchCopy,
-  fetchCheckStart,
-  fetchForcedStop,
-  fetchMapping,
-} from '/@/api/flink/app/app';
+import { fetchCheckName, fetchCopy, fetchForcedStop, fetchMapping } from 
'/@/api/flink/app/app';
 import { fetchBuild, fetchBuildDetail } from '/@/api/flink/app/flinkBuild';
 import { fetchSavePonitHistory } from '/@/api/flink/app/savepoint';
 import { fetchAppOwners } from '/@/api/system/user';
 import { SvgIcon } from '/@/components/Icon';
-import { AppExistsEnum, AppStateEnum, ExecModeEnum, OptionStateEnum } from 
'/@/enums/flinkEnum';
+import { AppStateEnum, ExecModeEnum, OptionStateEnum } from 
'/@/enums/flinkEnum';
 import { useI18n } from '/@/hooks/web/useI18n';
 import { useMessage } from '/@/hooks/web/useMessage';
 
@@ -85,31 +79,7 @@ export const useFlinkApplication = (openStartModal: Fn) => {
   }
 
   /* start application */
-  async function handleAppCheckStart(app: Recordable) {
-    // when then app is building, show forced starting modal
-    const resp = await fetchCheckStart({
-      id: app.id,
-    });
-    if (resp.data === AppExistsEnum.IN_YARN) {
-      Swal.fire({
-        title: 'Are you sure?',
-        text: `current job already exists on yarn, are you sure resubmit the 
job and managed by StreamPark?`,
-        icon: 'warning',
-        showCancelButton: true,
-        confirmButtonText: 'Yes, resubmit',
-        denyButtonText: `No, close`,
-        confirmButtonColor: '#d33',
-        cancelButtonColor: '#3085d6',
-      }).then(async (result) => {
-        if (result.isConfirmed) {
-          await fetchForcedStop({
-            id: app.id,
-          });
-          await handleStart(app);
-          return Promise.resolve(true);
-        }
-      });
-    }
+  function handleAppCheckStart(app: Recordable) {
     // when then app is building, show forced starting modal
     if (app['appControl']['allowStart'] === false) {
       handleFetchBuildDetail(app);

Reply via email to