RocMarshal commented on code in PR #2268:
URL:
https://github.com/apache/incubator-streampark/pull/2268#discussion_r1096483433
##########
streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql:
##########
@@ -117,6 +117,7 @@ insert into `t_menu` values (100066, 100015, 'view', null,
null, 'app:view', nul
insert into `t_menu` values (100067, 100053, 'view', NULL, NULL,
'variable:view', NULL, '1', 1, null, now(), now());
insert into `t_menu` values (100068, 100033, 'view', null, null,
'setting:view', null, '1', 1, null, now(), now());
insert into `t_menu` values (100069, 100053, 'depend view', null, null,
'variable:depend_apps', null, '1', 1, NULL, now(), now());
+insert into `t_menu` values (100070, 100015, 'savepoint trigger', null, null,
'savepoint:triggerSavepoint', null, '1', 1, null, now(), now());
Review Comment:
updated.
##########
streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql:
##########
@@ -210,12 +210,19 @@ drop table if exists `t_flink_savepoint`;
create table `t_flink_savepoint` (
`id` bigint not null auto_increment,
`app_id` bigint not null,
+ `job_id` VARCHAR(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
Review Comment:
updated.
##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java:
##########
@@ -36,19 +38,30 @@
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
+import
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
+import javax.annotation.Nullable;
+
@Slf4j
@Service
@Transactional(propagation = Propagation.SUPPORTS, readOnly = true,
rollbackFor = Exception.class)
public class SavePointServiceImpl extends ServiceImpl<SavePointMapper,
SavePoint>
implements SavePointService {
+ private static final ObjectMapper objMapper =
RestMapperUtils.getStrictObjectMapper();
Review Comment:
updated.
##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java:
##########
@@ -108,6 +112,23 @@ public void process(Long appId, CheckPoints checkPoints) {
}
}
+ private boolean shouldProcessFailedTrigger(
+ CheckPoints.CheckPoint checkPoint, Application application,
CheckPointStatus status) {
+ return CheckPointStatus.FAILED.equals(status)
+ && !checkPoint.getIsSavepoint()
+ && application.cpFailedTrigger();
+ }
+
+ private boolean checkpointNeedStore(
+ Application application, @Nonnull CheckPoints.CheckPoint checkPoint) {
+ LambdaQueryWrapper<SavePoint> queryWrapper =
+ new LambdaQueryWrapper<SavePoint>()
+ .eq(SavePoint::getAppId, application.getAppId())
+ .eq(SavePoint::getJobId, application.getJobId())
Review Comment:
updated.
##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java:
##########
@@ -114,6 +127,42 @@ public SavePoint getLatest(Long id) {
return this.getOne(queryWrapper);
}
+ @Override
+ public Boolean triggerSavepoint(Long appId, @Nullable String savepointPath) {
+ log.info("Start to trigger savepoint for app {}", appId);
+ Application app = appService.getById(appId);
+ AssertUtils.notNull(app, String.format("The application %s doesn't
exist.", appId));
+ String trackingUrl = app.getJobManagerUrl();
+ AssertUtils.state(
+ StringUtils.isNotEmpty(trackingUrl),
+ String.format("The flink trackingUrl for app[%s] isn't available.",
appId));
+ String jobId = app.getJobId();
+ AssertUtils.state(
+ StringUtils.isNotEmpty(jobId),
+ String.format("The jobId of application[%s] is absent.", appId));
+ String triggerId = triggerSavepoint(trackingUrl, jobId, savepointPath);
+ log.info("Request savepoint successful in triggerId {}", triggerId);
+ return true;
Review Comment:
updated.
##########
streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql:
##########
@@ -210,12 +210,19 @@ drop table if exists `t_flink_savepoint`;
create table `t_flink_savepoint` (
`id` bigint not null auto_increment,
`app_id` bigint not null,
+ `job_id` VARCHAR(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
+ `status` VARCHAR(255) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT
'COMPLETED',
`chk_id` bigint default null,
`type` tinyint default null,
- `path` varchar(255) collate utf8mb4_general_ci default null,
+ `path` varchar(2048) collate utf8mb4_general_ci default null,
`latest` tinyint not null default 1,
`trigger_time` datetime default null,
+ `execution_mode` VARCHAR(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
Review Comment:
removed
##########
streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.0.sql:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- ISSUE-2192 DDL & DML Start
+
+alter table `t_flink_savepoint` modify column `path` varchar(1024) collate
utf8mb4_general_ci default null;
+
+insert into `t_menu` values (100070, 100015, 'savepoint trigger', null, null,
'savepoint:trigger', null, '1', 1, null, now(), now());
Review Comment:
@1996fanrui sorry for the missing according to followed the 2.0.0.sql
status. I'll add it.
##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java:
##########
@@ -1267,8 +1268,14 @@ public void cancel(Application appParam) throws
Exception {
savePoint.setAppId(application.getId());
savePoint.setLatest(true);
savePoint.setType(CheckPointType.SAVEPOINT.get());
- savePoint.setTriggerTime(now);
+ savePoint.setTriggerTime(triggerTime);
savePoint.setCreateTime(now);
+ savePoint.setJobId(cancelRequest.jobId());
+ savePoint.setEndTime(now);
+ savePoint.setClusterId(application.getFlinkClusterId());
+ if (appParam.getExecutionMode() != null) {
+
savePoint.setExecutionMode(ExecutionMode.of(appParam.getExecutionMode()).name());
Review Comment:
removed the field.
##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java:
##########
@@ -1258,7 +1259,7 @@ public void cancel(Application appParam) throws Exception
{
savePoint.setAppId(application.getId());
savePoint.setLatest(true);
savePoint.setType(CheckPointType.SAVEPOINT.get());
- savePoint.setTriggerTime(now);
Review Comment:
thx~
@1996fanrui did you mean that the `Date now = new Date()` should be inlined
into `savePoint.setCreateTime(new Date())` ?
##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java:
##########
@@ -114,6 +127,41 @@ public SavePoint getLatest(Long id) {
return this.getOne(queryWrapper);
}
+ @Override
+ public void trigger(Long appId, @Nullable String savepointPath) {
+ log.info("Start to trigger savepoint for app {}", appId);
+ Application app = appService.getById(appId);
+ AssertUtils.notNull(app, String.format("The application %s doesn't
exist.", appId));
+ String trackingUrl = app.getJobManagerUrl();
+ AssertUtils.state(
+ StringUtils.isNotEmpty(trackingUrl),
+ String.format("The flink trackingUrl for app[%s] isn't available.",
appId));
+ String jobId = app.getJobId();
+ AssertUtils.state(
+ StringUtils.isNotEmpty(jobId),
+ String.format("The jobId of application[%s] is absent.", appId));
+ String triggerId = triggerSavepoint(trackingUrl, jobId, savepointPath);
+ log.info("Request savepoint successful in triggerId {}", triggerId);
+ }
+
+ private String triggerSavepoint(
+ String trackingUrl, String jobId, @Nullable String savepointPath) {
+ String response = null;
+ try {
+ String url = String.format("%s/jobs/%s/savepoints", trackingUrl, jobId);
+ String postBody =
+ OBJECT_MAPPER.writeValueAsString(new
SavepointTriggerRequestBody(savepointPath, false));
+ response = HttpClientUtils.httpPostRequest(url, postBody);
+ TriggerResponse triggerResponse = OBJECT_MAPPER.readValue(response,
TriggerResponse.class);
+ return triggerResponse.getTriggerId().toString();
+ } catch (Exception e) {
+ throw new RuntimeException(
Review Comment:
@1996fanrui @wolfboys Should it be `ApiAlertException` ?
But the current coding is `AlertException` instead of `ApiAlertException`.
##########
streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql:
##########
@@ -210,12 +210,19 @@ drop table if exists `t_flink_savepoint`;
create table `t_flink_savepoint` (
`id` bigint not null auto_increment,
`app_id` bigint not null,
+ `job_id` VARCHAR(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
+ `status` VARCHAR(255) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT
'COMPLETED',
Review Comment:
removed
##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java:
##########
@@ -108,6 +112,23 @@ public void process(Long appId, CheckPoints checkPoints) {
}
}
+ private boolean shouldProcessFailedTrigger(
+ CheckPoints.CheckPoint checkPoint, Application application,
CheckPointStatus status) {
+ return CheckPointStatus.FAILED.equals(status)
+ && !checkPoint.getIsSavepoint()
+ && application.cpFailedTrigger();
+ }
+
+ private boolean checkpointNeedStore(
+ Application application, @Nonnull CheckPoints.CheckPoint checkPoint) {
+ LambdaQueryWrapper<SavePoint> queryWrapper =
Review Comment:
Thanks for the clarify.
Updated.
##########
streampark-console/streampark-console-service/src/main/assembly/script/upgrade/ISSUE-2192/schema/2192-mysql.sql:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+alter table `t_flink_savepoint` add column `job_id` VARCHAR(255) COLLATE
utf8mb4_unicode_ci DEFAULT NULL after `app_id`;
Review Comment:
removed it into 2.1.0.sql
##########
streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql:
##########
@@ -210,12 +210,19 @@ drop table if exists `t_flink_savepoint`;
create table `t_flink_savepoint` (
`id` bigint not null auto_increment,
`app_id` bigint not null,
+ `job_id` VARCHAR(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
+ `status` VARCHAR(255) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT
'COMPLETED',
`chk_id` bigint default null,
`type` tinyint default null,
- `path` varchar(255) collate utf8mb4_general_ci default null,
+ `path` varchar(2048) collate utf8mb4_general_ci default null,
`latest` tinyint not null default 1,
`trigger_time` datetime default null,
+ `execution_mode` VARCHAR(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
+ `cluster_id` bigint DEFAULT NULL,
Review Comment:
removed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]