This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
new 1e0787b8a [Improve] job state lost improvement (#3539)
1e0787b8a is described below
commit 1e0787b8aa414f217bdbce28cc3475004cef3bda
Author: benjobs <[email protected]>
AuthorDate: Wed Feb 7 18:34:51 2024 +0800
[Improve] job state lost improvement (#3539)
Co-authored-by: benjobs <[email protected]>
---
.../apache/streampark/common/util/DateUtils.scala | 11 ++++++++--
.../console/core/bean/AlertTemplate.java | 4 ++--
.../console/core/entity/Application.java | 5 ++---
.../core/service/impl/ApplicationServiceImpl.java | 4 ++--
.../console/core/task/FlinkAppHttpWatcher.java | 24 ++++++++++++++++++----
.../core/task/FlinkK8sChangeEventListener.java | 4 ++--
.../core/service/alert/AlertServiceTest.java | 4 ++--
7 files changed, 39 insertions(+), 17 deletions(-)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/DateUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/DateUtils.scala
index 8c62eebe1..96a282513 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/DateUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/DateUtils.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.common.util
import java.text.{ParseException, SimpleDateFormat}
-import java.time.{Duration, LocalDateTime}
+import java.time.{Duration, LocalDateTime, ZoneId}
import java.time.format.DateTimeFormatter
import java.util._
import java.util.concurrent.TimeUnit
@@ -154,7 +154,7 @@ object DateUtils {
* @param milliseconds
* @return
*/
- def toDuration(milliseconds: Long): String = {
+ def toStringDuration(milliseconds: Long): String = {
val duration = Duration.ofMillis(milliseconds)
val days = duration.toDays
@@ -175,6 +175,13 @@ object DateUtils {
builder.toString
}
+ def toSecondDuration(time1: Date, time2: Date = new Date()): Long = {
+ val startDateTime = LocalDateTime.ofInstant(time1.toInstant,
ZoneId.systemDefault());
+ val endDateTime = LocalDateTime.ofInstant(time2.toInstant,
ZoneId.systemDefault());
+ val duration = Duration.between(startDateTime, endDateTime)
+ duration.toMillis / 1000
+ }
+
def getTimeUnit(
time: String,
default: (Int, TimeUnit) = (5, TimeUnit.SECONDS)): (Int, TimeUnit) = {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
index b102c0e11..8dd5b4d7f 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
@@ -75,7 +75,7 @@ public class AlertTemplate implements Serializable {
template.setEndTime(
DateUtils.format(
application.getEndTime(), DateUtils.fullFormat(),
TimeZone.getDefault()));
- template.setDuration(DateUtils.toDuration(duration));
+ template.setDuration(DateUtils.toStringDuration(duration));
} else {
template.setStartTime("-");
template.setEndTime("-");
@@ -113,7 +113,7 @@ public class AlertTemplate implements Serializable {
AlertTemplate template = of(application);
template.setType(2);
template.setCpFailureRateInterval(
- DateUtils.toDuration(application.getCpFailureRateInterval() * 1000 *
60));
+ DateUtils.toStringDuration(application.getCpFailureRateInterval() *
1000 * 60));
template.setCpMaxFailureInterval(application.getCpMaxFailureInterval());
template.setTitle(String.format("Notify: %s checkpoint FAILED",
application.getJobName()));
template.setSubject(
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index ab2f69d33..0808afd57 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -326,8 +326,7 @@ public class Application implements Serializable {
* @return true: can start | false: can not start.
*/
public boolean isCanBeStart() {
- FlinkAppState state = FlinkAppState.of(getState());
- switch (state) {
+ switch (getFlinkAppStateEnum()) {
case ADDED:
case CREATED:
case FAILED:
@@ -345,7 +344,7 @@ public class Application implements Serializable {
}
public boolean shouldBeTrack() {
- return shouldTracking(FlinkAppState.of(getState())) == 1;
+ return shouldTracking(getFlinkAppStateEnum()) == 1;
}
@JsonIgnore
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 8360f7439..2dd1681db 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -453,7 +453,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
@Override
public boolean checkAlter(Application application) {
Long appId = application.getId();
- FlinkAppState state = FlinkAppState.of(application.getState());
+ FlinkAppState state = application.getFlinkAppStateEnum();
if (!FlinkAppState.CANCELED.equals(state)) {
return false;
}
@@ -700,7 +700,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
return AppExistsState.IN_DB;
}
- FlinkAppState state = FlinkAppState.of(app.getState());
+ FlinkAppState state = app.getFlinkAppStateEnum();
// has stopped status
if (state.equals(FlinkAppState.ADDED)
|| state.equals(FlinkAppState.CREATED)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
index 309f573fa..63f5c18c0 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
@@ -18,6 +18,7 @@
package org.apache.streampark.console.core.task;
import org.apache.streampark.common.enums.ExecutionMode;
+import org.apache.streampark.common.util.DateUtils;
import org.apache.streampark.common.util.HttpClientUtils;
import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.common.util.YarnUtils;
@@ -106,6 +107,9 @@ public class FlinkAppHttpWatcher {
private static final Cache<Long, Byte> STARTING_CACHE =
Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build();
+ private static final Cache<Long, Date> LOST_CACHE =
+ Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build();
+
/** tracking task list */
private static final Map<Long, Application> WATCHING_APPS = new
ConcurrentHashMap<>(0);
@@ -205,10 +209,12 @@ public class FlinkAppHttpWatcher {
try {
// query status from flink rest api
getFromFlinkRestApi(application);
+ cleanupLost(application);
} catch (Exception flinkException) {
// query status from yarn rest api
try {
getFromYarnRestApi(application);
+ cleanupLost(application);
} catch (Exception yarnException) {
doStateFailed(application);
}
@@ -217,6 +223,10 @@ public class FlinkAppHttpWatcher {
}
}
+ private void cleanupLost(Application application) {
+ LOST_CACHE.invalidate(application.getId());
+ }
+
private void doStateFailed(Application application) {
/*
Query from flink's restAPI and yarn's restAPI both failed.
@@ -230,8 +240,14 @@ public class FlinkAppHttpWatcher {
log.error(
"[StreamPark][FlinkAppHttpWatcher] getFromFlinkRestApi and
getFromYarnRestApi error,job failed,savePoint expired!");
if (StopFrom.NONE.equals(stopFrom)) {
- savePointService.expire(application.getId());
- application.setState(FlinkAppState.LOST.getValue());
+ Date lostTime = LOST_CACHE.getIfPresent(application.getId());
+ if (lostTime == null) {
+ LOST_CACHE.put(application.getId(), new Date());
+ } else if (DateUtils.toSecondDuration(lostTime, new Date()) >= 30) {
+ savePointService.expire(application.getId());
+ application.setState(FlinkAppState.LOST.getValue());
+ LOST_CACHE.invalidate(application.getId());
+ }
} else {
application.setState(FlinkAppState.CANCELED.getValue());
}
@@ -245,9 +261,9 @@ public class FlinkAppHttpWatcher {
cleanSavepoint(application);
cleanOptioning(optionState, application.getId());
doPersistMetrics(application, true);
- FlinkAppState appState = FlinkAppState.of(application.getState());
+ FlinkAppState appState = application.getFlinkAppStateEnum();
if (appState.equals(FlinkAppState.FAILED) ||
appState.equals(FlinkAppState.LOST)) {
- alertService.alert(application,
FlinkAppState.of(application.getState()));
+ alertService.alert(application, application.getFlinkAppStateEnum());
if (appState.equals(FlinkAppState.FAILED)) {
try {
applicationService.start(application, true);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
index 55053f1f0..b0f3f5f14 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
@@ -79,7 +79,7 @@ public class FlinkK8sChangeEventListener {
setByJobStatusCV(app, jobStatus);
applicationService.persistMetrics(app);
- FlinkAppState state = FlinkAppState.of(app.getState());
+ FlinkAppState state = app.getFlinkAppStateEnum();
// email alerts when necessary
if (FlinkAppState.FAILED.equals(state)
|| FlinkAppState.LOST.equals(state)
@@ -143,7 +143,7 @@ public class FlinkK8sChangeEventListener {
// infer the final flink job state
Enumeration.Value state =
FlinkJobStatusWatcher.inferFlinkJobStateFromPersist(
- jobStatus.jobState(),
toK8sFlinkJobState(FlinkAppState.of(app.getState())));
+ jobStatus.jobState(),
toK8sFlinkJobState(app.getFlinkAppStateEnum()));
// corrective start-time / end-time / duration
long preStartTime = app.getStartTime() != null ?
app.getStartTime().getTime() : 0;
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java
index 77a2f3df6..4bc4762cd 100644
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java
@@ -212,7 +212,7 @@ class AlertServiceTest {
template.setStartTime(
DateUtils.format(
application.getStartTime(), DateUtils.fullFormat(),
TimeZone.getDefault()));
- template.setDuration(DateUtils.toDuration(duration));
+ template.setDuration(DateUtils.toStringDuration(duration));
template.setLink(url);
template.setEndTime(
DateUtils.format(
@@ -223,7 +223,7 @@ class AlertServiceTest {
template.setRestartIndex(application.getRestartCount());
template.setTotalRestart(application.getRestartSize());
template.setCpFailureRateInterval(
- DateUtils.toDuration(application.getCpFailureRateInterval() * 1000 *
60));
+ DateUtils.toStringDuration(application.getCpFailureRateInterval() *
1000 * 60));
template.setCpMaxFailureInterval(application.getCpMaxFailureInterval());
return template;