This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch issue-3749 in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit d16c9fe7aa15fe5bdd59fde9e67112778b465f87 Author: benjobs <[email protected]> AuthorDate: Sat Jun 15 16:03:16 2024 +0800 [Improve] flink job restore state from checkpoint bug fixed. #3749 --- .../core/service/impl/ApplicationServiceImpl.java | 9 +++++++++ .../console/core/task/CheckpointProcessor.java | 16 ++++++++++++++-- .../streampark-console-webapp/src/api/base/system.ts | 2 -- .../src/views/flink/app/Detail.vue | 4 +--- .../src/views/setting/FlinkHome/components/Modal.vue | 11 +++++++++-- 5 files changed, 33 insertions(+), 9 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index dcdae2437..1028b37be 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -79,6 +79,7 @@ import org.apache.streampark.console.core.service.ServiceHelper; import org.apache.streampark.console.core.service.SettingService; import org.apache.streampark.console.core.service.VariableService; import org.apache.streampark.console.core.service.YarnQueueService; +import org.apache.streampark.console.core.task.CheckpointProcessor; import org.apache.streampark.console.core.task.FlinkAppHttpWatcher; import org.apache.streampark.console.core.task.FlinkK8sWatcherWrapper; import org.apache.streampark.flink.client.FlinkClient; @@ -214,6 +215,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli @Autowired private FlinkK8sWatcherWrapper k8sWatcherWrapper; + @Autowired private CheckpointProcessor checkpointProcessor; + private static final int CPU_NUM = Math.max(2, Runtime.getRuntime().availableProcessors() * 4); private final ExecutorService bootstrapExecutor = @@ -1690,6 +1693,12 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli // 3) success applicationLog.setSuccess(true); + + // issue: https://github.com/apache/incubator-streampark/issues/3749 + if (appParam.getSavePointed() == null || !appParam.getSavePointed()) { + checkpointProcessor.resetCheckpointNum(appParam.getId()); + } + if (response.flinkConfig() != null) { String jmMemory = response.flinkConfig().get(ConfigConst.KEY_FLINK_JM_PROCESS_MEMORY()); if (jmMemory != null) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java index 8fc19a78b..e08d37102 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java @@ -90,7 +90,7 @@ public class CheckpointProcessor { return; } - Long latestChkId = getLatestCheckpointedId(appId, checkPointKey.getCheckPointId()); + Long latestChkId = getLatestCheckpointId(appId, checkPointKey.getCheckPointId()); if (checkSaveAsCheckpoint(checkPoint, latestChkId)) { checkPointCache.put(checkPointKey.getCheckPointId(), checkPoint.getId()); saveSavepoint(checkPoint, application.getId()); @@ -131,6 +131,18 @@ public class CheckpointProcessor { } } + // issue: https://github.com/apache/incubator-streampark/issues/3749 + public void resetCheckpointNum(Long appId) { + checkPointCache + .asMap() + .forEach( + (k, v) -> { + if (k.startsWith(appId.toString())) { + checkPointCache.invalidate(k); + } + }); + } + private boolean checkSaveAsCheckpoint(@Nonnull CheckPoints.CheckPoint checkPoint, Long latestId) { return !checkPoint.getIsSavepoint() && (latestId == null || latestId < checkPoint.getId()); } @@ -148,7 +160,7 @@ public class CheckpointProcessor { } @Nullable - private Long getLatestCheckpointedId(Long appId, String cacheId) { + private Long getLatestCheckpointId(Long appId, String cacheId) { return checkPointCache.get( cacheId, key -> { diff --git a/streampark-console/streampark-console-webapp/src/api/base/system.ts b/streampark-console/streampark-console-webapp/src/api/base/system.ts index d117f7b57..9a517fddd 100644 --- a/streampark-console/streampark-console-webapp/src/api/base/system.ts +++ b/streampark-console/streampark-console-webapp/src/api/base/system.ts @@ -19,10 +19,8 @@ import { RolePageParams, RolePageListGetResultModel, MenuListModel, - UserListGetResultModel, } from './model/systemModel'; import { defHttp } from '/@/utils/http/axios'; -import { BasicPageParams } from '/@/api/model/baseModel'; enum Api { MenuList = '/menu/list', diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/Detail.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/Detail.vue index bdf731c80..b949cc1b7 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/Detail.vue +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/Detail.vue @@ -29,7 +29,7 @@ import { Description, useDescription } from '/@/components/Description'; import { Icon } from '/@/components/Icon'; import { useRoute, useRouter } from 'vue-router'; - import { fetchBackUps, fetchGet, fetchOptionLog, fetchYarn } from '/@/api/flink/app/app'; + import { fetchGet, fetchOptionLog, fetchYarn } from '/@/api/flink/app/app'; import { onUnmounted, reactive, h, unref, ref, onMounted, computed } from 'vue'; import { useIntervalFn, useClipboard } from '@vueuse/core'; import { AppListRecord } from '/@/api/flink/app/app.type'; @@ -178,8 +178,6 @@ if (confList.records.length > 0) detailTabs.showConf = true; if (pointHistory.records.length > 0) detailTabs.showSaveOption = true; if (optionList.records.length > 0) detailTabs.showOptionLog = true; - //const backupList = await fetchBackUps(commonParams); - //if (backupList.records.length > 0) detailTabs.showBackup = true; } /* Get yarn data */ diff --git a/streampark-console/streampark-console-webapp/src/views/setting/FlinkHome/components/Modal.vue b/streampark-console/streampark-console-webapp/src/views/setting/FlinkHome/components/Modal.vue index de0ee072d..7e3c501c7 100644 --- a/streampark-console/streampark-console-webapp/src/views/setting/FlinkHome/components/Modal.vue +++ b/streampark-console/streampark-console-webapp/src/views/setting/FlinkHome/components/Modal.vue @@ -91,8 +91,15 @@ /* form submit */ async function handleSubmit() { - changeOkLoading(true); - const formValue = await validate(); + let formValue; + try { + formValue = await validate(); + } catch (error) { + console.warn('validate error:', error); + return; + } finally { + changeOkLoading(false); + } // Detection environment const { data: resp } = await fetchCheckEnv({ id: versionId.value,
