This is an automated email from the ASF dual-hosted git repository.
wangyang0918 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 48acc1c1383 [FLINK-28353][k8s] Exclude unschedulable nodes when
generating the node port
48acc1c1383 is described below
commit 48acc1c138309df2f72533777d463fd2225e6fba
Author: suxinglee <[email protected]>
AuthorDate: Fri Jul 1 22:23:53 2022 +0800
[FLINK-28353][k8s] Exclude unschedulable nodes when generating the node port
This closes #20134.
---
.../kubernetes/kubeclient/services/LoadBalancerService.java | 4 ++++
.../kubernetes/kubeclient/services/NodePortService.java | 4 ++++
.../apache/flink/kubernetes/KubernetesClientTestBase.java | 12 +++++++++++-
.../kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java | 13 +++++++++++--
4 files changed, 30 insertions(+), 3 deletions(-)
diff --git
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/LoadBalancerService.java
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/LoadBalancerService.java
index ccc1fa9a0c6..8c8a4aa2a22 100644
---
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/LoadBalancerService.java
+++
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/LoadBalancerService.java
@@ -101,6 +101,10 @@ public class LoadBalancerService extends ServiceType {
// only consider IPs with the configured address type.
address =
internalClient.nodes().list().getItems().stream()
+ .filter(
+ node ->
+ node.getSpec().getUnschedulable()
== null
+ ||
!node.getSpec().getUnschedulable())
.flatMap(node ->
node.getStatus().getAddresses().stream())
.filter(
nodeAddress ->
diff --git
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/NodePortService.java
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/NodePortService.java
index 326eb663ea1..ac020f5fbc9 100644
---
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/NodePortService.java
+++
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/NodePortService.java
@@ -60,6 +60,10 @@ public class NodePortService extends ServiceType {
// only consider IPs with the configured address type.
address =
internalClient.nodes().list().getItems().stream()
+ .filter(
+ node ->
+ node.getSpec().getUnschedulable() ==
null
+ ||
!node.getSpec().getUnschedulable())
.flatMap(node ->
node.getStatus().getAddresses().stream())
.filter(
nodeAddress ->
diff --git
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java
index 80ed0504c80..a9d926c1924 100644
---
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java
+++
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java
@@ -31,6 +31,7 @@ import io.fabric8.kubernetes.api.model.Node;
import io.fabric8.kubernetes.api.model.NodeAddressBuilder;
import io.fabric8.kubernetes.api.model.NodeBuilder;
import io.fabric8.kubernetes.api.model.NodeListBuilder;
+import io.fabric8.kubernetes.api.model.NodeSpecBuilder;
import io.fabric8.kubernetes.api.model.NodeStatusBuilder;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.ServiceBuilder;
@@ -43,6 +44,7 @@ import io.fabric8.mockwebserver.dsl.HttpMethodable;
import io.fabric8.mockwebserver.dsl.MockServerExpectation;
import io.fabric8.mockwebserver.dsl.ReturnOrWebsocketable;
import io.fabric8.mockwebserver.dsl.TimesOnceableOrHttpHeaderable;
+import org.apache.commons.lang3.StringUtils;
import javax.annotation.Nullable;
@@ -66,9 +68,17 @@ public class KubernetesClientTestBase extends
KubernetesTestBase {
for (String address : addresses) {
final String[] parts = address.split(":");
Preconditions.checkState(
- parts.length == 2, "Address should be in format
\"<type>:<ip>\".");
+ parts.length == 3,
+ "Address should be in format
\"<type>:<ip>:<unschedulable>\".");
nodes.add(
new NodeBuilder()
+ .withSpec(
+ new NodeSpecBuilder()
+ .withUnschedulable(
+
StringUtils.isBlank(parts[2])
+ ? null
+ :
Boolean.parseBoolean(parts[2]))
+ .build())
.withStatus(
new NodeStatusBuilder()
.withAddresses(
diff --git
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
index de95997a996..322a8af0415 100644
---
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
+++
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
@@ -319,9 +319,15 @@ public class Fabric8FlinkKubeClientTest extends
KubernetesClientTestBase {
flinkConfig.set(
KubernetesConfigOptions.REST_SERVICE_EXPOSED_NODE_PORT_ADDRESS_TYPE,
addressType);
final List<String> internalAddresses =
- Arrays.asList("InternalIP:10.0.0.1", "InternalIP:10.0.0.2",
"InternalIP:10.0.0.3");
+ Arrays.asList(
+ "InternalIP:10.0.0.1:true",
+ "InternalIP:10.0.0.2:false",
+ "InternalIP:10.0.0.3: ");
final List<String> externalAddresses =
- Arrays.asList("ExternalIP:7.7.7.7", "ExternalIP:8.8.8.8",
"ExternalIP:9.9.9.9");
+ Arrays.asList(
+ "ExternalIP:7.7.7.7:true",
+ "ExternalIP:8.8.8.8:false",
+ "ExternalIP:9.9.9.9: ");
final List<String> addresses = new ArrayList<>();
addresses.addAll(internalAddresses);
addresses.addAll(externalAddresses);
@@ -339,12 +345,14 @@ public class Fabric8FlinkKubeClientTest extends
KubernetesClientTestBase {
case InternalIP:
expectedIps =
internalAddresses.stream()
+ .filter(s ->
!"true".equals(s.split(":")[2]))
.map(s -> s.split(":")[1])
.collect(Collectors.toList());
break;
case ExternalIP:
expectedIps =
externalAddresses.stream()
+ .filter(s ->
!"true".equals(s.split(":")[2]))
.map(s -> s.split(":")[1])
.collect(Collectors.toList());
break;
@@ -352,6 +360,7 @@ public class Fabric8FlinkKubeClientTest extends
KubernetesClientTestBase {
throw new IllegalArgumentException(
String.format("Unexpected address type %s.",
addressType));
}
+ assertThat(expectedIps.size()).isEqualTo(2);
assertThat(resultEndpoint.get().getAddress()).isIn(expectedIps);
assertThat(resultEndpoint.get().getPort()).isEqualTo(NODE_PORT);
}