This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 13fdb36a4 [Bug] flinkClusterId cannot be saved in yarn-session mode
(#1608)
13fdb36a4 is described below
commit 13fdb36a4130ad6d2f16a0c3c0dc2ba0ba7451b0
Author: xujiangfeng001 <[email protected]>
AuthorDate: Thu Sep 15 14:00:30 2022 +0800
[Bug] flinkClusterId cannot be saved in yarn-session mode (#1608)
* [Bug] flinkClusterId cannot be saved in yarn-session mode
---
.../core/service/impl/ApplicationServiceImpl.java | 12 +++++-----
.../core/service/impl/FlinkClusterServiceImpl.java | 2 ++
.../console/core/task/FlinkTrackingTask.java | 27 +++++++++++++---------
.../src/views/flink/app/Add.vue | 16 +++++++++++--
4 files changed, 38 insertions(+), 19 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 6fb414953..61316b96a 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1049,9 +1049,9 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
optionMap.put(ConfigConst.KEY_YARN_APP_QUEUE(), yarnQueue);
}
if
(ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
- String yarnSessionClusterId = (String)
application.getHotParamsMap().get(ConfigConst.KEY_YARN_APP_ID());
- assert yarnSessionClusterId != null;
- extraParameter.put(ConfigConst.KEY_YARN_APP_ID(),
yarnSessionClusterId);
+ FlinkCluster cluster =
flinkClusterService.getById(application.getFlinkClusterId());
+ assert cluster != null;
+ extraParameter.put(ConfigConst.KEY_YARN_APP_ID(),
cluster.getClusterId());
}
}
@@ -1284,9 +1284,9 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
dynamicOption.put(ConfigConst.KEY_YARN_APP_QUEUE(), yarnQueue);
}
if
(ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
- String yarnSessionClusterId = (String)
application.getHotParamsMap().get(ConfigConst.KEY_YARN_APP_ID());
- assert yarnSessionClusterId != null;
- extraParameter.put(ConfigConst.KEY_YARN_APP_ID(),
yarnSessionClusterId);
+ FlinkCluster cluster =
flinkClusterService.getById(application.getFlinkClusterId());
+ assert cluster != null;
+ extraParameter.put(ConfigConst.KEY_YARN_APP_ID(),
cluster.getClusterId());
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 19f9cc367..648f7c4d3 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -31,6 +31,7 @@ import
org.apache.streampark.console.core.service.CommonService;
import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.FlinkEnvService;
import org.apache.streampark.console.core.service.SettingService;
+import org.apache.streampark.console.core.task.FlinkTrackingTask;
import org.apache.streampark.flink.submit.FlinkSubmitter;
import org.apache.streampark.flink.submit.bean.DeployRequest;
import org.apache.streampark.flink.submit.bean.DeployResponse;
@@ -188,6 +189,7 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
updateWrapper.set(FlinkCluster::getException, null);
update(updateWrapper);
result.setStatus(1);
+ FlinkTrackingTask.removeFlinkCluster(flinkCluster);
} else {
result.setStatus(0);
result.setMsg("deploy cluster failed," +
deployResponse.message());
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
index 796e034b7..95fa66a51 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
@@ -269,9 +269,9 @@ public class FlinkTrackingTask {
FlinkCluster flinkCluster = getFlinkCluster(application);
JobsOverview jobsOverview = httpJobsOverview(application,
flinkCluster);
Optional<JobsOverview.Job> optional;
- if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
- optional = jobsOverview.getJobs().size() > 1 ?
jobsOverview.getJobs().stream().filter(a ->
- StringUtils.equals(application.getJobId(),
a.getId())).findFirst() : jobsOverview.getJobs().stream().findFirst();
+ ExecutionMode execMode = application.getExecutionModeEnum();
+ if (ExecutionMode.YARN_APPLICATION.equals(execMode) ||
ExecutionMode.YARN_PER_JOB.equals(execMode)) {
+ optional = jobsOverview.getJobs().size() > 1 ?
jobsOverview.getJobs().stream().filter(a ->
StringUtils.equals(application.getJobId(), a.getId())).findFirst() :
jobsOverview.getJobs().stream().findFirst();
} else {
optional = jobsOverview.getJobs().stream().filter(x ->
x.getId().equals(application.getJobId())).findFirst();
}
@@ -590,6 +590,13 @@ public class FlinkTrackingTask {
SAVEPOINT_CACHE.put(appId, DEFAULT_FLAG_BYTE);
}
+ public static void removeFlinkCluster(FlinkCluster flinkCluster) {
+ if (FLINK_CLUSTER_MAP.containsKey(flinkCluster.getId())) {
+ log.info("remove flink cluster:{}", flinkCluster.getId());
+ FLINK_CLUSTER_MAP.remove(flinkCluster.getId());
+ }
+ }
+
/**
* Reload the latest application to the database to avoid the problem of
inconsistency between the data of cache and database.
*
@@ -724,7 +731,8 @@ public class FlinkTrackingTask {
private JobsOverview httpJobsOverview(Application application,
FlinkCluster flinkCluster) throws Exception {
final String flinkUrl = "jobs/overview";
- if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
+ ExecutionMode execMode = application.getExecutionModeEnum();
+ if (ExecutionMode.YARN_PER_JOB.equals(execMode) ||
ExecutionMode.YARN_APPLICATION.equals(execMode)) {
String reqURL;
if (StringUtils.isEmpty(application.getJobManagerUrl())) {
String format = "proxy/%s/" + flinkUrl;
@@ -734,12 +742,8 @@ public class FlinkTrackingTask {
reqURL = String.format(format, application.getJobManagerUrl());
}
JobsOverview jobsOverview = yarnRestRequest(reqURL,
JobsOverview.class);
- if (jobsOverview != null &&
ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
- List<JobsOverview.Job> jobs =
jobsOverview.getJobs().stream().filter(x ->
x.getId().equals(application.getJobId())).collect(Collectors.toList());
- jobsOverview.setJobs(jobs);
- }
return jobsOverview;
- } else if (ExecutionMode.isRemoteMode(application.getExecutionMode()))
{
+ } else if (ExecutionMode.REMOTE.equals(execMode) ||
ExecutionMode.YARN_SESSION.equals(execMode)) {
if (application.getJobId() != null) {
String remoteUrl = flinkCluster.getActiveAddress().toURL() +
"/" + flinkUrl;
JobsOverview jobsOverview = httpRestRequest(remoteUrl,
JobsOverview.class);
@@ -755,7 +759,8 @@ public class FlinkTrackingTask {
private CheckPoints httpCheckpoints(Application application, FlinkCluster
flinkCluster) throws IOException {
final String flinkUrl = "jobs/%s/checkpoints";
- if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
+ ExecutionMode execMode = application.getExecutionModeEnum();
+ if (ExecutionMode.YARN_PER_JOB.equals(execMode) ||
ExecutionMode.YARN_APPLICATION.equals(execMode)) {
String reqURL;
if (StringUtils.isEmpty(application.getJobManagerUrl())) {
String format = "proxy/%s/" + flinkUrl;
@@ -765,7 +770,7 @@ public class FlinkTrackingTask {
reqURL = String.format(format, application.getJobManagerUrl(),
application.getJobId());
}
return yarnRestRequest(reqURL, CheckPoints.class);
- } else if (ExecutionMode.isRemoteMode(application.getExecutionMode()))
{
+ } else if (ExecutionMode.REMOTE.equals(execMode) ||
ExecutionMode.YARN_SESSION.equals(execMode)) {
if (application.getJobId() != null) {
String remoteUrl = flinkCluster.getActiveAddress().toURL() +
"/" + String.format(flinkUrl, application.getJobId());
return httpRestRequest(remoteUrl, CheckPoints.class);
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
b/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
index 813615e96..6fae3012f 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
@@ -2298,12 +2298,18 @@
handleSubmitCustomJob(values) {
const options = this.handleFormValue(values)
- if(values.flinkClusterId){
+ if (values.flinkClusterId) {
const cluster = this.flinkClusters.filter(c => c.id ===
values.flinkClusterId && c.clusterState === 1)[0] || null
values.clusterId = cluster.id
values.flinkClusterId = cluster.id
values.yarnSessionClusterId = cluster.clusterId
}
+ if (values.yarnSessionClusterId) {
+ const cluster = this.flinkClusters.filter(c => c.clusterId ===
values.yarnSessionClusterId && c.clusterState === 1)[0] || null
+ values.clusterId = cluster.id
+ values.flinkClusterId = cluster.id
+ values.yarnSessionClusterId = cluster.clusterId
+ }
const params = {
jobType: 1,
executionMode: values.executionMode,
@@ -2395,12 +2401,18 @@
} else {
config = null
}
- if(values.flinkClusterId){
+ if (values.flinkClusterId) {
const cluster = this.flinkClusters.filter(c => c.id ===
values.flinkClusterId && c.clusterState === 1)[0] || null
values.clusterId = cluster.id
values.flinkClusterId = cluster.id
values.yarnSessionClusterId = cluster.clusterId
}
+ if (values.yarnSessionClusterId) {
+ const cluster = this.flinkClusters.filter(c => c.clusterId ===
values.yarnSessionClusterId && c.clusterState === 1)[0] || null
+ values.clusterId = cluster.id
+ values.flinkClusterId = cluster.id
+ values.yarnSessionClusterId = cluster.clusterId
+ }
const params = {
jobType: 2,