This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
new 97dfe6b2f [Improve] custom_code build pipeline bug fixed. (#3484)
97dfe6b2f is described below
commit 97dfe6b2fdcc50960ed5fe377a05e3fc0309a204
Author: benjobs <[email protected]>
AuthorDate: Wed Jan 10 20:56:17 2024 +0800
[Improve] custom_code build pipeline bug fixed. (#3484)
Co-authored-by: benjobs <[email protected]>
---
.../core/service/impl/AppBuildPipeServiceImpl.java | 25 ++++++++---------
.../core/service/impl/ApplicationServiceImpl.java | 24 ++++++++---------
.../streampark-console-webapp/package.json | 2 +-
.../src/views/base/login/Login.vue | 2 +-
.../flink/kubernetes/DefaultFlinkK8sWatcher.scala | 12 ++++-----
.../kubernetes/watcher/FlinkJobStatusWatcher.scala | 31 +++++++++++-----------
6 files changed, 49 insertions(+), 47 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index a002a8585..6834d940d 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -19,7 +19,6 @@ package org.apache.streampark.console.core.service.impl;
import org.apache.streampark.common.conf.ConfigConst;
import org.apache.streampark.common.conf.Workspace;
-import org.apache.streampark.common.enums.ApplicationType;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.fs.FsOperator;
import org.apache.streampark.common.util.FileUtils;
@@ -378,7 +377,7 @@ public class AppBuildPipeServiceImpl
FsOperator localFS = FsOperator.lfs();
// 1. copy jar to local upload dir
- if (app.isFlinkSqlJob() || app.isApacheFlinkCustomCodeJob()) {
+ if (app.isFlinkSqlJob() || app.isCustomCodeJob()) {
if (!app.getMavenDependency().getJar().isEmpty()) {
for (String jar : app.getMavenDependency().getJar()) {
File localJar = new File(WebUtils.getAppTempDir(), jar);
@@ -393,14 +392,14 @@ public class AppBuildPipeServiceImpl
}
}
- if (app.isApacheFlinkCustomCodeJob()) {
+ if (app.isCustomCodeJob()) {
// customCode upload jar to appHome...
FsOperator fsOperator = app.getFsOperator();
ResourceFrom resourceFrom = ResourceFrom.of(app.getResourceFrom());
File userJar;
if (resourceFrom == ResourceFrom.CICD) {
- userJar = getAppDistJar(app);
+ userJar = getCustomCodeAppDistJar(app);
} else if (resourceFrom == ResourceFrom.UPLOAD) {
userJar = new File(WebUtils.getAppTempDir(), app.getJar());
} else {
@@ -484,15 +483,17 @@ public class AppBuildPipeServiceImpl
}
}
- private File getAppDistJar(Application app) {
- if (app.getApplicationType() == ApplicationType.STREAMPARK_FLINK) {
- return new File(app.getDistHome() + "/lib",
app.getModule().concat(".jar"));
- }
- if (app.getApplicationType() == ApplicationType.APACHE_FLINK) {
- return new File(app.getDistHome(), app.getJar());
+ private File getCustomCodeAppDistJar(Application app) {
+ switch (app.getApplicationType()) {
+ case APACHE_FLINK:
+ return new File(app.getDistHome(), app.getJar());
+ case STREAMPARK_FLINK:
+ String userJar = String.format("%s/lib/%s.jar", app.getDistHome(),
app.getModule());
+ return new File(userJar);
+ default:
+ throw new IllegalArgumentException(
+ "[StreamPark] unsupported ApplicationType of custom code: " +
app.getApplicationType());
}
- throw new IllegalArgumentException(
- "[StreamPark] unsupported ApplicationType of custom code: " +
app.getApplicationType());
}
/** copy from {@link ApplicationServiceImpl#start(Application, boolean)} */
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index ed55c27cc..869e79afd 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -210,7 +210,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
@Autowired private EnvInitializer envInitializer;
- @Autowired private FlinkK8sWatcher k8SFlinkTrackMonitor;
+ @Autowired private FlinkK8sWatcher flinkK8sWatcher;
@Autowired private AppBuildPipeService appBuildPipeService;
@@ -282,7 +282,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
}
// merge metrics from flink kubernetes cluster
- FlinkMetricCV k8sMetric =
k8SFlinkTrackMonitor.getAccGroupMetrics(teamId.toString());
+ FlinkMetricCV k8sMetric =
flinkK8sWatcher.getAccGroupMetrics(teamId.toString());
if (k8sMetric != null) {
totalJmMemory += k8sMetric.totalJmMemory();
totalTmMemory += k8sMetric.totalTmMemory();
@@ -518,7 +518,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
// in time.
if (isKubernetesApp(record)) {
// set duration
- String restUrl =
k8SFlinkTrackMonitor.getRemoteRestUrl(toTrackId(record));
+ String restUrl =
flinkK8sWatcher.getRemoteRestUrl(toTrackId(record));
record.setFlinkRestUrl(restUrl);
if (record.getTracking() == 1
&& record.getStartTime() != null
@@ -707,7 +707,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
}
// check whether clusterId, namespace, jobId on kubernetes
else if (ExecutionMode.isKubernetesMode(appParam.getExecutionMode())
- && k8SFlinkTrackMonitor.checkIsInRemoteCluster(toTrackId(app))) {
+ && flinkK8sWatcher.checkIsInRemoteCluster(toTrackId(app))) {
return AppExistsState.IN_KUBERNETES;
}
}
@@ -723,7 +723,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
}
// check whether clusterId, namespace, jobId on kubernetes
else if (ExecutionMode.isKubernetesMode(appParam.getExecutionMode())
- && k8SFlinkTrackMonitor.checkIsInRemoteCluster(toTrackId(appParam)))
{
+ && flinkK8sWatcher.checkIsInRemoteCluster(toTrackId(appParam))) {
return AppExistsState.IN_KUBERNETES;
}
}
@@ -1196,7 +1196,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
}
// add flink web url info for k8s-mode
if (isKubernetesApp(application)) {
- String restUrl =
k8SFlinkTrackMonitor.getRemoteRestUrl(toTrackId(application));
+ String restUrl =
flinkK8sWatcher.getRemoteRestUrl(toTrackId(application));
application.setFlinkRestUrl(restUrl);
// set duration
@@ -1233,7 +1233,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
boolean mapping = this.baseMapper.mapping(appParam);
Application application = getById(appParam.getId());
if (isKubernetesApp(application)) {
- k8SFlinkTrackMonitor.doWatching(toTrackId(application));
+ flinkK8sWatcher.doWatching(toTrackId(application));
} else {
FlinkRESTAPIWatcher.doWatching(application);
}
@@ -1354,7 +1354,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
}
// re-tracking flink job on kubernetes and logging exception
if (isKubernetesApp(application)) {
- k8SFlinkTrackMonitor.unWatching(trackId);
+ flinkK8sWatcher.unWatching(trackId);
} else {
FlinkRESTAPIWatcher.unWatching(application.getId());
}
@@ -1380,7 +1380,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
}
if (isKubernetesApp(application)) {
- k8SFlinkTrackMonitor.unWatching(trackId);
+ flinkK8sWatcher.unWatching(trackId);
}
});
}
@@ -1620,7 +1620,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
app.setOptionState(OptionState.NONE.getValue());
updateById(app);
if (isKubernetesApp(app)) {
- k8SFlinkTrackMonitor.unWatching(trackId);
+ flinkK8sWatcher.unWatching(trackId);
} else {
FlinkRESTAPIWatcher.unWatching(appParam.getId());
}
@@ -1656,7 +1656,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
// if start completed, will be added task to tracking queue
if (isKubernetesApp(application)) {
application.setRelease(ReleaseState.DONE.get());
- k8SFlinkTrackMonitor.doWatching(trackId);
+ flinkK8sWatcher.doWatching(trackId);
if
(ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
String domainName = settingService.getIngressModeDefault();
if (StringUtils.isNotBlank(domainName)) {
@@ -1755,7 +1755,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
// re-tracking flink job on kubernetes and logging exception
if (isKubernetesApp(application)) {
TrackId id = toTrackId(application);
- k8SFlinkTrackMonitor.doWatching(id);
+ flinkK8sWatcher.doWatching(id);
} else {
FlinkRESTAPIWatcher.unWatching(application.getId());
}
diff --git a/streampark-console/streampark-console-webapp/package.json
b/streampark-console/streampark-console-webapp/package.json
index 9d2d43554..8622c067f 100644
--- a/streampark-console/streampark-console-webapp/package.json
+++ b/streampark-console/streampark-console-webapp/package.json
@@ -49,7 +49,7 @@
"@vueuse/shared": "^9.6.0",
"@zxcvbn-ts/core": "^2.1.0",
"ant-design-vue": "^3.2.15",
- "axios": "^1.2.1",
+ "axios": "^1.6.5",
"crypto-js": "^4.1.1",
"dayjs": "^1.11.6",
"lodash-es": "^4.17.21",
diff --git
a/streampark-console/streampark-console-webapp/src/views/base/login/Login.vue
b/streampark-console/streampark-console-webapp/src/views/base/login/Login.vue
index 8daa7445d..2e0598e68 100644
---
a/streampark-console/streampark-console-webapp/src/views/base/login/Login.vue
+++
b/streampark-console/streampark-console-webapp/src/views/base/login/Login.vue
@@ -59,7 +59,7 @@
/>
</a>
<p class="text-light-100 pt-10px" style="border-top: 1px solid #dad7d7">
- Copyright © 2023 The Apache Software Foundation. Apache StreamPark,
StreamPark, and its
+ Copyright © 2022-{{ `${new Date().getFullYear()}` }} The Apache
Software Foundation. Apache StreamPark, StreamPark, and its
feather logo are trademarks of The Apache Software Foundation.
</p>
</footer>
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala
index e9a3b06e1..4e3d808c4 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala
@@ -21,7 +21,7 @@ import
org.apache.streampark.flink.kubernetes.enums.FlinkJobState
import
org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.{APPLICATION,
SESSION}
import org.apache.streampark.flink.kubernetes.event.{BuildInEvent,
FlinkJobStateEvent, FlinkJobStatusChangeEvent}
import org.apache.streampark.flink.kubernetes.model._
-import org.apache.streampark.flink.kubernetes.watcher.{FlinkCheckpointWatcher,
FlinkJobStatusWatcher, FlinkK8sEventWatcher, FlinkMetricWatcher, FlinkWatcher}
+import org.apache.streampark.flink.kubernetes.watcher._
import com.google.common.eventbus.{AllowConcurrentEvents, Subscribe}
@@ -42,10 +42,10 @@ class DefaultFlinkK8sWatcher(conf: FlinkTrackConfig =
FlinkTrackConfig.defaultCo
}
// remote server tracking watcher
- val k8sEventWatcher = new FlinkK8sEventWatcher()
- val jobStatusWatcher = new FlinkJobStatusWatcher(conf.jobStatusWatcherConf)
- val metricsWatcher = new FlinkMetricWatcher(conf.metricWatcherConf)
- val checkpointWatcher = new FlinkCheckpointWatcher(conf.metricWatcherConf)
+ private val k8sEventWatcher = new FlinkK8sEventWatcher()
+ private val jobStatusWatcher = new
FlinkJobStatusWatcher(conf.jobStatusWatcherConf)
+ private val metricsWatcher = new FlinkMetricWatcher(conf.metricWatcherConf)
+ private val checkpointWatcher = new
FlinkCheckpointWatcher(conf.metricWatcherConf)
private[this] val allWatchers =
Array[FlinkWatcher](k8sEventWatcher, jobStatusWatcher, metricsWatcher,
checkpointWatcher)
@@ -118,7 +118,7 @@ class DefaultFlinkK8sWatcher(conf: FlinkTrackConfig =
FlinkTrackConfig.defaultCo
watchController.endpoints.get(trackId.toClusterKey)
/** Build-in Event Listener of K8sFlinkTrackMonitor. */
- class BuildInEventListener {
+ private class BuildInEventListener {
/**
* Watch the FlinkJobOperaEvent, then update relevant cache record and
trigger a new
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 6d731fe83..eec41ebd6 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -31,7 +31,6 @@ import com.google.common.io.Files
import org.apache.flink.core.fs.Path
import org.apache.flink.runtime.history.FsJobArchivist
import org.apache.hc.client5.http.fluent.Request
-import org.apache.hc.core5.util.Timeout
import org.json4s.{DefaultFormats, JNothing, JNull}
import org.json4s.JsonAST.JArray
import org.json4s.jackson.JsonMethods.parse
@@ -116,9 +115,13 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig =
JobStatusWatcherConfi
case Some(jobState) =>
val trackId = id.copy(jobId = jobState.jobId)
val latest: JobStatusCV =
watchController.jobStatuses.get(trackId)
- if (
- latest == null || latest.jobState != jobState.jobState ||
latest.jobId != jobState.jobId
- ) {
+
+ val eventChanged = latest == null ||
+ latest.jobState != jobState.jobState ||
+ latest.jobId != jobState.jobId
+
+ if (eventChanged) {
+ logInfo(s"eventChanged.....$trackId")
// put job status to cache
watchController.jobStatuses.put(trackId, jobState)
// set jobId to trackIds
@@ -193,7 +196,6 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig =
JobStatusWatcherConfi
pollAckTime = System.currentTimeMillis)
}
}
-
Some(jobState)
}
@@ -204,7 +206,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig =
JobStatusWatcherConfi
* This method can be called directly from outside, without affecting the
current cachePool
* result.
*/
- protected[kubernetes] def touchSessionAllJob(
+ private def touchSessionAllJob(
@Nonnull clusterId: String,
@Nonnull namespace: String,
@Nonnull appId: Long,
@@ -219,8 +221,8 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig =
JobStatusWatcherConfi
} else {
jobDetails.map {
d =>
- TrackId.onSession(namespace, clusterId, appId, d.jid, groupId) -> d
- .toJobStatusCV(pollEmitTime, System.currentTimeMillis)
+ TrackId.onSession(namespace, clusterId, appId, d.jid, groupId) ->
+ d.toJobStatusCV(pollEmitTime, System.currentTimeMillis)
}
}
}
@@ -238,7 +240,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig =
JobStatusWatcherConfi
val namespace = trackId.namespace
val jobDetails = listJobsDetails(ClusterKey(APPLICATION, namespace,
clusterId))
if (jobDetails.isEmpty || jobDetails.get.jobs.isEmpty) {
- inferApplicationFlinkJobStateFromK8sEvent(trackId)
+ inferJobStateFromK8sEvent(trackId)
} else {
Some(jobDetails.get.jobs.head.toJobStatusCV(pollEmitTime,
System.currentTimeMillis))
}
@@ -284,7 +286,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig =
JobStatusWatcherConfi
* Infer the current flink state from the last relevant k8s events. This
method is only used for
* application-mode job inference in case of a failed JM rest request.
*/
- private def inferApplicationFlinkJobStateFromK8sEvent(@Nonnull trackId:
TrackId)(implicit
+ private def inferJobStateFromK8sEvent(@Nonnull trackId: TrackId)(implicit
pollEmitTime: Long): Option[JobStatusCV] = {
// infer from k8s deployment and event
@@ -301,14 +303,13 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig
= JobStatusWatcherConfi
trackId.clusterId
)
- val deployError = KubernetesDeploymentHelper.isDeploymentError(
- trackId.namespace,
- trackId.clusterId
- )
-
val isConnection = KubernetesDeploymentHelper.checkConnection()
if (deployExists) {
+ val deployError = KubernetesDeploymentHelper.isDeploymentError(
+ trackId.namespace,
+ trackId.clusterId
+ )
if (!deployError) {
logger.info("Task Enter the initialization process.")
FlinkJobState.K8S_INITIALIZING