This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 70975b25870 [FLINK-34429][flink-kubernetes] Setting annotations on
internal service.
70975b25870 is described below
commit 70975b258700f12cdb4e9352180be2f4213a6a08
Author: Barak Ben-Nathan <[email protected]>
AuthorDate: Sun Feb 18 14:46:11 2024 +0200
[FLINK-34429][flink-kubernetes] Setting annotations on internal service.
[FLINK-34429][docs] Updating configuration to include
kubernetes.internal-service.annotations key.
[FLINK-34429][flink-kubernetes] Kubernetes side parameters utility to
retrieve INTERNAL_SERVICE_ANNOTATIONS from configuration
[FLINK-34429][flink-kubernetes] Test for pulling Internal-Service
Annotations parameter (KubernetesJobManagerParameters)
[FLINK-34429][flink-kubernetes] Testing that internal-service is created
with configured annotations
---
.../generated/kubernetes_config_configuration.html | 6 ++++++
.../configuration/KubernetesConfigOptions.java | 8 ++++++++
.../parameters/KubernetesJobManagerParameters.java | 6 ++++++
.../kubeclient/services/HeadlessClusterIPService.java | 1 +
.../kubeclient/KubernetesJobManagerTestBase.java | 1 +
.../decorators/InternalServiceDecoratorTest.java | 2 ++
.../parameters/KubernetesJobManagerParametersTest.java | 16 +++++++++++++++-
7 files changed, 39 insertions(+), 1 deletion(-)
diff --git
a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html
b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html
index bb45be260aa..49c5e96cec7 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html
@@ -110,6 +110,12 @@
<td>Boolean</td>
<td>Whether to enable HostNetwork mode. The HostNetwork allows the
pod could use the node network namespace instead of the individual pod network
namespace. Please note that the JobManager service account should have the
permission to update Kubernetes service.</td>
</tr>
+ <tr>
+ <td><h5>kubernetes.internal-service.annotations</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Map</td>
+ <td>The user-specified annotations that are set to the internal
Service. The value should be in the form of a1:v1,a2:v2</td>
+ </tr>
<tr>
<td><h5>kubernetes.jobmanager.annotations</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
index 1e2e6115fc3..a2e2e7c9b05 100644
---
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
+++
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
@@ -377,6 +377,14 @@ public class KubernetesConfigOptions {
"The user-specified annotations that are set to
the rest Service. The value should be "
+ "in the form of a1:v1,a2:v2");
+ public static final ConfigOption<Map<String, String>>
INTERNAL_SERVICE_ANNOTATIONS =
+ key("kubernetes.internal-service.annotations")
+ .mapType()
+ .noDefaultValue()
+ .withDescription(
+ "The user-specified annotations that are set to
the internal Service. The value should be "
+ + "in the form of a1:v1,a2:v2");
+
/**
* Defines the configuration key of that external resource in Kubernetes.
This is used as a
* suffix in an actual config.
diff --git
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java
index 845f930d18d..8fc09caa58a 100644
---
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java
+++
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java
@@ -118,6 +118,12 @@ public class KubernetesJobManagerParameters extends
AbstractKubernetesParameters
.orElse(Collections.emptyMap());
}
+ public Map<String, String> getInternalServiceAnnotations() {
+ return flinkConfig
+
.getOptional(KubernetesConfigOptions.INTERNAL_SERVICE_ANNOTATIONS)
+ .orElse(Collections.emptyMap());
+ }
+
public int getJobManagerMemoryMB() {
return clusterSpecification.getMasterMemoryMB();
}
diff --git
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/HeadlessClusterIPService.java
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/HeadlessClusterIPService.java
index ef932ff03da..675ffb87e1c 100644
---
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/HeadlessClusterIPService.java
+++
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/HeadlessClusterIPService.java
@@ -50,6 +50,7 @@ public class HeadlessClusterIPService extends
ClusterIPService {
.withNewMetadata()
.withName(serviceName)
.withLabels(kubernetesJobManagerParameters.getCommonLabels())
+
.withAnnotations(kubernetesJobManagerParameters.getInternalServiceAnnotations())
.endMetadata()
.withNewSpec()
.withClusterIP(HEADLESS_CLUSTER_IP)
diff --git
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesJobManagerTestBase.java
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesJobManagerTestBase.java
index f0cec0b391d..eba124773fc 100644
---
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesJobManagerTestBase.java
+++
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesJobManagerTestBase.java
@@ -63,6 +63,7 @@ public class KubernetesJobManagerTestBase extends
KubernetesPodTestBase {
ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX + k, v));
this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_LABELS,
userLabels);
this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_ANNOTATIONS,
userAnnotations);
+
this.flinkConfig.set(KubernetesConfigOptions.INTERNAL_SERVICE_ANNOTATIONS,
userAnnotations);
this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_NODE_SELECTOR,
nodeSelector);
this.flinkConfig.set(
JobManagerOptions.TOTAL_PROCESS_MEMORY,
MemorySize.ofMebiBytes(JOB_MANAGER_MEMORY));
diff --git
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecoratorTest.java
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecoratorTest.java
index 46aca881330..92f91648290 100644
---
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecoratorTest.java
+++
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecoratorTest.java
@@ -69,6 +69,8 @@ class InternalServiceDecoratorTest extends
KubernetesJobManagerTestBase {
final Map<String, String> expectedLabels = getCommonLabels();
assertThat(internalService.getMetadata().getLabels()).isEqualTo(expectedLabels);
+
assertThat(internalService.getMetadata().getAnnotations()).isEqualTo(userAnnotations);
+
assertThat(internalService.getSpec().getType()).isNull();
assertThat(internalService.getSpec().getClusterIP()).isEqualTo("None");
diff --git
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java
index 67a6fd51fbf..8883d0b94ee 100644
---
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java
+++
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java
@@ -96,7 +96,7 @@ class KubernetesJobManagerParametersTest extends
KubernetesTestBase {
}
@Test
- void testGetServiceAnnotations() {
+ void testGetRestServiceAnnotations() {
final Map<String, String> expectedAnnotations = new HashMap<>();
expectedAnnotations.put("a1", "v1");
expectedAnnotations.put("a2", "v2");
@@ -109,6 +109,20 @@ class KubernetesJobManagerParametersTest extends
KubernetesTestBase {
assertThat(resultAnnotations).isEqualTo(expectedAnnotations);
}
+ @Test
+ void testGetInternalServiceAnnotations() {
+ final Map<String, String> expectedAnnotations = new HashMap<>();
+ expectedAnnotations.put("a1", "v1");
+ expectedAnnotations.put("a2", "v2");
+
+ flinkConfig.set(KubernetesConfigOptions.INTERNAL_SERVICE_ANNOTATIONS,
expectedAnnotations);
+
+ final Map<String, String> resultAnnotations =
+ kubernetesJobManagerParameters.getInternalServiceAnnotations();
+
+ assertThat(resultAnnotations).isEqualTo(expectedAnnotations);
+ }
+
@Test
void testGetJobManagerMemoryMB() {
assertThat(kubernetesJobManagerParameters.getJobManagerMemoryMB())