This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch "lost" in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit 375ef6f0df32e2f3d9232d55684726881af29821 Author: benjobs <[email protected]> AuthorDate: Wed Feb 7 18:22:32 2024 +0800 [Improve] job state lost improvement --- .../apache/streampark/common/util/DateUtils.scala | 11 ++++++++-- .../console/core/bean/AlertTemplate.java | 4 ++-- .../console/core/entity/Application.java | 5 ++--- .../core/service/impl/ApplicationServiceImpl.java | 4 ++-- .../console/core/task/FlinkAppHttpWatcher.java | 24 ++++++++++++++++++---- .../core/task/FlinkK8sChangeEventListener.java | 4 ++-- .../core/service/alert/AlertServiceTest.java | 4 ++-- 7 files changed, 39 insertions(+), 17 deletions(-) diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/DateUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/DateUtils.scala index 8c62eebe1..96a282513 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/DateUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/DateUtils.scala @@ -17,7 +17,7 @@ package org.apache.streampark.common.util import java.text.{ParseException, SimpleDateFormat} -import java.time.{Duration, LocalDateTime} +import java.time.{Duration, LocalDateTime, ZoneId} import java.time.format.DateTimeFormatter import java.util._ import java.util.concurrent.TimeUnit @@ -154,7 +154,7 @@ object DateUtils { * @param milliseconds * @return */ - def toDuration(milliseconds: Long): String = { + def toStringDuration(milliseconds: Long): String = { val duration = Duration.ofMillis(milliseconds) val days = duration.toDays @@ -175,6 +175,13 @@ object DateUtils { builder.toString } + def toSecondDuration(time1: Date, time2: Date = new Date()): Long = { + val startDateTime = LocalDateTime.ofInstant(time1.toInstant, ZoneId.systemDefault()); + val endDateTime = LocalDateTime.ofInstant(time2.toInstant, ZoneId.systemDefault()); + val duration = Duration.between(startDateTime, endDateTime) + duration.toMillis / 1000 + } + def getTimeUnit( 
time: String, default: (Int, TimeUnit) = (5, TimeUnit.SECONDS)): (Int, TimeUnit) = { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java index b102c0e11..8dd5b4d7f 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java @@ -75,7 +75,7 @@ public class AlertTemplate implements Serializable { template.setEndTime( DateUtils.format( application.getEndTime(), DateUtils.fullFormat(), TimeZone.getDefault())); - template.setDuration(DateUtils.toDuration(duration)); + template.setDuration(DateUtils.toStringDuration(duration)); } else { template.setStartTime("-"); template.setEndTime("-"); @@ -113,7 +113,7 @@ public class AlertTemplate implements Serializable { AlertTemplate template = of(application); template.setType(2); template.setCpFailureRateInterval( - DateUtils.toDuration(application.getCpFailureRateInterval() * 1000 * 60)); + DateUtils.toStringDuration(application.getCpFailureRateInterval() * 1000 * 60)); template.setCpMaxFailureInterval(application.getCpMaxFailureInterval()); template.setTitle(String.format("Notify: %s checkpoint FAILED", application.getJobName())); template.setSubject( diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java index ab2f69d33..0808afd57 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java +++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java @@ -326,8 +326,7 @@ public class Application implements Serializable { * @return true: can start | false: can not start. */ public boolean isCanBeStart() { - FlinkAppState state = FlinkAppState.of(getState()); - switch (state) { + switch (getFlinkAppStateEnum()) { case ADDED: case CREATED: case FAILED: @@ -345,7 +344,7 @@ public class Application implements Serializable { } public boolean shouldBeTrack() { - return shouldTracking(FlinkAppState.of(getState())) == 1; + return shouldTracking(getFlinkAppStateEnum()) == 1; } @JsonIgnore diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index 8360f7439..2dd1681db 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -453,7 +453,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli @Override public boolean checkAlter(Application application) { Long appId = application.getId(); - FlinkAppState state = FlinkAppState.of(application.getState()); + FlinkAppState state = application.getFlinkAppStateEnum(); if (!FlinkAppState.CANCELED.equals(state)) { return false; } @@ -700,7 +700,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli return AppExistsState.IN_DB; } - FlinkAppState state = FlinkAppState.of(app.getState()); + FlinkAppState state = app.getFlinkAppStateEnum(); // has stopped status if (state.equals(FlinkAppState.ADDED) || state.equals(FlinkAppState.CREATED) diff 
--git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java index 309f573fa..63f5c18c0 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java @@ -18,6 +18,7 @@ package org.apache.streampark.console.core.task; import org.apache.streampark.common.enums.ExecutionMode; +import org.apache.streampark.common.util.DateUtils; import org.apache.streampark.common.util.HttpClientUtils; import org.apache.streampark.common.util.ThreadUtils; import org.apache.streampark.common.util.YarnUtils; @@ -106,6 +107,9 @@ public class FlinkAppHttpWatcher { private static final Cache<Long, Byte> STARTING_CACHE = Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build(); + private static final Cache<Long, Date> LOST_CACHE = + Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build(); + /** tracking task list */ private static final Map<Long, Application> WATCHING_APPS = new ConcurrentHashMap<>(0); @@ -205,10 +209,12 @@ public class FlinkAppHttpWatcher { try { // query status from flink rest api getFromFlinkRestApi(application); + cleanupLost(application); } catch (Exception flinkException) { // query status from yarn rest api try { getFromYarnRestApi(application); + cleanupLost(application); } catch (Exception yarnException) { doStateFailed(application); } @@ -217,6 +223,10 @@ public class FlinkAppHttpWatcher { } } + private void cleanupLost(Application application) { + LOST_CACHE.invalidate(application.getId()); + } + private void doStateFailed(Application application) { /* Query from flink's restAPI and yarn's restAPI both failed. 
@@ -230,8 +240,14 @@ public class FlinkAppHttpWatcher { log.error( "[StreamPark][FlinkAppHttpWatcher] getFromFlinkRestApi and getFromYarnRestApi error,job failed,savePoint expired!"); if (StopFrom.NONE.equals(stopFrom)) { - savePointService.expire(application.getId()); - application.setState(FlinkAppState.LOST.getValue()); + Date lostTime = LOST_CACHE.getIfPresent(application.getId()); + if (lostTime == null) { + LOST_CACHE.put(application.getId(), new Date()); + } else if (DateUtils.toSecondDuration(lostTime, new Date()) >= 30) { + savePointService.expire(application.getId()); + application.setState(FlinkAppState.LOST.getValue()); + LOST_CACHE.invalidate(application.getId()); + } } else { application.setState(FlinkAppState.CANCELED.getValue()); } @@ -245,9 +261,9 @@ public class FlinkAppHttpWatcher { cleanSavepoint(application); cleanOptioning(optionState, application.getId()); doPersistMetrics(application, true); - FlinkAppState appState = FlinkAppState.of(application.getState()); + FlinkAppState appState = application.getFlinkAppStateEnum(); if (appState.equals(FlinkAppState.FAILED) || appState.equals(FlinkAppState.LOST)) { - alertService.alert(application, FlinkAppState.of(application.getState())); + alertService.alert(application, application.getFlinkAppStateEnum()); if (appState.equals(FlinkAppState.FAILED)) { try { applicationService.start(application, true); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java index 55053f1f0..b0f3f5f14 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java @@ -79,7 +79,7 @@ 
public class FlinkK8sChangeEventListener { setByJobStatusCV(app, jobStatus); applicationService.persistMetrics(app); - FlinkAppState state = FlinkAppState.of(app.getState()); + FlinkAppState state = app.getFlinkAppStateEnum(); // email alerts when necessary if (FlinkAppState.FAILED.equals(state) || FlinkAppState.LOST.equals(state) @@ -143,7 +143,7 @@ public class FlinkK8sChangeEventListener { // infer the final flink job state Enumeration.Value state = FlinkJobStatusWatcher.inferFlinkJobStateFromPersist( - jobStatus.jobState(), toK8sFlinkJobState(FlinkAppState.of(app.getState()))); + jobStatus.jobState(), toK8sFlinkJobState(app.getFlinkAppStateEnum())); // corrective start-time / end-time / duration long preStartTime = app.getStartTime() != null ? app.getStartTime().getTime() : 0; diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java index 77a2f3df6..4bc4762cd 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java @@ -212,7 +212,7 @@ class AlertServiceTest { template.setStartTime( DateUtils.format( application.getStartTime(), DateUtils.fullFormat(), TimeZone.getDefault())); - template.setDuration(DateUtils.toDuration(duration)); + template.setDuration(DateUtils.toStringDuration(duration)); template.setLink(url); template.setEndTime( DateUtils.format( @@ -223,7 +223,7 @@ class AlertServiceTest { template.setRestartIndex(application.getRestartCount()); template.setTotalRestart(application.getRestartSize()); template.setCpFailureRateInterval( - DateUtils.toDuration(application.getCpFailureRateInterval() * 1000 * 60)); + 
DateUtils.toStringDuration(application.getCpFailureRateInterval() * 1000 * 60)); template.setCpMaxFailureInterval(application.getCpMaxFailureInterval()); return template;
