This is an automated email from the ASF dual-hosted git repository.

wangyang0918 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 48acc1c1383 [FLINK-28353][k8s] Exclude unschedulable nodes when 
generating the node port
48acc1c1383 is described below

commit 48acc1c138309df2f72533777d463fd2225e6fba
Author: suxinglee <[email protected]>
AuthorDate: Fri Jul 1 22:23:53 2022 +0800

    [FLINK-28353][k8s] Exclude unschedulable nodes when generating the node port
    
    This closes #20134.
---
 .../kubernetes/kubeclient/services/LoadBalancerService.java |  4 ++++
 .../kubernetes/kubeclient/services/NodePortService.java     |  4 ++++
 .../apache/flink/kubernetes/KubernetesClientTestBase.java   | 12 +++++++++++-
 .../kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java   | 13 +++++++++++--
 4 files changed, 30 insertions(+), 3 deletions(-)

diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/LoadBalancerService.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/LoadBalancerService.java
index ccc1fa9a0c6..8c8a4aa2a22 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/LoadBalancerService.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/LoadBalancerService.java
@@ -101,6 +101,10 @@ public class LoadBalancerService extends ServiceType {
             // only consider IPs with the configured address type.
             address =
                     internalClient.nodes().list().getItems().stream()
+                            .filter(
+                                    node ->
+                                            node.getSpec().getUnschedulable() 
== null
+                                                    || 
!node.getSpec().getUnschedulable())
                             .flatMap(node -> 
node.getStatus().getAddresses().stream())
                             .filter(
                                     nodeAddress ->
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/NodePortService.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/NodePortService.java
index 326eb663ea1..ac020f5fbc9 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/NodePortService.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/NodePortService.java
@@ -60,6 +60,10 @@ public class NodePortService extends ServiceType {
         // only consider IPs with the configured address type.
         address =
                 internalClient.nodes().list().getItems().stream()
+                        .filter(
+                                node ->
+                                        node.getSpec().getUnschedulable() == 
null
+                                                || 
!node.getSpec().getUnschedulable())
                         .flatMap(node -> 
node.getStatus().getAddresses().stream())
                         .filter(
                                 nodeAddress ->
diff --git 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java
 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java
index 80ed0504c80..a9d926c1924 100644
--- 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java
+++ 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java
@@ -31,6 +31,7 @@ import io.fabric8.kubernetes.api.model.Node;
 import io.fabric8.kubernetes.api.model.NodeAddressBuilder;
 import io.fabric8.kubernetes.api.model.NodeBuilder;
 import io.fabric8.kubernetes.api.model.NodeListBuilder;
+import io.fabric8.kubernetes.api.model.NodeSpecBuilder;
 import io.fabric8.kubernetes.api.model.NodeStatusBuilder;
 import io.fabric8.kubernetes.api.model.Service;
 import io.fabric8.kubernetes.api.model.ServiceBuilder;
@@ -43,6 +44,7 @@ import io.fabric8.mockwebserver.dsl.HttpMethodable;
 import io.fabric8.mockwebserver.dsl.MockServerExpectation;
 import io.fabric8.mockwebserver.dsl.ReturnOrWebsocketable;
 import io.fabric8.mockwebserver.dsl.TimesOnceableOrHttpHeaderable;
+import org.apache.commons.lang3.StringUtils;
 
 import javax.annotation.Nullable;
 
@@ -66,9 +68,17 @@ public class KubernetesClientTestBase extends 
KubernetesTestBase {
         for (String address : addresses) {
             final String[] parts = address.split(":");
             Preconditions.checkState(
-                    parts.length == 2, "Address should be in format 
\"<type>:<ip>\".");
+                    parts.length == 3,
+                    "Address should be in format 
\"<type>:<ip>:<unschedulable>\".");
             nodes.add(
                     new NodeBuilder()
+                            .withSpec(
+                                    new NodeSpecBuilder()
+                                            .withUnschedulable(
+                                                    
StringUtils.isBlank(parts[2])
+                                                            ? null
+                                                            : 
Boolean.parseBoolean(parts[2]))
+                                            .build())
                             .withStatus(
                                     new NodeStatusBuilder()
                                             .withAddresses(
diff --git 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
index de95997a996..322a8af0415 100644
--- 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
+++ 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
@@ -319,9 +319,15 @@ public class Fabric8FlinkKubeClientTest extends 
KubernetesClientTestBase {
         flinkConfig.set(
                 
KubernetesConfigOptions.REST_SERVICE_EXPOSED_NODE_PORT_ADDRESS_TYPE, 
addressType);
         final List<String> internalAddresses =
-                Arrays.asList("InternalIP:10.0.0.1", "InternalIP:10.0.0.2", 
"InternalIP:10.0.0.3");
+                Arrays.asList(
+                        "InternalIP:10.0.0.1:true",
+                        "InternalIP:10.0.0.2:false",
+                        "InternalIP:10.0.0.3: ");
         final List<String> externalAddresses =
-                Arrays.asList("ExternalIP:7.7.7.7", "ExternalIP:8.8.8.8", 
"ExternalIP:9.9.9.9");
+                Arrays.asList(
+                        "ExternalIP:7.7.7.7:true",
+                        "ExternalIP:8.8.8.8:false",
+                        "ExternalIP:9.9.9.9: ");
         final List<String> addresses = new ArrayList<>();
         addresses.addAll(internalAddresses);
         addresses.addAll(externalAddresses);
@@ -339,12 +345,14 @@ public class Fabric8FlinkKubeClientTest extends 
KubernetesClientTestBase {
                 case InternalIP:
                     expectedIps =
                             internalAddresses.stream()
+                                    .filter(s -> 
!"true".equals(s.split(":")[2]))
                                     .map(s -> s.split(":")[1])
                                     .collect(Collectors.toList());
                     break;
                 case ExternalIP:
                     expectedIps =
                             externalAddresses.stream()
+                                    .filter(s -> 
!"true".equals(s.split(":")[2]))
                                     .map(s -> s.split(":")[1])
                                     .collect(Collectors.toList());
                     break;
@@ -352,6 +360,7 @@ public class Fabric8FlinkKubeClientTest extends 
KubernetesClientTestBase {
                     throw new IllegalArgumentException(
                             String.format("Unexpected address type %s.", 
addressType));
             }
+            assertThat(expectedIps.size()).isEqualTo(2);
             assertThat(resultEndpoint.get().getAddress()).isIn(expectedIps);
             assertThat(resultEndpoint.get().getPort()).isEqualTo(NODE_PORT);
         }

Reply via email to