This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
new 076578cf3 Add a jobId label to the pods deployed in k8s mode. (#4063)
076578cf3 is described below
commit 076578cf316e98204823a23635e07cac40298f09
Author: Darcy <[email protected]>
AuthorDate: Sat Sep 14 21:17:00 2024 +0800
Add a jobId label to the pods deployed in k8s mode. (#4063)
* feature: Add a jobId label to the pods deployed on k8s to facilitate
subsequent monitoring and task viewing.
* Revert "feature: Add a jobId label to the pods deployed on k8s to
facilitate subsequent monitoring and task viewing."
* rollback FlinkClientTrait.scala and add code to
KubernetesNativeClientTrait.scala
---
.../client/trait/KubernetesNativeClientTrait.scala | 17 +++++++++++++++++
1 file changed, 17 insertions(+)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
index 99c3a43fd..72ba662e3 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
@@ -32,6 +32,8 @@ import
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.Service
import javax.annotation.Nonnull
+import java.util.{HashMap => JavaHashMap, Map => JavaMap}
+
import scala.language.postfixOps
/** kubernetes native mode submit */
@@ -70,6 +72,10 @@ trait KubernetesNativeClientTrait extends FlinkClientTrait {
flinkConfig.removeConfig(KubernetesConfigOptions.NAMESPACE)
}
+ // add pod labels, mainly to facilitate the management of k8s resources
+ addPodLabels(flinkConfig, KubernetesConfigOptions.JOB_MANAGER_LABELS, submitRequest)
+ addPodLabels(flinkConfig, KubernetesConfigOptions.TASK_MANAGER_LABELS, submitRequest)
+
logInfo(s"""
|------------------------------------------------------------------
|Effective submit configuration: $flinkConfig
@@ -182,4 +188,15 @@ trait KubernetesNativeClientTrait extends FlinkClientTrait {
null
}
+  /**
+   * Adds a `jobId` label to the pod-label map stored under `opt` in `flinkConfig`.
+   *
+   * Labels already configured under `opt` are kept; an existing `jobId` entry is
+   * overwritten with `submitRequest.jobId`.
+   *
+   * @param flinkConfig   configuration that is read from and written back to
+   * @param opt           label option to extend (called with JOB_MANAGER_LABELS and
+   *                      TASK_MANAGER_LABELS in this trait)
+   * @param submitRequest request supplying the `jobId` label value
+   */
+  private def addPodLabels(
+      flinkConfig: Configuration,
+      opt: ConfigOption[JavaMap[String, String]],
+      submitRequest: SubmitRequest): Unit = {
+    // Merge into a fresh map rather than mutating the value returned by getOptional:
+    // that map is not guaranteed to be mutable and may be the instance held by the
+    // configuration itself, so an in-place put could throw or alias shared state.
+    val labels = new JavaHashMap[String, String]()
+    val configured = flinkConfig.getOptional[JavaMap[String, String]](opt)
+    if (configured.isPresent) {
+      labels.putAll(configured.get())
+    }
+    labels.put("jobId", submitRequest.jobId)
+    flinkConfig.safeSet(opt, labels)
+  }
+
}