This is an automated email from the ASF dual-hosted git repository.
zhouli pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 8d5a4de18 [Bug] remote mode job remapping status bug fixed (#2819)
8d5a4de18 is described below
commit 8d5a4de187523b8dfcbc4dcb927210c2c2936bff
Author: benjobs <[email protected]>
AuthorDate: Fri Jun 30 09:39:59 2023 +0800
[Bug] remote mode job remapping status bug fixed (#2819)
* [Bug] remote mode remapping bug fixed
* minor improvement
---------
Co-authored-by: benjobs <[email protected]>
---
.../apache/streampark/common/util/YarnUtils.scala | 3 +-
.../core/service/impl/FlinkClusterServiceImpl.java | 2 -
.../console/core/task/FlinkRESTAPIWatcher.java | 221 +++++++++++----------
3 files changed, 114 insertions(+), 112 deletions(-)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
index cdf973452..2c58dda15 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
@@ -34,6 +34,7 @@ import java.util.{HashMap => JavaHashMap, List => JavaList}
import java.util.concurrent.TimeUnit
import scala.collection.convert.ImplicitConversions._
+import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}
import scala.util.control.Breaks.{break, breakable}
@@ -204,7 +205,7 @@ object YarnUtils extends Logger {
val address = NetUtils.getConnectAddress(inetSocketAddress)
- val buffer = new StringBuilder(protocol)
+ val buffer = new mutable.StringBuilder(protocol)
val resolved = address.getAddress
if (resolved != null && !resolved.isAnyLocalAddress &&
!resolved.isLoopbackAddress) {
buffer.append(address.getHostName)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index cb7e4ce76..31d4ead6f 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -32,7 +32,6 @@ import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.FlinkEnvService;
import org.apache.streampark.console.core.service.YarnQueueService;
import org.apache.streampark.console.core.task.FlinkClusterWatcher;
-import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
import org.apache.streampark.flink.client.FlinkClient;
import org.apache.streampark.flink.client.bean.DeployRequest;
import org.apache.streampark.flink.client.bean.DeployResponse;
@@ -175,7 +174,6 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
flinkCluster.setException(null);
FlinkClusterWatcher.addFlinkCluster(flinkCluster);
updateById(flinkCluster);
- FlinkRESTAPIWatcher.removeFlinkCluster(flinkCluster);
} catch (Exception e) {
log.error(e.getMessage(), e);
flinkCluster.setAddress(null);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
index d32085b3c..e9b9f2224 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
@@ -196,66 +196,67 @@ public class FlinkRESTAPIWatcher {
private void doWatch() {
lastWatchingTime = System.currentTimeMillis();
for (Map.Entry<Long, Application> entry : WATCHING_APPS.entrySet()) {
- EXECUTOR.execute(
- () -> {
- long key = entry.getKey();
- Application application = entry.getValue();
- final StopFrom stopFrom =
- STOP_FROM_MAP.getOrDefault(key, null) == null
- ? StopFrom.NONE
- : STOP_FROM_MAP.get(key);
- final OptionState optionState = OPTIONING.get(key);
+ watch(entry.getKey(), entry.getValue());
+ }
+ }
+
+ private void watch(Long key, Application application) {
+ EXECUTOR.execute(
+ () -> {
+ final StopFrom stopFrom =
+ STOP_FROM_MAP.getOrDefault(key, null) == null
+ ? StopFrom.NONE
+ : STOP_FROM_MAP.get(key);
+ final OptionState optionState = OPTIONING.get(key);
+ try {
+ // query status from flink rest api
+ getFromFlinkRestApi(application, stopFrom);
+ } catch (Exception flinkException) {
+ // query status from yarn rest api
try {
- // query status from flink rest api
- getFromFlinkRestApi(application, stopFrom);
- } catch (Exception flinkException) {
- // query status from yarn rest api
- try {
- getFromYarnRestApi(application, stopFrom);
- } catch (Exception yarnException) {
+ getFromYarnRestApi(application, stopFrom);
+ } catch (Exception yarnException) {
+ /*
+ Query from flink's restAPI and yarn's restAPI both failed.
+ In this case, it is necessary to decide whether to return to
the final state depending on the state being operated
+ */
+ if (optionState == null ||
!optionState.equals(OptionState.STARTING)) {
+ // non-mapping
+ if (application.getState() !=
FlinkAppState.MAPPING.getValue()) {
+ log.error(
+ "FlinkRESTAPIWatcher getFromFlinkRestApi and
getFromYarnRestApi error,job failed,savePoint expired!");
+ if (StopFrom.NONE.equals(stopFrom)) {
+ savePointService.expire(application.getId());
+ application.setState(FlinkAppState.LOST.getValue());
+ alertService.alert(application, FlinkAppState.LOST);
+ } else {
+ application.setState(FlinkAppState.CANCELED.getValue());
+ }
+ }
/*
- Query from flink's restAPI and yarn's restAPI both failed.
- In this case, it is necessary to decide whether to return to
the final state depending on the state being operated
+ This step means that the above two ways to get information
have failed, and this step is the last step,
+ which will directly identify the mission as cancelled or lost.
+ Need clean savepoint.
*/
- if (optionState == null ||
!optionState.equals(OptionState.STARTING)) {
- // non-mapping
- if (application.getState() !=
FlinkAppState.MAPPING.getValue()) {
- log.error(
- "FlinkRESTAPIWatcher getFromFlinkRestApi and
getFromYarnRestApi error,job failed,savePoint expired!");
- if (StopFrom.NONE.equals(stopFrom)) {
- savePointService.expire(application.getId());
- application.setState(FlinkAppState.LOST.getValue());
- alertService.alert(application, FlinkAppState.LOST);
- } else {
- application.setState(FlinkAppState.CANCELED.getValue());
- }
- }
- /*
- This step means that the above two ways to get information
have failed, and this step is the last step,
- which will directly identify the mission as cancelled or
lost.
- Need clean savepoint.
- */
- application.setEndTime(new Date());
- cleanSavepoint(application);
- cleanOptioning(optionState, key);
- doPersistMetrics(application, true);
- FlinkAppState appState =
FlinkAppState.of(application.getState());
- if (appState.equals(FlinkAppState.FAILED)
- || appState.equals(FlinkAppState.LOST)) {
- alertService.alert(application,
FlinkAppState.of(application.getState()));
- if (appState.equals(FlinkAppState.FAILED)) {
- try {
- applicationService.start(application, true);
- } catch (Exception e) {
- log.error(e.getMessage(), e);
- }
+ application.setEndTime(new Date());
+ cleanSavepoint(application);
+ cleanOptioning(optionState, key);
+ doPersistMetrics(application, true);
+ FlinkAppState appState =
FlinkAppState.of(application.getState());
+ if (appState.equals(FlinkAppState.FAILED) ||
appState.equals(FlinkAppState.LOST)) {
+ alertService.alert(application,
FlinkAppState.of(application.getState()));
+ if (appState.equals(FlinkAppState.FAILED)) {
+ try {
+ applicationService.start(application, true);
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
}
}
}
}
}
- });
- }
+ }
+ });
}
/**
@@ -265,8 +266,7 @@ public class FlinkRESTAPIWatcher {
* @param stopFrom stopFrom
*/
private void getFromFlinkRestApi(Application application, StopFrom stopFrom)
throws Exception {
- FlinkCluster flinkCluster = getFlinkCluster(application);
- JobsOverview jobsOverview = httpJobsOverview(application, flinkCluster);
+ JobsOverview jobsOverview = httpJobsOverview(application);
Optional<JobsOverview.Job> optional;
ExecutionMode execMode = application.getExecutionModeEnum();
if (ExecutionMode.YARN_APPLICATION.equals(execMode)
@@ -339,8 +339,7 @@ public class FlinkRESTAPIWatcher {
// get overview info at the first start time
if (STARTING_CACHE.getIfPresent(application.getId()) != null) {
- FlinkCluster flinkCluster = getFlinkCluster(application);
- Overview override = httpOverview(application, flinkCluster);
+ Overview override = httpOverview(application);
if (override != null && override.getSlotsTotal() > 0) {
application.setTotalTM(override.getTaskmanagers());
application.setTotalSlot(override.getSlotsTotal());
@@ -352,8 +351,7 @@ public class FlinkRESTAPIWatcher {
/** get latest checkpoint */
private void handleCheckPoints(Application application) throws Exception {
- FlinkCluster flinkCluster = getFlinkCluster(application);
- CheckPoints checkPoints = httpCheckpoints(application, flinkCluster);
+ CheckPoints checkPoints = httpCheckpoints(application);
if (checkPoints != null) {
checkpointProcessor.process(application, checkPoints);
}
@@ -602,13 +600,6 @@ public class FlinkRESTAPIWatcher {
SAVEPOINT_CACHE.put(appId, DEFAULT_FLAG_BYTE);
}
- public static void removeFlinkCluster(FlinkCluster flinkCluster) {
- if (FLINK_CLUSTER_MAP.containsKey(flinkCluster.getId())) {
- log.info("remove flink cluster:{}", flinkCluster.getId());
- FLINK_CLUSTER_MAP.remove(flinkCluster.getId());
- }
- }
-
public static void unWatching(Long appId) {
if (isKubernetesApp(appId)) {
return;
@@ -647,26 +638,12 @@ public class FlinkRESTAPIWatcher {
return FlinkK8sWatcherWrapper.isKubernetesApp(app);
}
- private FlinkCluster getFlinkCluster(Application application) {
- if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())
- || ExecutionMode.isSessionMode(application.getExecutionModeEnum())) {
- FlinkCluster flinkCluster =
FLINK_CLUSTER_MAP.get(application.getFlinkClusterId());
- if (flinkCluster == null) {
- flinkCluster =
flinkClusterService.getById(application.getFlinkClusterId());
- FLINK_CLUSTER_MAP.put(application.getFlinkClusterId(), flinkCluster);
- }
- return flinkCluster;
- }
- return null;
- }
-
private YarnAppInfo httpYarnAppInfo(Application application) throws
Exception {
String reqURL = "ws/v1/cluster/apps/".concat(application.getAppId());
return yarnRestRequest(reqURL, YarnAppInfo.class);
}
- private Overview httpOverview(Application application, FlinkCluster
flinkCluster)
- throws IOException {
+ private Overview httpOverview(Application application) throws IOException {
String appId = application.getAppId();
if (appId != null) {
if
(application.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION)
@@ -685,12 +662,10 @@ public class FlinkRESTAPIWatcher {
return null;
}
- private JobsOverview httpJobsOverview(Application application, FlinkCluster
flinkCluster)
- throws Exception {
+ private JobsOverview httpJobsOverview(Application application) throws
Exception {
final String flinkUrl = "jobs/overview";
ExecutionMode execMode = application.getExecutionModeEnum();
- if (ExecutionMode.YARN_PER_JOB.equals(execMode)
- || ExecutionMode.YARN_APPLICATION.equals(execMode)) {
+ if (ExecutionMode.isYarnMode(execMode)) {
String reqURL;
if (StringUtils.isEmpty(application.getJobManagerUrl())) {
String format = "proxy/%s/" + flinkUrl;
@@ -700,30 +675,31 @@ public class FlinkRESTAPIWatcher {
reqURL = String.format(format, application.getJobManagerUrl());
}
return yarnRestRequest(reqURL, JobsOverview.class);
- } else if (ExecutionMode.REMOTE.equals(execMode)
- || ExecutionMode.YARN_SESSION.equals(execMode)) {
- if (application.getJobId() != null) {
- String remoteUrl = flinkCluster.getAddress() + "/" + flinkUrl;
- JobsOverview jobsOverview = httpRestRequest(remoteUrl,
JobsOverview.class);
- if (jobsOverview != null) {
- List<JobsOverview.Job> jobs =
- jobsOverview.getJobs().stream()
- .filter(x -> x.getId().equals(application.getJobId()))
- .collect(Collectors.toList());
- jobsOverview.setJobs(jobs);
- }
- return jobsOverview;
- }
+ }
+
+ if (application.getJobId() != null &&
ExecutionMode.isRemoteMode(execMode)) {
+ return httpRemoteCluster(
+ application.getFlinkClusterId(),
+ cluster -> {
+ String remoteUrl = cluster.getAddress() + "/" + flinkUrl;
+ JobsOverview jobsOverview = httpRestRequest(remoteUrl,
JobsOverview.class);
+ if (jobsOverview != null) {
+ List<JobsOverview.Job> jobs =
+ jobsOverview.getJobs().stream()
+ .filter(x -> x.getId().equals(application.getJobId()))
+ .collect(Collectors.toList());
+ jobsOverview.setJobs(jobs);
+ }
+ return jobsOverview;
+ });
}
return null;
}
- private CheckPoints httpCheckpoints(Application application, FlinkCluster
flinkCluster)
- throws IOException {
+ private CheckPoints httpCheckpoints(Application application) throws
Exception {
final String flinkUrl = "jobs/%s/checkpoints";
ExecutionMode execMode = application.getExecutionModeEnum();
- if (ExecutionMode.YARN_PER_JOB.equals(execMode)
- || ExecutionMode.YARN_APPLICATION.equals(execMode)) {
+ if (ExecutionMode.isYarnMode(execMode)) {
String reqURL;
if (StringUtils.isEmpty(application.getJobManagerUrl())) {
String format = "proxy/%s/" + flinkUrl;
@@ -733,13 +709,16 @@ public class FlinkRESTAPIWatcher {
reqURL = String.format(format, application.getJobManagerUrl(),
application.getJobId());
}
return yarnRestRequest(reqURL, CheckPoints.class);
- } else if (ExecutionMode.REMOTE.equals(execMode)
- || ExecutionMode.YARN_SESSION.equals(execMode)) {
- if (application.getJobId() != null) {
- String remoteUrl =
- flinkCluster.getAddress() + "/" + String.format(flinkUrl,
application.getJobId());
- return httpRestRequest(remoteUrl, CheckPoints.class);
- }
+ }
+
+ if (application.getJobId() != null &&
ExecutionMode.isRemoteMode(execMode)) {
+ return httpRemoteCluster(
+ application.getFlinkClusterId(),
+ cluster -> {
+ String remoteUrl =
+ cluster.getAddress() + "/" + String.format(flinkUrl,
application.getJobId());
+ return httpRestRequest(remoteUrl, CheckPoints.class);
+ });
}
return null;
}
@@ -765,4 +744,28 @@ public class FlinkRESTAPIWatcher {
public boolean isWatchingApp(Long id) {
return WATCHING_APPS.containsKey(id);
}
+
+ private <T> T httpRemoteCluster(Long clusterId, Callback<FlinkCluster, T>
function)
+ throws Exception {
+ FlinkCluster flinkCluster = getFlinkRemoteCluster(clusterId, false);
+ try {
+ return function.call(flinkCluster);
+ } catch (Exception e) {
+ flinkCluster = getFlinkRemoteCluster(clusterId, true);
+ return function.call(flinkCluster);
+ }
+ }
+
+ private FlinkCluster getFlinkRemoteCluster(Long clusterId, boolean flush) {
+ FlinkCluster flinkCluster = FLINK_CLUSTER_MAP.get(clusterId);
+ if (flinkCluster == null || flush) {
+ flinkCluster = flinkClusterService.getById(clusterId);
+ FLINK_CLUSTER_MAP.put(clusterId, flinkCluster);
+ }
+ return flinkCluster;
+ }
+
+ interface Callback<T, R> {
+ R call(T e) throws Exception;
+ }
}