This is an automated email from the ASF dual-hosted git repository. zhijiang pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8f7ad848964c00bee490be0a4ea6d715426179d8 Author: wangyang0918 <danrtsey...@alibaba-inc.com> AuthorDate: Mon Dec 9 18:08:03 2019 +0800 [FLINK-15153][kubernetes] Service selector needs to contain jobmanager component label The jobmanager label needs to be added to service selector. Otherwise, it may select the wrong backend pods(taskmanager). --- .../flink/kubernetes/kubeclient/decorators/ServiceDecorator.java | 9 ++++++++- .../apache/flink/kubernetes/kubeclient/Fabric8ClientTest.java | 3 +++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/ServiceDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/ServiceDecorator.java index 98c4634..f8c4493 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/ServiceDecorator.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/ServiceDecorator.java @@ -33,6 +33,7 @@ import io.fabric8.kubernetes.api.model.ServiceSpec; import java.util.ArrayList; import java.util.List; +import java.util.Map; /** * Setup services port. @@ -89,7 +90,13 @@ public class ServiceDecorator extends Decorator<Service, KubernetesService> { } spec.setPorts(servicePorts); - spec.setSelector(resource.getMetadata().getLabels()); + + final Map<String, String> labels = new LabelBuilder() + .withExist(resource.getMetadata().getLabels()) + .withJobManagerComponent() + .toLabels(); + + spec.setSelector(labels); resource.setSpec(spec); diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8ClientTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8ClientTest.java index 17b52c7..08ebedf 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8ClientTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8ClientTest.java @@ -106,6 +106,8 @@ public class Fabric8ClientTest extends KubernetesTestBase { assertEquals(KubernetesConfigOptions.ServiceExposedType.ClusterIP.toString(), service.getSpec().getType()); + // The selector labels should contain jobmanager component + labels.put(Constants.LABEL_COMPONENT_KEY, Constants.LABEL_COMPONENT_JOB_MANAGER); assertEquals(labels, service.getSpec().getSelector()); assertThat(service.getSpec().getPorts().stream().map(ServicePort::getPort).collect(Collectors.toList()), @@ -133,6 +135,7 @@ public class Fabric8ClientTest extends KubernetesTestBase { assertEquals(KubernetesConfigOptions.ServiceExposedType.LoadBalancer.toString(), service.getSpec().getType()); + labels.put(Constants.LABEL_COMPONENT_KEY, Constants.LABEL_COMPONENT_JOB_MANAGER); assertEquals(labels, service.getSpec().getSelector()); assertThat(service.getSpec().getPorts().stream().map(ServicePort::getPort).collect(Collectors.toList()),