This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
     new 076578cf3 Add a jobId label to the pods deployed in k8s mode. (#4063)
076578cf3 is described below

commit 076578cf316e98204823a23635e07cac40298f09
Author: Darcy <[email protected]>
AuthorDate: Sat Sep 14 21:17:00 2024 +0800

    Add a jobId label to the pods deployed in k8s mode. (#4063)
    
    * feature: Add a jobId label to the pods deployed on k8s to facilitate 
subsequent monitoring and task viewing.
    
    * Revert "feature: Add a jobId label to the pods deployed on k8s to 
facilitate subsequent monitoring and task viewing."
    
    
    * rollback FlinkClientTrait.scala and add code to 
KubernetesNativeClientTrait.scala
---
 .../client/trait/KubernetesNativeClientTrait.scala      | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
index 99c3a43fd..72ba662e3 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
@@ -32,6 +32,8 @@ import 
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.Service
 
 import javax.annotation.Nonnull
 
+import java.util.{HashMap => JavaHashMap, Map => JavaMap}
+
 import scala.language.postfixOps
 
 /** kubernetes native mode submit */
@@ -70,6 +72,10 @@ trait KubernetesNativeClientTrait extends FlinkClientTrait {
       flinkConfig.removeConfig(KubernetesConfigOptions.NAMESPACE)
     }
 
+    // add pod labels, mainly to facilitate the management of k8s resources
+    addPodLabels(flinkConfig, KubernetesConfigOptions.JOB_MANAGER_LABELS, 
submitRequest)
+    addPodLabels(flinkConfig, KubernetesConfigOptions.TASK_MANAGER_LABELS, 
submitRequest)
+
     logInfo(s"""
                
|------------------------------------------------------------------
                |Effective submit configuration: $flinkConfig
@@ -182,4 +188,15 @@ trait KubernetesNativeClientTrait extends FlinkClientTrait 
{
     null
   }
 
+  private def addPodLabels(
+      flinkConfig: Configuration,
+      opt: ConfigOption[JavaMap[String, String]],
+      submitRequest: SubmitRequest): Unit = {
+    val labels = flinkConfig
+      .getOptional[JavaMap[String, String]](opt)
+      .orElse(new JavaHashMap[String, String]())
+    labels.put("jobId", submitRequest.jobId)
+    flinkConfig.safeSet(opt, labels)
+  }
+
 }

Reply via email to