RocMarshal commented on code in PR #2268: URL: https://github.com/apache/incubator-streampark/pull/2268#discussion_r1094617097
########## streampark-console/streampark-console-service/src/main/assembly/script/upgrade/ISSUE-2192/schema/2192-mysql.sql: ########## @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +alter table `t_flink_savepoint` add column `job_id` VARCHAR(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL after `app_id`; Review Comment: But now, the developers couldn't know the next release version in time. There're two optional idea: - The community supply the release versions arrange ahead of schedule, developers will follow the rule to coding. - The delta ddl & ddl could be coded with the issue-id. The PIC of the next release sort it based on the version before the release. Any suggestion is appreciated. ########## streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql: ########## @@ -210,12 +210,19 @@ drop table if exists `t_flink_savepoint`; create table `t_flink_savepoint` ( `id` bigint not null auto_increment, `app_id` bigint not null, + `job_id` VARCHAR(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL, + `status` VARCHAR(255) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT 'COMPLETED', Review Comment: It isn't appropriate to introduce the field in the Issue. The platform doesn't collect the failed savepoint/chk into the db. Maybe it's a good feature to record the exceptionally savepoint/chk for job monitor. ########## streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java: ########## @@ -114,6 +127,42 @@ public SavePoint getLatest(Long id) { return this.getOne(queryWrapper); } + @Override + public Boolean triggerSavepoint(Long appId, @Nullable String savepointPath) { + log.info("Start to trigger savepoint for app {}", appId); + Application app = appService.getById(appId); + AssertUtils.notNull(app, String.format("The application %s doesn't exist.", appId)); + String trackingUrl = app.getJobManagerUrl(); + AssertUtils.state( + StringUtils.isNotEmpty(trackingUrl), + String.format("The flink trackingUrl for app[%s] isn't available.", appId)); + String jobId = app.getJobId(); + AssertUtils.state( + StringUtils.isNotEmpty(jobId), + String.format("The jobId of application[%s] is absent.", appId)); + String triggerId = triggerSavepoint(trackingUrl, jobId, savepointPath); + log.info("Request savepoint successful in triggerId {}", triggerId); + return true; Review Comment: nice ~ ########## streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql: ########## @@ -210,12 +210,19 @@ drop table if exists `t_flink_savepoint`; create table `t_flink_savepoint` ( `id` bigint not null auto_increment, `app_id` bigint not null, + `job_id` VARCHAR(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL, + `status` VARCHAR(255) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT 'COMPLETED', `chk_id` bigint default null, `type` tinyint default null, - `path` varchar(255) collate utf8mb4_general_ci default null, + `path` varchar(2048) collate utf8mb4_general_ci default null, `latest` tinyint not null default 1, `trigger_time` datetime default null, + `execution_mode` VARCHAR(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL, Review Comment: It's bunded with `status`, which could help user to trace the exceptional savepoint/chk `execution` environment. ########## streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql: ########## @@ -210,12 +210,19 @@ drop table if exists `t_flink_savepoint`; create table `t_flink_savepoint` ( `id` bigint not null auto_increment, `app_id` bigint not null, + `job_id` VARCHAR(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL, + `status` VARCHAR(255) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT 'COMPLETED', `chk_id` bigint default null, `type` tinyint default null, - `path` varchar(255) collate utf8mb4_general_ci default null, + `path` varchar(2048) collate utf8mb4_general_ci default null, `latest` tinyint not null default 1, `trigger_time` datetime default null, + `execution_mode` VARCHAR(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL, + `cluster_id` bigint DEFAULT NULL, Review Comment: be same as above. ########## streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql: ########## @@ -210,12 +210,19 @@ drop table if exists `t_flink_savepoint`; create table `t_flink_savepoint` ( `id` bigint not null auto_increment, `app_id` bigint not null, + `job_id` VARCHAR(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL, + `status` VARCHAR(255) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT 'COMPLETED', `chk_id` bigint default null, `type` tinyint default null, - `path` varchar(255) collate utf8mb4_general_ci default null, + `path` varchar(2048) collate utf8mb4_general_ci default null, `latest` tinyint not null default 1, `trigger_time` datetime default null, + `execution_mode` VARCHAR(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL, + `cluster_id` bigint DEFAULT NULL, + `end_time` datetime DEFAULT NULL, Review Comment: It could be used to calculate the chk/savepoint time cost. ########## streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java: ########## @@ -108,6 +112,23 @@ public void process(Long appId, CheckPoints checkPoints) { } } + private boolean shouldProcessFailedTrigger( + CheckPoints.CheckPoint checkPoint, Application application, CheckPointStatus status) { + return CheckPointStatus.FAILED.equals(status) + && !checkPoint.getIsSavepoint() + && application.cpFailedTrigger(); + } + + private boolean checkpointNeedStore( + Application application, @Nonnull CheckPoints.CheckPoint checkPoint) { + LambdaQueryWrapper<SavePoint> queryWrapper = Review Comment: A minor confusion for me form the comments. Does the minor refactor cause the logic loss or the original implementation? ########## streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java: ########## @@ -108,6 +112,23 @@ public void process(Long appId, CheckPoints checkPoints) { } } + private boolean shouldProcessFailedTrigger( + CheckPoints.CheckPoint checkPoint, Application application, CheckPointStatus status) { + return CheckPointStatus.FAILED.equals(status) + && !checkPoint.getIsSavepoint() + && application.cpFailedTrigger(); + } + + private boolean checkpointNeedStore( + Application application, @Nonnull CheckPoints.CheckPoint checkPoint) { + LambdaQueryWrapper<SavePoint> queryWrapper = + new LambdaQueryWrapper<SavePoint>() + .eq(SavePoint::getAppId, application.getAppId()) + .eq(SavePoint::getJobId, application.getJobId()) Review Comment: yes, I get it. ########## streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java: ########## @@ -357,7 +357,9 @@ private void handleCheckPoints(Application application) throws Exception { FlinkCluster flinkCluster = getFlinkCluster(application); CheckPoints checkPoints = httpCheckpoints(application, flinkCluster); if (checkPoints != null) { - checkpointProcessor.process(application.getId(), checkPoints); + checkPoints + .getLatestChkAndSavepointWithTimeAscOrder() Review Comment: The original intention is that get chk & savepoint from`latest` with order, then records the info into db. The latest savepoint or chk will be tagged with 'latest'. ########## streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java: ########## @@ -1267,8 +1268,14 @@ public void cancel(Application appParam) throws Exception { savePoint.setAppId(application.getId()); savePoint.setLatest(true); savePoint.setType(CheckPointType.SAVEPOINT.get()); - savePoint.setTriggerTime(now); + savePoint.setTriggerTime(triggerTime); savePoint.setCreateTime(now); + savePoint.setJobId(cancelRequest.jobId()); + savePoint.setEndTime(now); + savePoint.setClusterId(application.getFlinkClusterId()); + if (appParam.getExecutionMode() != null) { + savePoint.setExecutionMode(ExecutionMode.of(appParam.getExecutionMode()).name()); Review Comment: good catch -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
