This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
new 6fa369b36 [Bug] deploy job on k8s session mode get job state bug fixed
(#3489)
6fa369b36 is described below
commit 6fa369b36165543b2879387b20cc74775e338e38
Author: benjobs <[email protected]>
AuthorDate: Wed Jan 17 08:51:12 2024 +0800
[Bug] deploy job on k8s session mode get job state bug fixed (#3489)
* [Improve] k8s deploy-cluster | start-app error info improvement
* [Improve] deploy yarn session cluster improvement
* [Improve] FlinkClient instance error issue fixed
* [Improve] job on k8s session mode state bug fixed
* [Improve] FE i18n improvement
* [Bug] k8s application mode job cancel bug fixed.
---------
Co-authored-by: benjobs <[email protected]>
---
.../streampark/common/conf/ConfigConst.scala | 2 +
.../console/core/entity/Application.java | 46 +++-
.../console/core/service/ApplicationService.java | 4 +
.../core/service/impl/AppBuildPipeServiceImpl.java | 17 +-
.../core/service/impl/ApplicationServiceImpl.java | 176 ++++++++++-----
.../core/service/impl/FlinkClusterServiceImpl.java | 36 ++-
.../core/service/impl/YarnQueueServiceImpl.java | 2 +-
.../console/core/task/FlinkK8sWatcherWrapper.java | 69 ++----
.../console/core/task/FlinkRESTAPIWatcher.java | 11 +-
.../src/api/flink/app/app.type.ts | 1 +
.../Application/src/AppDarkModeToggle.vue | 4 +-
.../src/components/ContextMenu/src/ContextMenu.vue | 4 +-
.../src/components/Form/src/BasicForm.vue | 2 +-
.../src/components/Page/src/PageFooter.vue | 4 +-
.../components/Table/src/components/HeaderCell.vue | 2 +-
.../src/hooks/web/useLockPage.ts | 9 +-
.../src/locales/lang/en/flink/app.ts | 15 +-
.../src/locales/lang/zh-CN/flink/app.ts | 14 +-
.../src/locales/lang/zh-CN/setting/flinkCluster.ts | 3 +-
.../streampark-console-webapp/src/utils/props.ts | 2 +-
.../src/views/base/login/Login.vue | 5 +-
.../src/views/flink/app/EditFlink.vue | 11 +-
.../src/views/flink/app/EditStreamPark.vue | 19 +-
.../flink/app/hooks/useCreateAndEditSchema.ts | 108 ++++-----
.../src/views/flink/app/hooks/useFlinkRender.tsx | 48 +++-
.../src/views/flink/app/utils/index.ts | 19 +-
.../flink/client/bean/CancelRequest.scala | 12 +-
.../flink/client/bean/DeployRequest.scala | 8 +-
.../flink/client/bean/DeployResponse.scala | 2 +-
.../flink/client/bean/ShutDownResponse.scala | 2 +-
.../client/bean/TriggerSavepointRequest.scala | 12 +-
.../impl/KubernetesNativeSessionClient.scala | 34 ++-
.../flink/client/impl/YarnSessionClient.scala | 33 ++-
.../flink/client/trait/FlinkClientTrait.scala | 7 +
.../flink/kubernetes/FlinkK8sWatchController.scala | 19 +-
.../flink/kubernetes/model/ClusterKey.scala | 19 +-
.../flink/kubernetes/model/JobStatusCV.scala | 10 +-
.../flink/kubernetes/model/K8sEventKey.scala | 16 +-
.../flink/kubernetes/model/TrackId.scala | 4 +-
.../kubernetes/watcher/FlinkJobStatusWatcher.scala | 247 +++++++++++----------
.../impl/FlinkK8sApplicationBuildPipeline.scala | 17 +-
41 files changed, 664 insertions(+), 411 deletions(-)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
index e033a6f85..5517cc302 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
@@ -54,6 +54,8 @@ object ConfigConst {
/** kerberos */
val KEY_KERBEROS = "kerberos"
+ val KEY_KERBEROS_SERVICE_ACCOUNT = "kubernetes.service-account"
+
val KEY_HADOOP_USER_NAME = "HADOOP_USER_NAME"
/** hadoop.security.authentication */
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 4b03ee57b..8560a28c9 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -239,6 +239,7 @@ public class Application implements Serializable {
private transient String createTimeTo;
private transient String backUpDescription;
private transient String yarnQueue;
+ private transient String serviceAccount;
/** Flink Web UI Url */
private transient String flinkRestUrl;
@@ -281,20 +282,34 @@ public class Application implements Serializable {
this.tracking = shouldTracking(appState);
}
- public void setYarnQueueByHotParams() {
- if (!(ExecutionMode.YARN_APPLICATION == this.getExecutionModeEnum()
- || ExecutionMode.YARN_PER_JOB == this.getExecutionModeEnum())) {
+ public void setByHotParams() {
+ Map<String, Object> hotParamsMap = this.getHotParamsMap();
+ if (hotParamsMap.isEmpty()) {
return;
}
- Map<String, Object> hotParamsMap = this.getHotParamsMap();
- if (!hotParamsMap.isEmpty() &&
hotParamsMap.containsKey(ConfigConst.KEY_YARN_APP_QUEUE())) {
- String yarnQueue =
hotParamsMap.get(ConfigConst.KEY_YARN_APP_QUEUE()).toString();
- String labelExpr =
-
Optional.ofNullable(hotParamsMap.get(ConfigConst.KEY_YARN_APP_NODE_LABEL()))
- .map(Object::toString)
- .orElse(null);
- this.setYarnQueue(YarnQueueLabelExpression.of(yarnQueue,
labelExpr).toString());
+ switch (getExecutionModeEnum()) {
+ case YARN_APPLICATION:
+ case YARN_PER_JOB:
+ // 1) set yarnQueue from hotParams
+ if (hotParamsMap.containsKey(ConfigConst.KEY_YARN_APP_QUEUE())) {
+ String yarnQueue =
hotParamsMap.get(ConfigConst.KEY_YARN_APP_QUEUE()).toString();
+ String labelExpr =
+
Optional.ofNullable(hotParamsMap.get(ConfigConst.KEY_YARN_APP_NODE_LABEL()))
+ .map(Object::toString)
+ .orElse(null);
+ this.setYarnQueue(YarnQueueLabelExpression.of(yarnQueue,
labelExpr).toString());
+ }
+ break;
+ case KUBERNETES_NATIVE_APPLICATION:
+ // 2) service-account.
+ Object serviceAccount =
hotParamsMap.get(ConfigConst.KEY_KERBEROS_SERVICE_ACCOUNT());
+ if (serviceAccount != null) {
+ this.setServiceAccount(serviceAccount.toString());
+ }
+ break;
+ default:
+ break;
}
}
@@ -483,6 +498,10 @@ public class Application implements Serializable {
return this.getAppType() == ApplicationType.STREAMPARK_FLINK.getType();
}
+ public boolean isKubernetesModeJob() {
+ return ExecutionMode.isKubernetesMode(this.getExecutionModeEnum());
+ }
+
@JsonIgnore
@SneakyThrows
public MavenDependency getMavenDependency() {
@@ -570,6 +589,11 @@ public class Application implements Serializable {
if (needFillYarnQueueLabel(executionModeEnum)) {
hotParams.putAll(YarnQueueLabelExpression.getQueueLabelMap(appParam.getYarnQueue()));
}
+ if (executionModeEnum == ExecutionMode.KUBERNETES_NATIVE_APPLICATION) {
+ if (StringUtils.isNotBlank(appParam.getServiceAccount())) {
+ hotParams.put(ConfigConst.KEY_KERBEROS_SERVICE_ACCOUNT(),
appParam.getServiceAccount());
+ }
+ }
if (!hotParams.isEmpty()) {
this.setHotParams(JacksonUtils.write(hotParams));
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
index 5ffa7a02e..42d30fec5 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
@@ -23,6 +23,8 @@ import
org.apache.streampark.console.base.exception.ApplicationException;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.enums.AppExistsState;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.IService;
import org.springframework.web.multipart.MultipartFile;
@@ -124,4 +126,6 @@ public interface ApplicationService extends
IService<Application> {
String k8sStartLog(Long id, Integer offset, Integer limit) throws Exception;
AppExistsState checkStart(Application app);
+
+ List<ApplicationReport> getYARNApplication(String appName);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 6834d940d..1e6042b89 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -30,6 +30,7 @@ import
org.apache.streampark.console.core.entity.AppBuildPipeline;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.ApplicationConfig;
import org.apache.streampark.console.core.entity.ApplicationLog;
+import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.entity.FlinkEnv;
import org.apache.streampark.console.core.entity.FlinkSql;
import org.apache.streampark.console.core.entity.Message;
@@ -45,6 +46,7 @@ import
org.apache.streampark.console.core.service.ApplicationConfigService;
import org.apache.streampark.console.core.service.ApplicationLogService;
import org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.console.core.service.CommonService;
+import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.FlinkEnvService;
import org.apache.streampark.console.core.service.FlinkSqlService;
import org.apache.streampark.console.core.service.MessageService;
@@ -125,6 +127,8 @@ public class AppBuildPipeServiceImpl
@Autowired private ApplicationService applicationService;
+ @Autowired private FlinkClusterService flinkClusterService;
+
@Autowired private ApplicationLogService applicationLogService;
@Autowired private FlinkRESTAPIWatcher flinkRESTAPIWatcher;
@@ -327,6 +331,13 @@ public class AppBuildPipeServiceImpl
log.info("Submit params to building pipeline : {}", buildRequest);
return FlinkRemoteBuildPipeline.of(buildRequest);
case KUBERNETES_NATIVE_SESSION:
+ String k8sNamespace = app.getK8sNamespace();
+ String clusterId = app.getClusterId();
+ if (app.getFlinkClusterId() != null) {
+ FlinkCluster flinkCluster =
flinkClusterService.getById(app.getFlinkClusterId());
+ k8sNamespace = flinkCluster.getK8sNamespace();
+ clusterId = flinkCluster.getClusterId();
+ }
FlinkK8sSessionBuildRequest k8sSessionBuildRequest =
new FlinkK8sSessionBuildRequest(
app.getJobName(),
@@ -337,8 +348,8 @@ public class AppBuildPipeServiceImpl
app.getDevelopmentMode(),
flinkEnv.getFlinkVersion(),
app.getMavenArtifact(),
- app.getClusterId(),
- app.getK8sNamespace());
+ clusterId,
+ k8sNamespace);
log.info("Submit params to building pipeline : {}",
k8sSessionBuildRequest);
return FlinkK8sSessionBuildPipeline.of(k8sSessionBuildRequest);
case KUBERNETES_NATIVE_APPLICATION:
@@ -352,7 +363,7 @@ public class AppBuildPipeServiceImpl
app.getDevelopmentMode(),
flinkEnv.getFlinkVersion(),
app.getMavenArtifact(),
- app.getClusterId(),
+ app.getJobName(),
app.getK8sNamespace(),
app.getFlinkImage(),
app.getK8sPodTemplates(),
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index c27e5772e..9e827b5f8 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -22,6 +22,7 @@ import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.enums.ApplicationType;
import org.apache.streampark.common.enums.DevelopmentMode;
import org.apache.streampark.common.enums.ExecutionMode;
+import org.apache.streampark.common.enums.FlinkK8sRestExposedType;
import org.apache.streampark.common.enums.ResolveOrder;
import org.apache.streampark.common.enums.StorageType;
import org.apache.streampark.common.fs.HdfsOperator;
@@ -78,6 +79,7 @@ import
org.apache.streampark.console.core.service.SavePointService;
import org.apache.streampark.console.core.service.SettingService;
import org.apache.streampark.console.core.service.VariableService;
import org.apache.streampark.console.core.service.YarnQueueService;
+import org.apache.streampark.console.core.task.FlinkK8sWatcherWrapper;
import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
import org.apache.streampark.flink.client.FlinkClient;
import org.apache.streampark.flink.client.bean.CancelRequest;
@@ -155,9 +157,9 @@ import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import scala.Tuple2;
+
import static org.apache.streampark.common.enums.StorageType.LFS;
-import static
org.apache.streampark.console.core.task.FlinkK8sWatcherWrapper.Bridge.toTrackId;
-import static
org.apache.streampark.console.core.task.FlinkK8sWatcherWrapper.isKubernetesApp;
@Slf4j
@Service
@@ -221,6 +223,8 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
@Autowired private YarnQueueService yarnQueueService;
+ @Autowired private FlinkK8sWatcherWrapper k8sWatcherWrapper;
+
@PostConstruct
public void resetOptionState() {
this.baseMapper.resetOptionState();
@@ -515,9 +519,10 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
record -> {
// status of flink job on kubernetes mode had been
automatically persisted to db
// in time.
- if (isKubernetesApp(record)) {
+ if (record.isKubernetesModeJob()) {
// set duration
- String restUrl =
flinkK8sWatcher.getRemoteRestUrl(toTrackId(record));
+ String restUrl =
+
flinkK8sWatcher.getRemoteRestUrl(k8sWatcherWrapper.toTrackId(record));
record.setFlinkRestUrl(restUrl);
if (record.getTracking() == 1
&& record.getStartTime() != null
@@ -656,7 +661,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
return AppExistsState.INVALID;
}
if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
- boolean exists =
!getApplicationReports(application.getJobName()).isEmpty();
+ boolean exists = !getYARNApplication(application.getJobName()).isEmpty();
return exists ? AppExistsState.IN_YARN : AppExistsState.NO;
}
// todo on k8s check...
@@ -706,7 +711,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
}
// check whether clusterId, namespace, jobId on kubernetes
else if (ExecutionMode.isKubernetesMode(appParam.getExecutionMode())
- && flinkK8sWatcher.checkIsInRemoteCluster(toTrackId(app))) {
+ &&
flinkK8sWatcher.checkIsInRemoteCluster(k8sWatcherWrapper.toTrackId(app))) {
return AppExistsState.IN_KUBERNETES;
}
}
@@ -722,7 +727,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
}
// check whether clusterId, namespace, jobId on kubernetes
else if (ExecutionMode.isKubernetesMode(appParam.getExecutionMode())
- && flinkK8sWatcher.checkIsInRemoteCluster(toTrackId(appParam))) {
+ &&
flinkK8sWatcher.checkIsInRemoteCluster(k8sWatcherWrapper.toTrackId(appParam))) {
return AppExistsState.IN_KUBERNETES;
}
}
@@ -951,7 +956,6 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
application.setExecutionMode(appParam.getExecutionMode());
application.setClusterId(appParam.getClusterId());
application.setFlinkImage(appParam.getFlinkImage());
- application.setK8sNamespace(appParam.getK8sNamespace());
application.updateHotParams(appParam);
application.setK8sRestExposedType(appParam.getK8sRestExposedType());
application.setK8sPodTemplate(appParam.getK8sPodTemplate());
@@ -974,6 +978,11 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
case YARN_PER_JOB:
case KUBERNETES_NATIVE_APPLICATION:
application.setFlinkClusterId(null);
+ if (appParam.getExecutionModeEnum() ==
ExecutionMode.KUBERNETES_NATIVE_APPLICATION) {
+ application.setK8sNamespace(appParam.getK8sNamespace());
+ application.setServiceAccount(appParam.getServiceAccount());
+ application.doSetHotParams();
+ }
break;
case REMOTE:
case YARN_SESSION:
@@ -1143,7 +1152,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
CompletableFuture<SubmitResponse> startFuture =
startFutureMap.remove(app.getId());
CompletableFuture<CancelResponse> cancelFuture =
cancelFutureMap.remove(app.getId());
Application application = this.baseMapper.getApp(app);
- if (isKubernetesApp(application)) {
+ if (application.isKubernetesModeJob()) {
KubernetesDeploymentHelper.watchPodTerminatedLog(
application.getK8sNamespace(), application.getJobName(),
application.getJobId());
}
@@ -1194,8 +1203,8 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
}
}
// add flink web url info for k8s-mode
- if (isKubernetesApp(application)) {
- String restUrl =
flinkK8sWatcher.getRemoteRestUrl(toTrackId(application));
+ if (application.isKubernetesModeJob()) {
+ String restUrl =
flinkK8sWatcher.getRemoteRestUrl(k8sWatcherWrapper.toTrackId(application));
application.setFlinkRestUrl(restUrl);
// set duration
@@ -1206,9 +1215,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
application.setDuration(now - application.getStartTime().getTime());
}
}
-
- application.setYarnQueueByHotParams();
-
+ application.setByHotParams();
return application;
}
@@ -1231,8 +1238,8 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
public boolean mapping(Application appParam) {
boolean mapping = this.baseMapper.mapping(appParam);
Application application = getById(appParam.getId());
- if (isKubernetesApp(application)) {
- flinkK8sWatcher.doWatching(toTrackId(application));
+ if (application.isKubernetesModeJob()) {
+ flinkK8sWatcher.doWatching(k8sWatcherWrapper.toTrackId(application));
} else {
FlinkRESTAPIWatcher.doWatching(application);
}
@@ -1278,23 +1285,6 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
}
}
- String clusterId = null;
- if (ExecutionMode.isKubernetesMode(application.getExecutionMode())) {
- clusterId = application.getClusterId();
- } else if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
- if
(ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
- FlinkCluster cluster =
flinkClusterService.getById(application.getFlinkClusterId());
- ApiAlertException.throwIfNull(
- cluster,
- String.format(
- "The yarn session clusterId=%s can't found, maybe the
clusterId is wrong or the cluster has been deleted. Please contact the Admin.",
- application.getFlinkClusterId()));
- clusterId = cluster.getClusterId();
- } else {
- clusterId = application.getAppId();
- }
- }
-
Map<String, Object> properties = new HashMap<>();
if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) {
@@ -1310,6 +1300,10 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
properties.put(RestOptions.PORT.key(), activeAddress.getPort());
}
+ Tuple2<String, String> clusterIdNamespace =
getNamespaceClusterId(application);
+ String namespace = clusterIdNamespace._1;
+ String clusterId = clusterIdNamespace._2;
+
CancelRequest cancelRequest =
new CancelRequest(
flinkEnv.getFlinkVersion(),
@@ -1320,7 +1314,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
appParam.getSavePointed(),
appParam.getDrain(),
customSavepoint,
- application.getK8sNamespace());
+ namespace);
final Date triggerTime = new Date();
CompletableFuture<CancelResponse> cancelFuture =
@@ -1328,7 +1322,8 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
cancelFutureMap.put(application.getId(), cancelFuture);
- TrackId trackId = isKubernetesApp(application) ? toTrackId(application) :
null;
+ TrackId trackId =
+ application.isKubernetesModeJob() ?
k8sWatcherWrapper.toTrackId(application) : null;
cancelFuture.whenComplete(
(cancelResponse, throwable) -> {
@@ -1352,7 +1347,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
savePointService.expire(application.getId());
}
// re-tracking flink job on kubernetes and logging exception
- if (isKubernetesApp(application)) {
+ if (application.isKubernetesModeJob()) {
flinkK8sWatcher.unWatching(trackId);
} else {
FlinkRESTAPIWatcher.unWatching(application.getId());
@@ -1378,7 +1373,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
savePointService.save(savePoint);
}
- if (isKubernetesApp(application)) {
+ if (application.isKubernetesModeJob()) {
flinkK8sWatcher.unWatching(trackId);
}
});
@@ -1450,7 +1445,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
// check job on yarn is already running
if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
ApiAlertException.throwIfTrue(
- !getApplicationReports(application.getJobName()).isEmpty(),
+ !getYARNApplication(application.getJobName()).isEmpty(),
"The same job name is already running in the yarn queue");
}
@@ -1569,6 +1564,27 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
String applicationArgs =
variableService.replaceVariable(application.getTeamId(),
application.getArgs());
+ String k8sNamespace = null;
+ String k8sClusterId = null;
+ FlinkK8sRestExposedType exposedType = null;
+ if (application.getExecutionModeEnum() ==
ExecutionMode.KUBERNETES_NATIVE_SESSION) {
+ // For compatibility with historical versions
+ if (application.getFlinkClusterId() == null) {
+ k8sClusterId = application.getClusterId();
+ k8sNamespace = application.getK8sNamespace();
+ exposedType = application.getK8sRestExposedTypeEnum();
+ } else {
+ FlinkCluster cluster =
flinkClusterService.getById(application.getFlinkClusterId());
+ k8sClusterId = cluster.getClusterId();
+ k8sNamespace = cluster.getK8sNamespace();
+ exposedType = cluster.getK8sRestExposedTypeEnum();
+ }
+ } else if (application.getExecutionModeEnum() ==
ExecutionMode.KUBERNETES_NATIVE_APPLICATION) {
+ k8sClusterId = application.getJobName();
+ k8sNamespace = application.getK8sNamespace();
+ exposedType = application.getK8sRestExposedTypeEnum();
+ }
+
SubmitRequest submitRequest =
SubmitRequest.apply(
flinkEnv.getFlinkVersion(),
@@ -1585,11 +1601,10 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
applicationArgs,
buildResult,
extraParameter,
- application.getClusterId(),
- application.getK8sNamespace(),
- application.getK8sRestExposedTypeEnum());
+ k8sClusterId,
+ k8sNamespace,
+ exposedType);
- TrackId trackId = isKubernetesApp(application) ? toTrackId(application) :
null;
CompletableFuture<SubmitResponse> future =
CompletableFuture.supplyAsync(() -> FlinkClient.submit(submitRequest),
executorService);
@@ -1614,7 +1629,8 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
app.setState(FlinkAppState.FAILED.getValue());
app.setOptionState(OptionState.NONE.getValue());
updateById(app);
- if (isKubernetesApp(app)) {
+ if (app.isKubernetesModeJob()) {
+ TrackId trackId = k8sWatcherWrapper.toTrackId(application);
flinkK8sWatcher.unWatching(trackId);
} else {
FlinkRESTAPIWatcher.unWatching(appParam.getId());
@@ -1635,7 +1651,12 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
application.setTmMemory(MemorySize.parse(tmMemory).getMebiBytes());
}
}
- application.setAppId(response.clusterId());
+
+ if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
+ application.setAppId(response.clusterId());
+ applicationLog.setYarnAppId(response.clusterId());
+ }
+
if (StringUtils.isNoneEmpty(response.jobId())) {
application.setJobId(response.jobId());
}
@@ -1644,18 +1665,21 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
application.setJobManagerUrl(response.jobManagerUrl());
applicationLog.setJobManagerUrl(response.jobManagerUrl());
}
- applicationLog.setYarnAppId(response.clusterId());
+
application.setStartTime(new Date());
application.setEndTime(null);
// if start completed, will be added task to tracking queue
- if (isKubernetesApp(application)) {
+ if (application.isKubernetesModeJob()) {
log.info(
"start job {} on {} success, doWatching...",
application.getJobName(),
application.getExecutionModeEnum().getName());
application.setRelease(ReleaseState.DONE.get());
+
+ TrackId trackId = k8sWatcherWrapper.toTrackId(application);
flinkK8sWatcher.doWatching(trackId);
+
if
(ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
String domainName = settingService.getIngressModeDefault();
if (StringUtils.isNotBlank(domainName)) {
@@ -1735,7 +1759,18 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
Map<String, String> dynamicProperties =
PropertiesUtils.extractDynamicPropertiesAsJava(application.getDynamicProperties());
+
properties.putAll(dynamicProperties);
+
+ String kerberosKeySvcAccount = ConfigConst.KEY_KERBEROS_SERVICE_ACCOUNT();
+ String svcAcc1 = (String)
application.getHotParamsMap().get(kerberosKeySvcAccount);
+ String svcAcc2 = dynamicProperties.get(kerberosKeySvcAccount);
+ if (svcAcc1 != null) {
+ properties.put(kerberosKeySvcAccount, svcAcc1);
+ } else if (svcAcc2 != null) {
+ properties.put(kerberosKeySvcAccount, svcAcc2);
+ }
+
ResolveOrder resolveOrder = ResolveOrder.of(application.getResolveOrder());
if (resolveOrder != null) {
properties.put(CoreOptions.CLASSLOADER_RESOLVE_ORDER.key(),
resolveOrder.getName());
@@ -1752,8 +1787,8 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
updateById(application);
savePointService.expire(application.getId());
// re-tracking flink job on kubernetes and logging exception
- if (isKubernetesApp(application)) {
- TrackId id = toTrackId(application);
+ if (application.isKubernetesModeJob()) {
+ TrackId id = k8sWatcherWrapper.toTrackId(application);
flinkK8sWatcher.doWatching(id);
} else {
FlinkRESTAPIWatcher.unWatching(application.getId());
@@ -1761,7 +1796,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
// kill application
if (ExecutionMode.isYarnMode(application.getExecutionModeEnum())) {
try {
- List<ApplicationReport> applications =
getApplicationReports(application.getJobName());
+ List<ApplicationReport> applications =
getYARNApplication(application.getJobName());
if (!applications.isEmpty()) {
YarnClient yarnClient = HadoopUtils.yarnClient();
yarnClient.killApplication(applications.get(0).getApplicationId());
@@ -1821,8 +1856,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
if (isYarnNotDefaultQueue(newApp)) {
return true;
}
-
- oldApp.setYarnQueueByHotParams();
+ oldApp.setByHotParams();
if (ExecutionMode.isYarnPerJobOrAppMode(newApp.getExecutionModeEnum())
&& StringUtils.equals(oldApp.getYarnQueue(), newApp.getYarnQueue())) {
return true;
@@ -1843,7 +1877,8 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
|| yarnQueueService.isDefaultQueue(application.getYarnQueue());
}
- private List<ApplicationReport> getApplicationReports(String jobName) {
+ @Override
+ public List<ApplicationReport> getYARNApplication(String appName) {
try {
YarnClient yarnClient = HadoopUtils.yarnClient();
Set<String> types =
@@ -1859,10 +1894,45 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
Set<String> yarnTag = Sets.newHashSet("streampark");
List<ApplicationReport> applications = yarnClient.getApplications(types,
states, yarnTag);
return applications.stream()
- .filter(report -> report.getName().equals(jobName))
+ .filter(report -> report.getName().equals(appName))
.collect(Collectors.toList());
} catch (Exception e) {
throw new RuntimeException("The yarn api is abnormal. Ensure that yarn
is running properly.");
}
}
+
+ private Tuple2<String, String> getNamespaceClusterId(Application
application) {
+ String clusterId = null;
+ String k8sNamespace = null;
+ FlinkCluster cluster;
+ switch (application.getExecutionModeEnum()) {
+ case YARN_APPLICATION:
+ case YARN_PER_JOB:
+ case YARN_SESSION:
+ clusterId = application.getAppId();
+ break;
+ case KUBERNETES_NATIVE_APPLICATION:
+ clusterId = application.getJobName();
+ k8sNamespace = application.getK8sNamespace();
+ break;
+ case KUBERNETES_NATIVE_SESSION:
+ if (application.getFlinkClusterId() == null) {
+ clusterId = application.getClusterId();
+ k8sNamespace = application.getK8sNamespace();
+ } else {
+ cluster =
flinkClusterService.getById(application.getFlinkClusterId());
+ ApiAlertException.throwIfNull(
+ cluster,
+ String.format(
+ "The Kubernetes session clusterId=%s can't be found, maybe the
clusterId is wrong or the cluster has been deleted. Please contact the Admin.",
+ application.getFlinkClusterId()));
+ clusterId = cluster.getClusterId();
+ k8sNamespace = cluster.getK8sNamespace();
+ }
+ break;
+ default:
+ break;
+ }
+ return Tuple2.apply(k8sNamespace, clusterId);
+ }
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index ef1f4a283..d309eb5d3 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -20,6 +20,7 @@ package org.apache.streampark.console.core.service.impl;
import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.util.ThreadUtils;
+import org.apache.streampark.common.util.Utils;
import org.apache.streampark.common.util.YarnUtils;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.exception.ApiDetailException;
@@ -150,14 +151,30 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
@Transactional(rollbackFor = {Exception.class})
public void start(Long id) {
FlinkCluster flinkCluster = getById(id);
+ ApiAlertException.throwIfTrue(flinkCluster == null, "Invalid id, no
related cluster found.");
+ ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum();
+ if (executionModeEnum == ExecutionMode.YARN_SESSION) {
+ ApiAlertException.throwIfTrue(
+
!applicationService.getYARNApplication(flinkCluster.getClusterName()).isEmpty(),
+ "The application name: "
+ + flinkCluster.getClusterName()
+ + " is already running in the yarn queue, please check!");
+ }
+
try {
- ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum();
DeployRequest deployRequest = getDeployRequest(flinkCluster);
log.info("deploy cluster request: " + deployRequest);
+
Future<DeployResponse> future =
executorService.submit(() -> FlinkClient.deploy(deployRequest));
DeployResponse deployResponse = future.get(60, TimeUnit.SECONDS);
- if (deployResponse != null) {
+ if (deployResponse.error() != null) {
+ throw new ApiDetailException(
+ "deploy cluster "
+ + flinkCluster.getClusterName()
+ " failed, exception:\n"
+ + Utils.stringifyException(deployResponse.error()));
+ } else {
if (ExecutionMode.YARN_SESSION.equals(executionModeEnum)) {
String address =
YarnUtils.getRMWebAppURL(true) + "/proxy/" +
deployResponse.clusterId() + "/";
@@ -169,9 +186,6 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
flinkCluster.setClusterState(ClusterState.STARTED.getValue());
flinkCluster.setException(null);
updateById(flinkCluster);
- } else {
- throw new ApiAlertException(
- "deploy cluster failed, unknown reason,please check you params or
StreamPark error log");
}
} catch (Exception e) {
log.error(e.getMessage(), e);
@@ -191,13 +205,15 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
flinkEnv.getFlinkVersion(),
executionModeEnum,
flinkCluster.getProperties(),
- flinkCluster.getClusterId());
+ flinkCluster.getClusterId(),
+ flinkCluster.getClusterName());
case KUBERNETES_NATIVE_SESSION:
return KubernetesDeployRequest.apply(
flinkEnv.getFlinkVersion(),
executionModeEnum,
flinkCluster.getProperties(),
flinkCluster.getClusterId(),
+ flinkCluster.getClusterName(),
flinkCluster.getK8sNamespace(),
flinkCluster.getK8sConf(),
flinkCluster.getServiceAccount(),
@@ -279,12 +295,14 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
Future<ShutDownResponse> future =
executorService.submit(() -> FlinkClient.shutdown(deployRequest));
ShutDownResponse shutDownResponse = future.get(60, TimeUnit.SECONDS);
- if (shutDownResponse != null) {
+ if (shutDownResponse.error() != null) {
+ throw new ApiDetailException(
+ "shutdown cluster failed, error: \n"
+ + Utils.stringifyException(shutDownResponse.error()));
+ } else {
flinkCluster.setAddress(null);
flinkCluster.setClusterState(ClusterState.STOPPED.getValue());
updateById(flinkCluster);
- } else {
- throw new ApiAlertException("get shutdown response failed");
}
} catch (Exception e) {
log.error(e.getMessage(), e);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java
index cabde3de2..5cafab958 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java
@@ -235,7 +235,7 @@ public class YarnQueueServiceImpl extends
ServiceImpl<YarnQueueMapper, YarnQueue
.stream()
.filter(
application -> {
- application.setYarnQueueByHotParams();
+ application.setByHotParams();
return StringUtils.equals(application.getYarnQueue(),
queueLabel);
})
.collect(Collectors.toList());
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
index 6a113a56e..71724e41b 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
@@ -19,13 +19,14 @@ package org.apache.streampark.console.core.task;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.service.ApplicationService;
+import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
import org.apache.streampark.flink.kubernetes.FlinkK8sWatcherFactory;
import org.apache.streampark.flink.kubernetes.FlinkTrackConfig;
import org.apache.streampark.flink.kubernetes.KubernetesRetriever;
import org.apache.streampark.flink.kubernetes.enums.FlinkJobState;
-import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode;
import org.apache.streampark.flink.kubernetes.model.TrackId;
import org.apache.commons.collections.CollectionUtils;
@@ -42,8 +43,6 @@ import javax.annotation.Nonnull;
import java.util.List;
import java.util.stream.Collectors;
-import scala.Enumeration;
-
import static
org.apache.streampark.console.core.enums.FlinkAppState.Bridge.toK8sFlinkJobState;
/**
@@ -63,6 +62,8 @@ public class FlinkK8sWatcherWrapper {
@Lazy @Autowired private ApplicationService applicationService;
+ @Lazy @Autowired private FlinkClusterService flinkClusterService;
+
/** Register FlinkTrackMonitor bean for tracking flink job on kubernetes. */
@Bean(destroyMethod = "close")
public FlinkK8sWatcher registerFlinkK8sWatcher() {
@@ -102,15 +103,6 @@ public class FlinkK8sWatcherWrapper {
if (CollectionUtils.isEmpty(k8sApplication)) {
return Lists.newArrayList();
}
- // correct corrupted data
- List<Application> correctApps =
- k8sApplication.stream()
- .filter(app -> !Bridge.toTrackId(app).isLegal())
- .collect(Collectors.toList());
- if (CollectionUtils.isNotEmpty(correctApps)) {
- applicationService.saveOrUpdateBatch(correctApps);
- }
-
// filter out the application that should be tracking
return k8sApplication.stream()
.filter(
@@ -121,43 +113,30 @@ public class FlinkK8sWatcherWrapper {
KubernetesRetriever.isDeploymentExists(app.getK8sNamespace(),
app.getClusterId());
return !isEndState || deploymentExists;
})
- .map(Bridge::toTrackId)
+ .map(this::toTrackId)
.collect(Collectors.toList());
}
- /** Type converter bridge */
- public static class Bridge {
-
- // covert Application to TrackId
- public static TrackId toTrackId(@Nonnull Application app) {
-
- Enumeration.Value mode =
FlinkK8sExecuteMode.of(app.getExecutionModeEnum());
- if (FlinkK8sExecuteMode.APPLICATION().equals(mode)) {
- return TrackId.onApplication(
- app.getK8sNamespace(),
- app.getClusterId(),
- app.getId(),
- app.getJobId(),
- app.getTeamId().toString());
- } else if (FlinkK8sExecuteMode.SESSION().equals(mode)) {
- return TrackId.onSession(
- app.getK8sNamespace(),
- app.getClusterId(),
- app.getId(),
- app.getJobId(),
- app.getTeamId().toString());
- } else {
- throw new IllegalArgumentException(
- "Illegal K8sExecuteMode, mode=" + app.getExecutionMode());
+ public TrackId toTrackId(Application app) {
+ if (app.getExecutionModeEnum() ==
ExecutionMode.KUBERNETES_NATIVE_APPLICATION) {
+ return TrackId.onApplication(
+ app.getK8sNamespace(),
+ app.getJobName(),
+ app.getId(),
+ app.getJobId(),
+ app.getTeamId().toString());
+ } else if (app.getExecutionModeEnum() ==
ExecutionMode.KUBERNETES_NATIVE_SESSION) {
+ String namespace = app.getK8sNamespace();
+ String clusterId = app.getClusterId();
+ if (app.getFlinkClusterId() != null) {
+ FlinkCluster flinkCluster =
flinkClusterService.getById(app.getFlinkClusterId());
+ namespace = flinkCluster.getK8sNamespace();
+ clusterId = flinkCluster.getClusterId();
}
+ return TrackId.onSession(
+ namespace, clusterId, app.getId(), app.getJobId(),
app.getTeamId().toString());
+ } else {
+ throw new IllegalArgumentException("Illegal K8sExecuteMode, mode=" +
app.getExecutionMode());
}
}
-
- /** Determine if application it is flink-on-kubernetes mode. */
- public static boolean isKubernetesApp(Application application) {
- if (application == null) {
- return false;
- }
- return ExecutionMode.isKubernetesMode(application.getExecutionMode());
- }
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
index fab29a74e..f1b1b5a61 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
@@ -597,7 +597,7 @@ public class FlinkRESTAPIWatcher {
}
public static void doWatching(Application application) {
- if (isKubernetesApp(application)) {
+ if (application.isKubernetesModeJob()) {
return;
}
log.info("FlinkRESTAPIWatcher add app to tracking,appId:{}",
application.getId());
@@ -642,13 +642,12 @@ public class FlinkRESTAPIWatcher {
return WATCHING_APPS.values();
}
- private static boolean isKubernetesApp(Application application) {
- return FlinkK8sWatcherWrapper.isKubernetesApp(application);
- }
-
private static boolean isKubernetesApp(Long appId) {
Application app = WATCHING_APPS.get(appId);
- return FlinkK8sWatcherWrapper.isKubernetesApp(app);
+ if (app == null) {
+ return false;
+ }
+ return app.isKubernetesModeJob();
}
private YarnAppInfo httpYarnAppInfo(Application application) throws
Exception {
diff --git
a/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts
b/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts
index a56cb4dad..e79a1cedc 100644
--- a/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts
+++ b/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts
@@ -56,6 +56,7 @@ export interface AppListRecord {
clusterId?: string;
flinkImage?: string;
k8sNamespace: string;
+ serviceAccount?: string;
state: number;
release: number;
build: boolean;
diff --git
a/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue
b/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue
index d4e0ce164..19ba3b151 100644
---
a/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue
+++
b/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue
@@ -63,9 +63,7 @@
height: 18px;
background-color: #fff;
border-radius: 50%;
- transition:
- transform 0.5s,
- background-color 0.5s;
+ transition: transform 0.5s, background-color 0.5s;
will-change: transform;
}
diff --git
a/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue
b/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue
index 78cac5c5b..e08c25f36 100644
---
a/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue
+++
b/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue
@@ -179,9 +179,7 @@
background-color: @component-background;
border: 1px solid rgb(0 0 0 / 8%);
border-radius: 0.25rem;
- box-shadow:
- 0 2px 2px 0 rgb(0 0 0 / 14%),
- 0 3px 1px -2px rgb(0 0 0 / 10%),
+ box-shadow: 0 2px 2px 0 rgb(0 0 0 / 14%), 0 3px 1px -2px rgb(0 0 0 / 10%),
0 1px 5px 0 rgb(0 0 0 / 6%);
background-clip: padding-box;
user-select: none;
diff --git
a/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue
b/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue
index e5a9dacf6..1cd7e3809 100644
---
a/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue
+++
b/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue
@@ -113,7 +113,7 @@
});
const getBindValue = computed(
- () => ({ ...attrs, ...props, ...unref(getProps) }) as Recordable,
+ () => ({ ...attrs, ...props, ...unref(getProps) } as Recordable),
);
const getSchema = computed((): FormSchema[] => {
diff --git
a/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue
b/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue
index 8fdbc8f41..e89a6ce97 100644
---
a/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue
+++
b/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue
@@ -39,9 +39,7 @@
line-height: 44px;
background-color: @component-background;
border-top: 1px solid @border-color-base;
- box-shadow:
- 0 -6px 16px -8px rgb(0 0 0 / 8%),
- 0 -9px 28px 0 rgb(0 0 0 / 5%),
+ box-shadow: 0 -6px 16px -8px rgb(0 0 0 / 8%), 0 -9px 28px 0 rgb(0 0 0 /
5%),
0 -12px 48px 16px rgb(0 0 0 / 3%);
transition: width 0.2s;
diff --git
a/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue
b/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue
index 36ab854c5..35c080269 100644
---
a/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue
+++
b/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue
@@ -22,7 +22,7 @@
props: {
column: {
type: Object as PropType<BasicColumn>,
- default: () => ({}) as BasicColumn,
+ default: () => ({} as BasicColumn),
},
},
setup(props) {
diff --git
a/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts
b/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts
index 9a6607421..c543be954 100644
--- a/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts
+++ b/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts
@@ -32,12 +32,9 @@ export function useLockPage() {
}
clear();
- timeId = setTimeout(
- () => {
- lockPage();
- },
- lockTime * 60 * 1000,
- );
+ timeId = setTimeout(() => {
+ lockPage();
+ }, lockTime * 60 * 1000);
}
function lockPage(): void {
diff --git
a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
index 7e1a6abd2..cde802766 100644
---
a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
+++
b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
@@ -46,6 +46,7 @@ export default {
uploadJar: 'Upload Jar',
kubernetesNamespace: 'Kubernetes Namespace',
kubernetesClusterId: 'Kubernetes ClusterId',
+ kubernetesCluster: 'Kubernetes Cluster',
flinkBaseDockerImage: 'Flink Base Docker Image',
restServiceExposedType: 'Rest-Service Exposed Type',
resourceFrom: 'Resource From',
@@ -221,8 +222,6 @@ export default {
'The application name is already exists in YARN, cannot be repeated.
Please check',
appNameExistsInK8sMessage:
'The application name is already exists in Kubernetes,cannot be
repeated. Please check',
- appNameNotValid:
- 'The application name is invalid, must be (Chinese or English or "-" or
"_"), two consecutive spaces cannot appear.Please check',
flinkClusterIsRequiredMessage: 'Flink Cluster is required',
flinkSqlIsRequiredMessage: 'Flink SQL is required',
tagsPlaceholder: 'Please enter tags,if more than one, separate them with
commas(,)',
@@ -234,10 +233,18 @@ export default {
tmPlaceholder: 'Please select the resource parameters to set',
yarnQueuePlaceholder: 'Please enter yarn queue label',
descriptionPlaceholder: 'Please enter description for this application',
+ serviceAccountPlaceholder: 'Please enter kubernetes service-account, e.g:
default',
kubernetesNamespacePlaceholder: 'Please enter kubernetes Namespace, e.g:
default',
kubernetesClusterIdPlaceholder: 'Please enter Kubernetes clusterId',
- kubernetesClusterIdRequire:
- "lower case alphanumeric characters, '-', and must start and end with an
alphanumeric character,and no more than 45 characters",
+ appNameValid: 'The application name is invalid',
+ appNameRole: 'The application name must follow the rules below:',
+ appNameK8sClusterIdRole:
+ 'The current deployment mode is K8s Application mode, and the job name
will be used as the clusterId in K8s. Therefore, the job name must follow the
following rules:',
+ appNameK8sClusterIdRoleLength: 'must be no more than 45 characters',
+ appNameK8sClusterIdRoleRegexp:
+ 'must only contain lowercase alphanumeric characters and "-". The
required format is [a-z]([-a-z0-9]*[a-z0-9])',
+ appNameRoleContent:
+ 'must be (Chinese or English or "-" or "_"), two consecutive spaces
cannot appear. Please check',
kubernetesClusterIdIsRequiredMessage: 'Kubernetes clusterId is required',
flinkImagePlaceholder:
'Please enter the tag of Flink base docker image, such as:
flink:1.13.0-scala_2.11-java8',
diff --git
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
index 7ac9ef7e0..9cbb9c7ab 100644
---
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
+++
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
@@ -46,6 +46,7 @@ export default {
uploadJar: '上传依赖Jar文件',
kubernetesNamespace: 'K8S命名空间',
kubernetesClusterId: 'K8S ClusterId',
+ kubernetesCluster: 'Kubernetes Session Cluster',
flinkBaseDockerImage: 'Flink基础docker镜像',
restServiceExposedType: 'K8S服务对外类型',
resourceFrom: '资源来源',
@@ -216,8 +217,14 @@ export default {
appNameNotUniqueMessage: '作业名称必须唯一, 输入的作业名称已经存在',
appNameExistsInYarnMessage: '应用程序名称已经在YARN集群中存在,不能重复。请检查',
appNameExistsInK8sMessage: '该应用程序名称已经在K8S集群中存在,不能重复。请检查',
- appNameNotValid:
- '应用程序名称无效。字符必须是(中文 或 英文 或 "-" 或 "_"),不能出现两个连续的空格,请检查',
+ appNameValid: '应用程序名称不合法',
+ appNameRole: '应用名称必须遵循以下规则:',
+ appNameK8sClusterIdRole:
+ '当前部署模式是 K8s Application模式,会将作业名称作为k8s的 clusterId,因此作业名称要遵循以下规则:',
+ appNameK8sClusterIdRoleLength: '不应超过 45 个字符',
+ appNameK8sClusterIdRoleRegexp:
+ '只能由小写字母、数字和 "-" 组成,必须满足正则格式 [a-z]([-a-z0-9]*[a-z0-9])',
+ appNameRoleContent: '字符必须是(中文 或 英文 或 "-" 或 "_"),不能出现两个连续的空格',
flinkClusterIsRequiredMessage: 'Flink集群必填',
flinkSqlIsRequiredMessage: 'Flink SQL必填',
tagsPlaceholder: '请输入标签,如果超过一个,用逗号(,)分隔',
@@ -229,10 +236,9 @@ export default {
tmPlaceholder: '请选择要设置的资源参数',
yarnQueuePlaceholder: '请输入yarn队列标签名称',
descriptionPlaceholder: '请输入此应用程序的描述',
+ serviceAccountPlaceholder: '请输入K8S服务账号(service-account)',
kubernetesNamespacePlaceholder: '请输入K8S命名空间, 如: default',
kubernetesClusterIdPlaceholder: '请选择K8S ClusterId',
- kubernetesClusterIdRequire:
- '小写字母、数字、“-”,并且必须以字母数字字符开头和结尾,并且不超过45个字符',
kubernetesClusterIdIsRequiredMessage: 'K8S ClusterId必填',
flinkImagePlaceholder:
'请输入Flink基础docker镜像的标签,如:flink:1.13.0-scala_2.11-java8',
flinkImageIsRequiredMessage: 'Flink基础docker镜像是必填的',
diff --git
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/setting/flinkCluster.ts
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/setting/flinkCluster.ts
index 4d9974736..6d9df6bec 100644
---
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/setting/flinkCluster.ts
+++
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/setting/flinkCluster.ts
@@ -33,7 +33,8 @@ export default {
yarnSessionClusterId: 'Yarn Session模式集群ID',
k8sNamespace: 'k8s命名空间',
k8sClusterId: 'k8s集群ID',
- serviceAccount: 'k8s命名空间绑定的服务账号',
+ k8sSessionCluster: 'k8s Session集群',
+ serviceAccount: 'k8s服务账号',
k8sConf: 'k8s环境Kube配置文件',
flinkImage: 'Flink基础docker镜像',
k8sRestExposedType: 'K8S服务对外类型',
diff --git a/streampark-console/streampark-console-webapp/src/utils/props.ts
b/streampark-console/streampark-console-webapp/src/utils/props.ts
index 5d1d35150..ebbe33a1a 100644
--- a/streampark-console/streampark-console-webapp/src/utils/props.ts
+++ b/streampark-console/streampark-console-webapp/src/utils/props.ts
@@ -175,7 +175,7 @@ export const buildProps = <
: never;
};
-export const definePropType = <T>(val: any) => ({ [wrapperKey]: val }) as
PropWrapper<T>;
+export const definePropType = <T>(val: any) => ({ [wrapperKey]: val } as
PropWrapper<T>);
export const keyOf = <T extends Object>(arr: T) => Object.keys(arr) as
Array<keyof T>;
export const mutable = <T extends readonly any[] | Record<string,
unknown>>(val: T) =>
diff --git
a/streampark-console/streampark-console-webapp/src/views/base/login/Login.vue
b/streampark-console/streampark-console-webapp/src/views/base/login/Login.vue
index 2e0598e68..d82491015 100644
---
a/streampark-console/streampark-console-webapp/src/views/base/login/Login.vue
+++
b/streampark-console/streampark-console-webapp/src/views/base/login/Login.vue
@@ -59,8 +59,9 @@
/>
</a>
<p class="text-light-100 pt-10px" style="border-top: 1px solid #dad7d7">
- Copyright © 2022-{{ `${new Date().getFullYear()}` }} The Apache
Software Foundation. Apache StreamPark, StreamPark, and its
- feather logo are trademarks of The Apache Software Foundation.
+ Copyright © 2022-{{ `${new Date().getFullYear()}` }} The Apache
Software Foundation. Apache
+ StreamPark, StreamPark, and its feather logo are trademarks of The
Apache Software
+ Foundation.
</p>
</footer>
</div>
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
index 31a6e3208..ed8f04c93 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
@@ -106,12 +106,21 @@
? 'yarnSessionClusterId'
: 'flinkClusterId']: app.flinkClusterId,
flinkImage: app.flinkImage,
- k8sNamespace: app.k8sNamespace,
+ k8sNamespace: app.k8sNamespace || null,
+ serviceAccount: app.serviceAccount || null,
alertId: selectAlertId,
projectName: app.projectName,
module: app.module,
...resetParams,
};
+
+ if (app.executionMode == ExecModeEnum.KUBERNETES_SESSION) {
+ Object.assign(defaultParams, { flinkClusterId: app.flinkClusterId });
+ } else if (app.executionMode == ExecModeEnum.YARN_SESSION) {
+ Object.assign(defaultParams, { flinkClusterId: app.flinkClusterId });
+ } else if (app.executionMode == ExecModeEnum.REMOTE) {
+ }
+
if (!executionMode) {
Object.assign(defaultParams, { executionMode: app.executionMode });
}
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
b/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
index c6f9197b0..0ff995bec 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
@@ -120,13 +120,26 @@
cpFailureAction: app.cpFailureAction,
},
clusterId: app.clusterId,
- [app.executionMode == ExecModeEnum.YARN_SESSION
- ? 'yarnSessionClusterId'
- : 'flinkClusterId']: app.flinkClusterId,
flinkImage: app.flinkImage,
k8sNamespace: app.k8sNamespace,
+ serviceAccount: app.serviceAccount || null,
...resetParams,
};
+
+ switch (app.executionMode) {
+ case ExecModeEnum.REMOTE:
+ defaultParams['remoteClusterId'] = app.flinkClusterId;
+ break;
+ case ExecModeEnum.YARN_SESSION:
+ defaultParams['yarnSessionClusterId'] = app.flinkClusterId;
+ break;
+ case ExecModeEnum.KUBERNETES_SESSION:
+ defaultParams['k8sSessionClusterId'] = app.flinkClusterId;
+ break;
+ default:
+ break;
+ }
+
if (!executionMode) {
Object.assign(defaultParams, { executionMode: app.executionMode });
}
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
index 8a6942398..fba0fc31b 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
@@ -20,6 +20,7 @@ import { executionModes, k8sRestExposedType, resolveOrder }
from '../data';
import optionData from '../data/option';
import {
getAlertSvgIcon,
+ renderJobName,
renderDynamicProperties,
renderInputDropdown,
renderInputGroup,
@@ -199,7 +200,7 @@ export const useCreateAndEditSchema = (
],
},
{
- field: 'flinkClusterId',
+ field: 'remoteClusterId',
label: t('flink.app.flinkCluster'),
component: 'Select',
componentProps: {
@@ -224,11 +225,24 @@ export const useCreateAndEditSchema = (
{ required: true, message:
t('flink.app.addAppTips.flinkClusterIsRequiredMessage') },
],
},
+ {
+ field: 'k8sSessionClusterId',
+ label: t('flink.app.flinkCluster'),
+ component: 'Select',
+ componentProps: {
+ placeholder: t('flink.app.flinkCluster'),
+ options: getExecutionCluster(ExecModeEnum.KUBERNETES_SESSION, 'id'),
+ },
+ ifShow: ({ values }) => values.executionMode ==
ExecModeEnum.KUBERNETES_SESSION,
+ rules: [
+ { required: true, message:
t('flink.app.addAppTips.flinkClusterIsRequiredMessage') },
+ ],
+ },
{
field: 'k8sNamespace',
label: t('flink.app.kubernetesNamespace'),
component: 'Input',
- ifShow: ({ values }) => isK8sExecMode(values.executionMode),
+ ifShow: ({ values }) => values.executionMode ==
ExecModeEnum.KUBERNETES_APPLICATION,
render: ({ model, field }) =>
renderInputDropdown(model, field, {
placeholder:
t('flink.app.addAppTips.kubernetesNamespacePlaceholder'),
@@ -236,39 +250,15 @@ export const useCreateAndEditSchema = (
}),
},
{
- field: 'clusterId',
- label: t('flink.app.kubernetesClusterId'),
+ field: 'serviceAccount',
+ label: t('setting.flinkCluster.form.serviceAccount'),
component: 'Input',
- componentProps: ({ formModel }) => {
- return {
- placeholder: t('flink.app.addAppTips.kubernetesClusterIdRequire'),
- onChange: (e: ChangeEvent) => (formModel.jobName = e.target.value),
- };
- },
ifShow: ({ values }) => values.executionMode ==
ExecModeEnum.KUBERNETES_APPLICATION,
- rules: [
- {
- required: true,
- message: t('flink.app.addAppTips.kubernetesClusterIdRequire'),
- pattern: /^(?=.{1,45}$)[a-z]([-a-z0-9]*[a-z0-9])$/,
- },
- ],
- },
- {
- field: 'clusterId',
- label: t('flink.app.kubernetesClusterId'),
- component: 'Select',
- ifShow: ({ values }) => values.executionMode ==
ExecModeEnum.KUBERNETES_SESSION,
- componentProps: {
- placeholder:
t('flink.app.addAppTips.kubernetesClusterIdPlaceholder'),
- options: getExecutionCluster(ExecModeEnum.KUBERNETES_SESSION,
'clusterId'),
- },
- rules: [
- {
- required: true,
- message:
t('flink.app.addAppTips.kubernetesClusterIdIsRequiredMessage'),
- },
- ],
+ render: ({ model, field }) =>
+ renderInputDropdown(model, field, {
+ placeholder: t('flink.app.addAppTips.serviceAccountPlaceholder'),
+ options: unref(historyRecord)?.k8sNamespace || [],
+ }),
},
{
field: 'flinkImage',
@@ -296,26 +286,33 @@ export const useCreateAndEditSchema = (
});
/* Detect job name field */
- async function getJobNameCheck(_rule: RuleObject, value: StoreValue) {
+ async function getJobNameCheck(_rule: RuleObject, value: StoreValue, model:
Recordable) {
if (value === null || value === undefined || value === '') {
return
Promise.reject(t('flink.app.addAppTips.appNameIsRequiredMessage'));
- } else {
- const params = { jobName: value };
- if (edit?.appId) Object.assign(params, { id: edit.appId });
- const res = await fetchCheckName(params);
- switch (parseInt(res)) {
- case 0:
- return Promise.resolve();
- case 1:
- return
Promise.reject(t('flink.app.addAppTips.appNameNotUniqueMessage'));
- case 2:
- return
Promise.reject(t('flink.app.addAppTips.appNameExistsInYarnMessage'));
- case 3:
- return
Promise.reject(t('flink.app.addAppTips.appNameExistsInK8sMessage'));
- default:
- return Promise.reject(t('flink.app.addAppTips.appNameNotValid'));
+ }
+ if (model.executionMode == ExecModeEnum.KUBERNETES_APPLICATION) {
+ const regexp = /^(?=.{1,45}$)[a-z]([-a-z0-9]*[a-z0-9])$/;
+ if (!regexp.test(value)) {
+ return Promise.reject(t('flink.app.addAppTips.appNameValid'));
}
}
+ const params = { jobName: value };
+ if (edit?.appId) {
+ Object.assign(params, { id: edit.appId });
+ }
+ const res = await fetchCheckName(params);
+ switch (parseInt(res)) {
+ case 0:
+ return Promise.resolve();
+ case 1:
+ return
Promise.reject(t('flink.app.addAppTips.appNameNotUniqueMessage'));
+ case 2:
+ return
Promise.reject(t('flink.app.addAppTips.appNameExistsInYarnMessage'));
+ case 3:
+ return
Promise.reject(t('flink.app.addAppTips.appNameExistsInK8sMessage'));
+ default:
+ return Promise.reject(t('flink.app.addAppTips.appNameValid'));
+ }
}
const getFlinkFormOtherSchemas = computed((): FormSchema[] => {
@@ -329,9 +326,16 @@ export const useCreateAndEditSchema = (
field: 'jobName',
label: t('flink.app.appName'),
component: 'Input',
- componentProps: { placeholder:
t('flink.app.addAppTips.appNamePlaceholder') },
- dynamicRules: () => {
- return [{ required: true, trigger: 'blur', validator:
getJobNameCheck }];
+ render: (renderCallbackParams) => renderJobName(renderCallbackParams),
+ dynamicRules: ({ model }) => {
+ return [
+ {
+ required: true,
+ trigger: 'blur',
+ validator: (rule: RuleObject, value: StoreValue) =>
+ getJobNameCheck(rule, value, model),
+ },
+ ];
},
},
{
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx
index 851778227..068911c83 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx
@@ -39,7 +39,12 @@ import { handleConfTemplate } from '/@/api/flink/config';
import { decodeByBase64 } from '/@/utils/cipher';
import { useMessage } from '/@/hooks/web/useMessage';
import { SelectValue } from 'ant-design-vue/lib/select';
-import { AppTypeEnum, CandidateTypeEnum, FailoverStrategyEnum } from
'/@/enums/flinkEnum';
+import {
+ AppTypeEnum,
+ CandidateTypeEnum,
+ ExecModeEnum,
+ FailoverStrategyEnum,
+} from '/@/enums/flinkEnum';
import { useI18n } from '/@/hooks/web/useI18n';
import { fetchYarnQueueList } from '/@/api/flink/setting/yarnQueue';
import { ApiSelect } from '/@/components/Form';
@@ -261,6 +266,46 @@ export const renderYarnQueue = ({ model, field }:
RenderCallbackParams) => {
);
};
+export const renderJobName = ({ model, field }: RenderCallbackParams) => {
+ return (
+ <div>
+ <Input
+ name="jobName"
+ placeholder={t('flink.app.addAppTips.appNamePlaceholder')}
+ value={model[field]}
+ onInput={(e: ChangeEvent) => (model[field] = e?.target?.value)}
+ />
+ <p class="conf-desc mt-10px">
+ <span class="note-info">
+ <Tag color="#2db7f5" class="tag-note">
+ {t('flink.app.noteInfo.note')}
+ </Tag>
+ {model.executionMode == ExecModeEnum.KUBERNETES_APPLICATION && (
+ <span>
+ {t('flink.app.addAppTips.appNameK8sClusterIdRole')}
+ <div>
+ <Tag color="orange"> 1.</Tag>
+ {t('flink.app.addAppTips.appNameK8sClusterIdRoleLength')}
+ </div>
+ <div>
+ <Tag color="orange"> 2.</Tag>
+ {t('flink.app.addAppTips.appNameK8sClusterIdRoleRegexp')}
+ </div>
+ </span>
+ )}
+
+ {model.executionMode != ExecModeEnum.KUBERNETES_APPLICATION && (
+ <span>
+ <span>{t('flink.app.addAppTips.appNameRole')}</span>
+ <span>{t('flink.app.addAppTips.appNameRoleContent')}</span>
+ </span>
+ )}
+ </span>
+ </p>
+ </div>
+ );
+};
+
/* render memory option */
export const renderDynamicProperties = ({ model, field }:
RenderCallbackParams) => {
return (
@@ -277,7 +322,6 @@ export const renderDynamicProperties = ({ model, field }:
RenderCallbackParams)
<Tag color="#2db7f5" class="tag-note">
{t('flink.app.noteInfo.note')}
</Tag>
- {t('flink.app.noteInfo.dynamicProperties')}
<a
href="https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html"
target="_blank"
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
index 715dc1ebd..87274d832 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
@@ -265,6 +265,19 @@ export function handleDependencyJsonToPom(json, pomMap,
jarMap) {
}
}
+function getFlinkClusterId(values: Recordable) {
+ if (values.executionMode == ExecModeEnum.YARN_SESSION) {
+ return values.yarnSessionClusterId;
+ }
+ if (values.executionMode == ExecModeEnum.REMOTE) {
+ return values.remoteClusterId;
+ }
+ if (values.executionMode == ExecModeEnum.KUBERNETES_SESSION) {
+ return values.k8sSessionClusterId;
+ }
+ return null;
+}
+
export function handleSubmitParams(
params: Recordable,
values: Recordable,
@@ -290,14 +303,12 @@ export function handleSubmitParams(
description: values.description,
k8sNamespace: values.k8sNamespace || null,
clusterId: values.clusterId || null,
- flinkClusterId:
- (values.executionMode == ExecModeEnum.YARN_SESSION
- ? values.yarnSessionClusterId
- : values.flinkClusterId) || null,
+ flinkClusterId: getFlinkClusterId(values),
flinkImage: values.flinkImage || null,
});
if (params.executionMode == ExecModeEnum.KUBERNETES_APPLICATION) {
Object.assign(params, {
+ serviceAccount: values.serviceAccount,
k8sPodTemplate: k8sTemplate.podTemplate,
k8sJmPodTemplate: k8sTemplate.jmPodTemplate,
k8sTmPodTemplate: k8sTemplate.tmPodTemplate,
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
index f11f561c9..7587f750e 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
@@ -25,13 +25,13 @@ import javax.annotation.Nullable
import java.util.{Map => JavaMap}
case class CancelRequest(
- flinkVersion: FlinkVersion,
- executionMode: ExecutionMode,
- @Nullable properties: JavaMap[String, Any],
- clusterId: String,
- jobId: String,
+ override val flinkVersion: FlinkVersion,
+ override val executionMode: ExecutionMode,
+ @Nullable override val properties: JavaMap[String, Any],
+ override val clusterId: String,
+ override val jobId: String,
override val withSavepoint: Boolean,
withDrain: Boolean,
- savepointPath: String,
+ override val savepointPath: String,
override val kubernetesNamespace: String =
K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE)
extends SavepointRequestTrait
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala
index 81b36a17b..936af108e 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala
@@ -31,7 +31,8 @@ case class DeployRequest(
flinkVersion: FlinkVersion,
executionMode: ExecutionMode,
properties: JavaMap[String, Any],
- clusterId: String) {
+ clusterId: String,
+ clusterName: String) {
private[client] lazy val hdfsWorkspace = {
@@ -65,12 +66,13 @@ class KubernetesDeployRequest(
override val executionMode: ExecutionMode,
override val properties: JavaMap[String, Any],
override val clusterId: String,
+ override val clusterName: String,
val kubernetesNamespace: String =
KubernetesConfigOptions.NAMESPACE.defaultValue(),
val kubeConf: String = "~/.kube/config",
val serviceAccount: String =
KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT.defaultValue(),
val flinkImage: String =
KubernetesConfigOptions.CONTAINER_IMAGE.defaultValue(),
val flinkRestExposedType: FlinkK8sRestExposedType =
FlinkK8sRestExposedType.CLUSTER_IP)
- extends DeployRequest(flinkVersion, executionMode, properties, clusterId)
+ extends DeployRequest(flinkVersion, executionMode, properties, clusterId,
clusterName)
object KubernetesDeployRequest {
def apply(
@@ -78,6 +80,7 @@ object KubernetesDeployRequest {
executionMode: ExecutionMode,
properties: JavaMap[String, Any],
clusterId: String,
+ clusterName: String,
kubernetesNamespace: String,
kubeConf: String,
serviceAccount: String,
@@ -88,6 +91,7 @@ object KubernetesDeployRequest {
executionMode,
properties,
clusterId,
+ clusterName,
kubernetesNamespace,
kubeConf,
serviceAccount,
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala
index 91b6ecad0..6e1edbaff 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala
@@ -17,4 +17,4 @@
package org.apache.streampark.flink.client.bean
-case class DeployResponse(address: String, clusterId: String)
+case class DeployResponse(address: String = null, clusterId: String = null,
error: Throwable = null)
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/ShutDownResponse.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/ShutDownResponse.scala
index 5c9a14728..de2df366f 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/ShutDownResponse.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/ShutDownResponse.scala
@@ -17,4 +17,4 @@
package org.apache.streampark.flink.client.bean
-case class ShutDownResponse()
+case class ShutDownResponse(error: Throwable = null)
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
index 8f6344834..7bb87b06e 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
@@ -26,11 +26,11 @@ import java.util.{Map => JavaMap}
/** Trigger savepoint request. */
case class TriggerSavepointRequest(
- flinkVersion: FlinkVersion,
- executionMode: ExecutionMode,
- @Nullable properties: JavaMap[String, Any],
- clusterId: String,
- jobId: String,
- savepointPath: String,
+ override val flinkVersion: FlinkVersion,
+ override val executionMode: ExecutionMode,
+ @Nullable override val properties: JavaMap[String, Any],
+ override val clusterId: String,
+ override val jobId: String,
+ override val savepointPath: String,
override val kubernetesNamespace: String =
K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE)
extends SavepointRequestTrait
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index 7638be720..3606c03a0 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -169,16 +169,9 @@ object KubernetesNativeSessionClient extends
KubernetesNativeClientTrait with Lo
client =
clusterDescriptor.deploySessionCluster(kubernetesClusterDescriptor._2).getClusterClient
}
-
- if (client.getWebInterfaceURL != null) {
- DeployResponse(client.getWebInterfaceURL, client.getClusterId)
- } else {
- null
- }
+ getDeployResponse(client)
} catch {
- case e: Exception =>
- logError(s"start flink session fail in ${deployRequest.executionMode}
mode")
- throw e
+ case e: Exception => DeployResponse(error = e)
} finally {
Utils.close(client, clusterDescriptor, kubeClient)
}
@@ -204,18 +197,17 @@ object KubernetesNativeSessionClient extends
KubernetesNativeClientTrait with Lo
val flinkConfig = this.getFlinkK8sConfig(shutDownRequest)
val clusterDescriptor = getK8sClusterDescriptor(flinkConfig)
- val client = clusterDescriptor
- .retrieve(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID))
- .getClusterClient
- try {
- client.shutDownCluster()
- ShutDownResponse()
- } catch {
- case e: Exception =>
- logError(s"shutdown flink session fail in
${shutDownRequest.executionMode} mode")
- throw e
- } finally {
- Utils.close(client)
+ Try(
+ clusterDescriptor
+ .retrieve(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID))
+ .getClusterClient
+ ) match {
+ case Failure(e) => ShutDownResponse(e)
+ case Success(c) =>
+ Try(c.shutDownCluster()) match {
+ case Success(_) => ShutDownResponse()
+ case Failure(e) => ShutDownResponse(e)
+ }
}
}
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index edcd6ed75..0e0b4b3c9 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -21,6 +21,7 @@ import org.apache.streampark.common.util.Utils
import org.apache.streampark.flink.client.`trait`.YarnClientTrait
import org.apache.streampark.flink.client.bean._
+import org.apache.commons.lang3.StringUtils
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
import org.apache.flink.client.program.{ClusterClient, PackagedProgram}
import org.apache.flink.configuration._
@@ -58,7 +59,10 @@ object YarnSessionClient extends YarnClientTrait {
* @param deployRequest
* @param flinkConfig
*/
- def deployClusterConfig(deployRequest: DeployRequest, flinkConfig:
Configuration): Unit = {
+ private def deployClusterConfig(
+ deployRequest: DeployRequest,
+ flinkConfig: Configuration): Unit = {
+
val flinkDefaultConfiguration = getFlinkDefaultConfiguration(
deployRequest.flinkVersion.flinkHome)
val currentUser = UserGroupInformation.getCurrentUser
@@ -86,6 +90,10 @@ object YarnSessionClient extends YarnClientTrait {
.safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
// conf dir
.safeSet(DeploymentOptionsInternal.CONF_DIR,
s"${deployRequest.flinkVersion.flinkHome}/conf")
+ // app tags
+ .safeSet(YarnConfigOptions.APPLICATION_TAGS, "streampark")
+ // app name
+ .safeSet(YarnConfigOptions.APPLICATION_NAME, deployRequest.clusterName)
logInfo(s"""
|------------------------------------------------------------------
@@ -164,10 +172,12 @@ object YarnSessionClient extends YarnClientTrait {
try {
val flinkConfig =
extractConfiguration(deployRequest.flinkVersion.flinkHome,
deployRequest.properties)
+
deployClusterConfig(deployRequest, flinkConfig)
+
val yarnClusterDescriptor =
getSessionClusterDeployDescriptor(flinkConfig)
clusterDescriptor = yarnClusterDescriptor._2
- if (null != deployRequest.clusterId && deployRequest.clusterId.nonEmpty)
{
+ if (StringUtils.isNotBlank(deployRequest.clusterId)) {
try {
val applicationStatus =
clusterDescriptor.getYarnClient
@@ -183,22 +193,14 @@ object YarnSessionClient extends YarnClientTrait {
}
}
} catch {
- case _: ApplicationNotFoundException =>
- logInfo("this applicationId have not managed by yarn ,need deploy
...")
+ case e: Exception => return DeployResponse(error = e)
}
}
val clientProvider =
clusterDescriptor.deploySessionCluster(yarnClusterDescriptor._1)
client = clientProvider.getClusterClient
- if (client.getWebInterfaceURL != null) {
- DeployResponse(client.getWebInterfaceURL, client.getClusterId.toString)
- } else {
- null
- }
+ getDeployResponse(client)
} catch {
- case e: Exception =>
- logError(s"start flink session fail in ${deployRequest.executionMode}
mode")
- e.printStackTrace()
- throw e
+ case e: Exception => DeployResponse(error = e)
} finally {
Utils.close(client, clusterDescriptor)
}
@@ -234,10 +236,7 @@ object YarnSessionClient extends YarnClientTrait {
.getFinalApplicationStatus}")
ShutDownResponse()
} catch {
- case e: Exception =>
- logError(s"shutdown flink session fail in
${shutDownRequest.executionMode} mode")
- e.printStackTrace()
- throw e
+ case e: Exception => ShutDownResponse(e)
} finally {
Utils.close(client, clusterDescriptor)
}
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index f1b171310..09b4ed8ad 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -524,4 +524,11 @@ trait FlinkClientTrait extends Logger {
clientWrapper.triggerSavepoint(jobID, savepointPath).get()
}
+ def getDeployResponse(client: ClusterClient[_]): DeployResponse = {
+ if (client.getWebInterfaceURL != null) {
+ DeployResponse(address = client.getWebInterfaceURL, clusterId =
client.getClusterId.toString)
+ } else {
+ DeployResponse(error = new RuntimeException("get the cluster
getWebInterfaceURL failed."))
+ }
+ }
}
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
index 717208cf0..94020f170 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
@@ -17,11 +17,12 @@
package org.apache.streampark.flink.kubernetes
-import org.apache.streampark.common.util.Logger
+import org.apache.streampark.common.util.{Logger, Utils}
import org.apache.streampark.flink.kubernetes.model._
import com.github.benmanes.caffeine.cache.{Cache, Caffeine}
+import java.util.Objects
import java.util.concurrent.TimeUnit
import scala.collection.JavaConversions._
@@ -107,9 +108,19 @@ class FlinkK8sWatchController extends Logger with
AutoCloseable {
}
//----cache----
-case class CacheKey(key: java.lang.Long) extends Serializable
+case class CacheKey(key: java.lang.Long) extends Serializable {
+ override def hashCode(): Int = Utils.hashCode(key)
+
+ override def equals(obj: Any): Boolean = {
+ obj match {
+ case that: CacheKey => Objects.equals(key, that.key)
+ case _ => false
+ }
+ }
+}
class TrackIdCache {
+
private[this] lazy val cache: Cache[CacheKey, TrackId] =
Caffeine.newBuilder.build()
def update(k: TrackId): Unit = {
@@ -153,7 +164,9 @@ class JobStatusCache {
def getAsMap(trackIds: Set[TrackId]): Map[CacheKey, JobStatusCV] =
cache.getAllPresent(trackIds.map(t => t.appId)).toMap
- def get(k: TrackId): JobStatusCV = cache.getIfPresent(CacheKey(k.appId))
+ def get(k: TrackId): JobStatusCV = {
+ cache.getIfPresent(CacheKey(k.appId))
+ }
def invalidate(k: TrackId): Unit = cache.invalidate(CacheKey(k.appId))
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/ClusterKey.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/ClusterKey.scala
index 991a9647d..73d97e275 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/ClusterKey.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/ClusterKey.scala
@@ -17,13 +17,30 @@
package org.apache.streampark.flink.kubernetes.model
+import org.apache.streampark.common.util.Utils
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
/** flink cluster identifier on kubernetes */
case class ClusterKey(
executeMode: FlinkK8sExecuteMode.Value,
namespace: String = "default",
- clusterId: String)
+ clusterId: String) {
+
+ override def toString: String = executeMode.toString + namespace + clusterId
+
+ override def hashCode(): Int = Utils.hashCode(executeMode, namespace,
clusterId)
+
+ override def equals(obj: Any): Boolean = {
+ obj match {
+ case that: ClusterKey =>
+ this.executeMode == that.executeMode &&
+ this.namespace == that.namespace &&
+ this.clusterId == that.clusterId
+ case _ => false
+ }
+ }
+
+}
object ClusterKey {
def of(trackId: TrackId): ClusterKey =
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/JobStatusCV.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/JobStatusCV.scala
index 7bd5bd52a..3c3781435 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/JobStatusCV.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/JobStatusCV.scala
@@ -42,4 +42,12 @@ case class JobStatusCV(
duration: Long = 0,
taskTotal: Int = 0,
pollEmitTime: Long,
- pollAckTime: Long)
+ pollAckTime: Long) {
+
+ def diff(that: JobStatusCV): Boolean = {
+ that == null ||
+ that.jobState != this.jobState ||
+ that.jobId != this.jobId
+ }
+
+}
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/K8sEventKey.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/K8sEventKey.scala
index 75abf5e94..7eb0a51e9 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/K8sEventKey.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/K8sEventKey.scala
@@ -17,4 +17,18 @@
package org.apache.streampark.flink.kubernetes.model
-case class K8sEventKey(namespace: String, clusterId: String)
+import org.apache.streampark.common.util.Utils
+
+case class K8sEventKey(namespace: String, clusterId: String) {
+
+ override def hashCode(): Int = Utils.hashCode(namespace, clusterId)
+ override def equals(obj: Any): Boolean = {
+ obj match {
+ case that: K8sEventKey =>
+ this.namespace == that.namespace &&
+ this.clusterId == that.clusterId
+ case _ => false
+ }
+ }
+
+}
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
index e280122c8..a7147ce67 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
@@ -20,7 +20,7 @@ package org.apache.streampark.flink.kubernetes.model
import org.apache.streampark.common.util.Utils
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
-import java.lang.{Boolean => JavaBool}
+import java.lang.{Boolean => JavaBool, Long => JavaLong}
import scala.util.Try
@@ -29,7 +29,7 @@ case class TrackId(
executeMode: FlinkK8sExecuteMode.Value,
namespace: String = "default",
clusterId: String,
- appId: Long,
+ appId: JavaLong = null,
jobId: String,
groupId: String) {
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 36bac3d68..8bb89e623 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -20,8 +20,8 @@ package org.apache.streampark.flink.kubernetes.watcher
import org.apache.streampark.common.conf.Workspace
import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.kubernetes.{ChangeEventBus,
FlinkK8sWatchController, JobStatusWatcherConfig, KubernetesRetriever}
-import org.apache.streampark.flink.kubernetes.enums.FlinkJobState
-import
org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.{APPLICATION,
SESSION}
+import org.apache.streampark.flink.kubernetes.enums.{FlinkJobState,
FlinkK8sExecuteMode}
+import
org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.APPLICATION
import org.apache.streampark.flink.kubernetes.event.FlinkJobStatusChangeEvent
import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
import org.apache.streampark.flink.kubernetes.model._
@@ -101,59 +101,66 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig
= JobStatusWatcherConfi
.getOrElse(return
)
- // retrieve flink job status in thread pool
- val tracksFuture: Set[Future[Option[JobStatusCV]]] = trackIds.map {
- id =>
- val future = Future {
- id.executeMode match {
- case SESSION => touchSessionJob(id)
- case APPLICATION => touchApplicationJob(id)
- }
- }
-
- future.onComplete(_.getOrElse(None) match {
- case Some(jobState) =>
- val trackId = id.copy(jobId = jobState.jobId)
- val latest: JobStatusCV =
watchController.jobStatuses.get(trackId)
-
- val eventChanged = latest == null ||
- latest.jobState != jobState.jobState ||
- latest.jobId != jobState.jobId
-
- if (eventChanged) {
- logInfo(s"eventChanged.....$trackId")
- // put job status to cache
- watchController.jobStatuses.put(trackId, jobState)
- // set jobId to trackIds
- watchController.trackIds.update(trackId)
- eventBus.postSync(FlinkJobStatusChangeEvent(trackId, jobState))
- }
+ // 1) k8s application mode
+ val appFuture: Set[Future[Option[JobStatusCV]]] =
+ trackIds.filter(_.executeMode == FlinkK8sExecuteMode.APPLICATION).map {
+ id =>
+ val future = Future(touchApplicationJob(id))
+ future.onComplete(_.getOrElse(None) match {
+ case Some(jobState) =>
+ updateState(id.copy(jobId = jobState.jobId), jobState)
+ case _ =>
+ })
+ future
+ }
- val deployExists = KubernetesRetriever.isDeploymentExists(
- trackId.namespace,
- trackId.clusterId
- )
-
- if (FlinkJobState.isEndState(jobState.jobState) &&
!deployExists) {
- // remove trackId from cache of job that needs to be untracked
- watchController.unWatching(trackId)
- if (trackId.executeMode == APPLICATION) {
- watchController.endpoints.invalidate(trackId.toClusterKey)
- }
- }
+ // 2) k8s session mode
+ val sessionIds = trackIds.filter(_.executeMode ==
FlinkK8sExecuteMode.SESSION)
+ val sessionCluster =
sessionIds.groupBy(_.toClusterKey.toString).flatMap(_._2).toSet
+ val sessionFuture = sessionCluster.map {
+ id =>
+ val future = Future(touchSessionAllJob(id))
+ future.onComplete(_.toOption match {
+ case Some(map) =>
+ sessionIds.foreach(
+ id => {
+ map.find(_._1.jobId == id.jobId) match {
+ case Some(job) =>
+ updateState(job._1.copy(appId = id.appId), job._2)
+ case _ =>
+ // can't find that job in the k8s cluster.
+ watchController.unWatching(id)
+ val lostState = JobStatusCV(
+ jobState = FlinkJobState.LOST,
+ jobId = id.jobId,
+ pollEmitTime = System.currentTimeMillis,
+ pollAckTime = System.currentTimeMillis
+ )
+ eventBus.postSync(FlinkJobStatusChangeEvent(id,
lostState))
+ }
+ })
case _ =>
})
future
}
// blocking until all future are completed or timeout is reached
- Try(Await.ready(Future.sequence(tracksFuture), conf.requestTimeoutSec
seconds)).failed.map {
+ Try(Await.ready(Future.sequence(appFuture), conf.requestTimeoutSec
seconds)).failed.map {
_ =>
logWarn(
- s"[FlinkJobStatusWatcher] tracking flink job status on kubernetes
mode timeout," +
+ s"[FlinkJobStatusWatcher] tracking flink job status on kubernetes
native application mode timeout," +
s" limitSeconds=${conf.requestTimeoutSec}," +
s" trackIds=${trackIds.mkString(",")}")
}
+
+ Try(Await.ready(Future.sequence(sessionFuture), conf.requestTimeoutSec
seconds)).failed.map {
+ _ =>
+ logWarn(
+ s"[FlinkJobStatusWatcher] tracking flink job status on kubernetes
native session mode timeout," +
+ s" limitSeconds=${conf.requestTimeoutSec}," +
+ s" trackIds=${trackIds.mkString(",")}")
+ }
+
logDebug(
"[FlinkJobStatusWatcher]: End of status monitoring process - " + Thread
.currentThread()
@@ -169,43 +176,10 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig
= JobStatusWatcherConfi
* result.
*/
def touchSessionJob(@Nonnull trackId: TrackId): Option[JobStatusCV] = {
- val pollEmitTime = System.currentTimeMillis
-
- val id = TrackId.onSession(
- trackId.namespace,
- trackId.clusterId,
- trackId.appId,
- trackId.jobId,
- trackId.groupId
- )
-
- val rsMap = touchSessionAllJob(
- trackId.namespace,
- trackId.clusterId,
- trackId.appId,
- trackId.groupId
- ).toMap
-
- val jobState = rsMap.get(id).filter(_.jobState !=
FlinkJobState.SILENT).getOrElse {
- val preCache = watchController.jobStatuses.get(id)
- val state = inferSilentOrLostFromPreCache(preCache)
- val nonFirstSilent =
- state == FlinkJobState.SILENT && preCache != null && preCache.jobState
== FlinkJobState.SILENT
- if (nonFirstSilent) {
- JobStatusCV(
- jobState = state,
- jobId = id.jobId,
- pollEmitTime = preCache.pollEmitTime,
- pollAckTime = preCache.pollAckTime)
- } else {
- JobStatusCV(
- jobState = state,
- jobId = id.jobId,
- pollEmitTime = pollEmitTime,
- pollAckTime = System.currentTimeMillis)
- }
- }
- Some(jobState)
+ touchSessionAllJob(trackId)
+ .find(id => id._1.jobId == trackId.jobId && id._2.jobState !=
FlinkJobState.SILENT)
+ .map(_._2)
+ .orElse(inferState(trackId))
}
/**
@@ -215,28 +189,18 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig
= JobStatusWatcherConfi
* This method can be called directly from outside, without affecting the
current cachePool
* result.
*/
- private def touchSessionAllJob(
- @Nonnull namespace: String,
- @Nonnull clusterId: String,
- @Nonnull appId: Long,
- @Nonnull groupId: String): Array[(TrackId, JobStatusCV)] = {
-
- lazy val defaultResult = Array.empty[(TrackId, JobStatusCV)]
+ private def touchSessionAllJob(trackId: TrackId): Map[TrackId, JobStatusCV]
= {
val pollEmitTime = System.currentTimeMillis
-
- val jobDetails = listJobsDetails(ClusterKey(SESSION, namespace, clusterId))
- .getOrElse(return defaultResult)
- .jobs
-
- if (jobDetails.isEmpty) {
- defaultResult
- } else {
- jobDetails.map {
- d =>
- val trackId = TrackId.onSession(namespace, clusterId, appId, d.jid,
groupId)
- val jobStatus = d.toJobStatusCV(pollEmitTime,
System.currentTimeMillis)
- trackId -> jobStatus
- }
+ val jobDetails = listJobsDetails(ClusterKey.of(trackId))
+ jobDetails match {
+ case Some(details) if details.jobs.nonEmpty =>
+ details.jobs.map {
+ d =>
+ val jobStatus = d.toJobStatusCV(pollEmitTime,
System.currentTimeMillis)
+ val trackItem = trackId.copy(jobId = d.jid, appId = null)
+ trackItem -> jobStatus
+ }.toMap
+ case None => Map.empty[TrackId, JobStatusCV]
}
}
@@ -249,16 +213,61 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig
= JobStatusWatcherConfi
*/
def touchApplicationJob(@Nonnull trackId: TrackId): Option[JobStatusCV] = {
implicit val pollEmitTime: Long = System.currentTimeMillis
- val clusterId = trackId.clusterId
- val namespace = trackId.namespace
- val jobDetails = listJobsDetails(ClusterKey(APPLICATION, namespace,
clusterId))
+ val jobDetails = listJobsDetails(ClusterKey.of(trackId))
if (jobDetails.isEmpty || jobDetails.get.jobs.isEmpty) {
- inferJobStateFromK8sEvent(trackId)
+ inferStateFromK8sEvent(trackId)
} else {
Some(jobDetails.get.jobs.head.toJobStatusCV(pollEmitTime,
System.currentTimeMillis))
}
}
+ private[this] def updateState(trackId: TrackId, jobState: JobStatusCV): Unit
= {
+ val latest: JobStatusCV = watchController.jobStatuses.get(trackId)
+ if (jobState.diff(latest)) {
+ // put job status to cache
+ watchController.jobStatuses.put(trackId, jobState)
+ // set jobId to trackIds
+ watchController.trackIds.update(trackId)
+ eventBus.postSync(FlinkJobStatusChangeEvent(trackId, jobState))
+ }
+
+ lazy val deployExists = KubernetesRetriever.isDeploymentExists(
+ trackId.namespace,
+ trackId.clusterId
+ )
+
+ if (FlinkJobState.isEndState(jobState.jobState) && !deployExists) {
+ // remove trackId from cache of job that needs to be untracked
+ watchController.unWatching(trackId)
+ if (trackId.executeMode == APPLICATION) {
+ watchController.endpoints.invalidate(trackId.toClusterKey)
+ }
+ }
+ }
+
+ private[this] def inferState(id: TrackId): Option[JobStatusCV] = {
+ lazy val pollEmitTime = System.currentTimeMillis
+ val preCache = watchController.jobStatuses.get(id)
+ val state = inferFromPreCache(preCache)
+ val nonFirstSilent = state == FlinkJobState.SILENT &&
+ preCache != null &&
+ preCache.jobState == FlinkJobState.SILENT
+ val jobState = if (nonFirstSilent) {
+ JobStatusCV(
+ jobState = state,
+ jobId = id.jobId,
+ pollEmitTime = preCache.pollEmitTime,
+ pollAckTime = preCache.pollAckTime)
+ } else {
+ JobStatusCV(
+ jobState = state,
+ jobId = id.jobId,
+ pollEmitTime = pollEmitTime,
+ pollAckTime = System.currentTimeMillis)
+ }
+ Option(jobState)
+ }
+
/** list flink jobs details */
private def listJobsDetails(clusterKey: ClusterKey): Option[JobDetails] = {
// get flink rest api
@@ -284,27 +293,26 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig
= JobStatusWatcherConfi
/** list flink jobs details from rest api */
private def callJobsOverviewsApi(restUrl: String): Option[JobDetails] = {
- val jobDetails = JobDetails.as(
+ JobDetails.as(
Request
.get(s"$restUrl/jobs/overview")
.connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
.responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
.execute
.returnContent()
- .asString(StandardCharsets.UTF_8))
- jobDetails
+ .asString(StandardCharsets.UTF_8)
+ )
}
/**
* Infer the current flink state from the last relevant k8s events. This
method is only used for
* application-mode job inference in case of a failed JM rest request.
*/
- private def inferJobStateFromK8sEvent(@Nonnull trackId: TrackId)(implicit
+ private def inferStateFromK8sEvent(@Nonnull trackId: TrackId)(implicit
pollEmitTime: Long): Option[JobStatusCV] = {
// infer from k8s deployment and event
val latest: JobStatusCV = watchController.jobStatuses.get(trackId)
-
val jobState = trackId match {
case id if watchController.canceling.has(id) =>
logger.info(s"trackId ${trackId.toString} is canceling")
@@ -334,13 +342,13 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig
= JobStatusWatcherConfi
trackId.jobId)
FlinkJobState.FAILED
} else {
- inferSilentOrLostFromPreCache(latest)
+ inferFromPreCache(latest)
}
} else if (isConnection) {
logger.info("The deployment is deleted and enters the task failure
process.")
FlinkJobState.of(FlinkHistoryArchives.getJobStateFromArchiveFile(trackId.jobId))
} else {
- inferSilentOrLostFromPreCache(latest)
+ inferFromPreCache(latest)
}
}
@@ -359,7 +367,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig =
JobStatusWatcherConfi
}
}
- private[this] def inferSilentOrLostFromPreCache(preCache: JobStatusCV) =
preCache match {
+ private[this] def inferFromPreCache(preCache: JobStatusCV) = preCache match {
case preCache if preCache == null => FlinkJobState.SILENT
case preCache
if preCache.jobState == FlinkJobState.SILENT &&
@@ -373,11 +381,8 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig =
JobStatusWatcherConfi
object FlinkJobStatusWatcher {
- private val effectEndStates: Seq[FlinkJobState.Value] =
- FlinkJobState.endingStates.filter(_ != FlinkJobState.LOST)
-
/**
- * infer flink job state before persistence. so drama, so sad.
+ * infer flink job state before persistence.
*
* @param current
* current flink job state
@@ -388,15 +393,15 @@ object FlinkJobStatusWatcher {
current: FlinkJobState.Value,
previous: FlinkJobState.Value): FlinkJobState.Value = {
current match {
- case FlinkJobState.LOST =>
- if (effectEndStates.contains(current)) previous else
FlinkJobState.TERMINATED
case FlinkJobState.POS_TERMINATED | FlinkJobState.TERMINATED =>
previous match {
case FlinkJobState.CANCELLING => FlinkJobState.CANCELED
case FlinkJobState.FAILING => FlinkJobState.FAILED
case _ =>
- if (current == FlinkJobState.POS_TERMINATED) FlinkJobState.FINISHED
- else FlinkJobState.TERMINATED
+ current match {
+ case FlinkJobState.POS_TERMINATED => FlinkJobState.FINISHED
+ case _ => FlinkJobState.TERMINATED
+ }
}
case _ => current
}
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
index 060209ff9..f0183e76b 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
@@ -127,7 +127,7 @@ class FlinkK8sApplicationBuildPipeline(request:
FlinkK8sApplicationBuildRequest)
val dockerConf = request.dockerConfig
val baseImageTag = request.flinkBaseImage.trim
val pushImageTag = {
- val expectedImageTag =
s"streamparkflinkjob-${request.k8sNamespace}-${request.clusterId}"
+ val expectedImageTag =
s"streampark-${request.k8sNamespace}-${request.clusterId}"
compileTag(expectedImageTag, dockerConf.registerAddress,
dockerConf.imageNamespace)
}
@@ -203,16 +203,15 @@ class FlinkK8sApplicationBuildPipeline(request:
FlinkK8sApplicationBuildRequest)
}.getOrElse(throw getError.exception)
// Step-8: init build workspace of ingress
- val ingressOutputPath = request.ingressTemplate match {
- case ingress if StringUtils.isBlank(ingress) =>
- skipStep(8)
- ""
+ request.ingressTemplate match {
+ case ingress if StringUtils.isBlank(ingress) => skipStep(8)
case _ =>
execStep(8) {
- val ingressOutputPath =
- IngressController.prepareIngressTemplateFiles(buildWorkspace,
request.ingressTemplate)
- logInfo(s"export flink ingress: $ingressOutputPath")
- ingressOutputPath
+ val path = IngressController.prepareIngressTemplateFiles(
+ buildWorkspace,
+ request.ingressTemplate
+ )
+ logInfo(s"export flink ingress: $path")
}.getOrElse(throw getError.exception)
}