This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 70975b25870 [FLINK-34429][flink-kubernetes] Setting annotations on 
internal service.
70975b25870 is described below

commit 70975b258700f12cdb4e9352180be2f4213a6a08
Author: Barak Ben-Nathan <[email protected]>
AuthorDate: Sun Feb 18 14:46:11 2024 +0200

    [FLINK-34429][flink-kubernetes] Setting annotations on internal service.
    
    [FLINK-34429][docs] Updating configuration to include 
kubernetes.internal-service.annotations key.
    
    [FLINK-34429][flink-kubernetes] Kubernetes side parameters utility to 
retrieve INTERNAL_SERVICE_ANNOTATIONS from configuration
    
    [FLINK-34429][flink-kubernetes] Test for pulling Internal-Service 
Annotations parameter (KubernetesJobManagerParameters)
    
    [FLINK-34429][flink-kubernetes] Testing that internal-service is created 
with configured annotations
---
 .../generated/kubernetes_config_configuration.html       |  6 ++++++
 .../configuration/KubernetesConfigOptions.java           |  8 ++++++++
 .../parameters/KubernetesJobManagerParameters.java       |  6 ++++++
 .../kubeclient/services/HeadlessClusterIPService.java    |  1 +
 .../kubeclient/KubernetesJobManagerTestBase.java         |  1 +
 .../decorators/InternalServiceDecoratorTest.java         |  2 ++
 .../parameters/KubernetesJobManagerParametersTest.java   | 16 +++++++++++++++-
 7 files changed, 39 insertions(+), 1 deletion(-)

diff --git 
a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html 
b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html
index bb45be260aa..49c5e96cec7 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html
@@ -110,6 +110,12 @@
             <td>Boolean</td>
             <td>Whether to enable HostNetwork mode. The HostNetwork allows the 
pod could use the node network namespace instead of the individual pod network 
namespace. Please note that the JobManager service account should have the 
permission to update Kubernetes service.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.internal-service.annotations</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Map</td>
+            <td>The user-specified annotations that are set to the internal 
Service. The value should be in the form of a1:v1,a2:v2</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.jobmanager.annotations</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
index 1e2e6115fc3..a2e2e7c9b05 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
@@ -377,6 +377,14 @@ public class KubernetesConfigOptions {
                             "The user-specified annotations that are set to 
the rest Service. The value should be "
                                     + "in the form of a1:v1,a2:v2");
 
+    public static final ConfigOption<Map<String, String>> 
INTERNAL_SERVICE_ANNOTATIONS =
+            key("kubernetes.internal-service.annotations")
+                    .mapType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The user-specified annotations that are set to 
the internal Service. The value should be "
+                                    + "in the form of a1:v1,a2:v2");
+
     /**
      * Defines the configuration key of that external resource in Kubernetes. 
This is used as a
      * suffix in an actual config.
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java
index 845f930d18d..8fc09caa58a 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java
@@ -118,6 +118,12 @@ public class KubernetesJobManagerParameters extends 
AbstractKubernetesParameters
                 .orElse(Collections.emptyMap());
     }
 
+    public Map<String, String> getInternalServiceAnnotations() {
+        return flinkConfig
+                
.getOptional(KubernetesConfigOptions.INTERNAL_SERVICE_ANNOTATIONS)
+                .orElse(Collections.emptyMap());
+    }
+
     public int getJobManagerMemoryMB() {
         return clusterSpecification.getMasterMemoryMB();
     }
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/HeadlessClusterIPService.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/HeadlessClusterIPService.java
index ef932ff03da..675ffb87e1c 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/HeadlessClusterIPService.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/HeadlessClusterIPService.java
@@ -50,6 +50,7 @@ public class HeadlessClusterIPService extends 
ClusterIPService {
                 .withNewMetadata()
                 .withName(serviceName)
                 .withLabels(kubernetesJobManagerParameters.getCommonLabels())
+                
.withAnnotations(kubernetesJobManagerParameters.getInternalServiceAnnotations())
                 .endMetadata()
                 .withNewSpec()
                 .withClusterIP(HEADLESS_CLUSTER_IP)
diff --git 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesJobManagerTestBase.java
 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesJobManagerTestBase.java
index f0cec0b391d..eba124773fc 100644
--- 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesJobManagerTestBase.java
+++ 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesJobManagerTestBase.java
@@ -63,6 +63,7 @@ public class KubernetesJobManagerTestBase extends 
KubernetesPodTestBase {
                                 
ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX + k, v));
         this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_LABELS, 
userLabels);
         this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_ANNOTATIONS, 
userAnnotations);
+        
this.flinkConfig.set(KubernetesConfigOptions.INTERNAL_SERVICE_ANNOTATIONS, 
userAnnotations);
         
this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_NODE_SELECTOR, 
nodeSelector);
         this.flinkConfig.set(
                 JobManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(JOB_MANAGER_MEMORY));
diff --git 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecoratorTest.java
 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecoratorTest.java
index 46aca881330..92f91648290 100644
--- 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecoratorTest.java
+++ 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecoratorTest.java
@@ -69,6 +69,8 @@ class InternalServiceDecoratorTest extends 
KubernetesJobManagerTestBase {
         final Map<String, String> expectedLabels = getCommonLabels();
         
assertThat(internalService.getMetadata().getLabels()).isEqualTo(expectedLabels);
 
+        
assertThat(internalService.getMetadata().getAnnotations()).isEqualTo(userAnnotations);
+
         assertThat(internalService.getSpec().getType()).isNull();
         assertThat(internalService.getSpec().getClusterIP()).isEqualTo("None");
 
diff --git 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java
 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java
index 67a6fd51fbf..8883d0b94ee 100644
--- 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java
+++ 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java
@@ -96,7 +96,7 @@ class KubernetesJobManagerParametersTest extends 
KubernetesTestBase {
     }
 
     @Test
-    void testGetServiceAnnotations() {
+    void testGetRestServiceAnnotations() {
         final Map<String, String> expectedAnnotations = new HashMap<>();
         expectedAnnotations.put("a1", "v1");
         expectedAnnotations.put("a2", "v2");
@@ -109,6 +109,20 @@ class KubernetesJobManagerParametersTest extends 
KubernetesTestBase {
         assertThat(resultAnnotations).isEqualTo(expectedAnnotations);
     }
 
+    @Test
+    void testGetInternalServiceAnnotations() {
+        final Map<String, String> expectedAnnotations = new HashMap<>();
+        expectedAnnotations.put("a1", "v1");
+        expectedAnnotations.put("a2", "v2");
+
+        flinkConfig.set(KubernetesConfigOptions.INTERNAL_SERVICE_ANNOTATIONS, 
expectedAnnotations);
+
+        final Map<String, String> resultAnnotations =
+                kubernetesJobManagerParameters.getInternalServiceAnnotations();
+
+        assertThat(resultAnnotations).isEqualTo(expectedAnnotations);
+    }
+
     @Test
     void testGetJobManagerMemoryMB() {
         assertThat(kubernetesJobManagerParameters.getJobManagerMemoryMB())

Reply via email to