autophagy commented on a change in pull request #16720:
URL: https://github.com/apache/flink/pull/16720#discussion_r685952953



##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
##########
@@ -59,6 +60,25 @@
                             "The exposed type of the rest service. "
                                     + "The exposed rest service could be used to access the Flink’s Web UI and REST endpoint.");
 
+    public static final ConfigOption<NodePortAddressType>
+            REST_SERVICE_EXPOSED_NODE_PORT_ADDRESS_TYPE =
+                    key("kubernetes.rest-service.exposed.node-port-address-type")
+                            .enumType(NodePortAddressType.class)
+                            .defaultValue(NodePortAddressType.ExternalIP)
+                            .withDescription(
+                                    Description.builder()
+                                            .text(
+                                                    "The user-specified %s that is used for filtering node IPs when constructing %s connection string. This option is only considered when '%s' is set to '%s'.",

Review comment:
       Tiny nit: I think "filtering node IPs when constructing **a** %s 
connection string" would fit better grammatically.

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
##########
@@ -441,8 +435,26 @@ private int getRestPortFromExternalService(Service externalService) {
                 address = loadBalancer.getIngress().get(0).getHostname();
             }
         } else {
-            // Use node port
-            address = this.internalClient.getMasterUrl().getHost();
+            // Use node port. Node port is accessible on any node within kubernetes cluster. We'll
+            // only consider IPs with the configured address type.
+            address =
+                    internalClient.nodes().list().getItems().stream()
+                            .flatMap(node -> node.getStatus().getAddresses().stream())
+                            .filter(
+                                    nodeAddress ->
+                                            nodePortAddressType
+                                                    .name()
+                                                    .equals(nodeAddress.getType()))
+                            .map(NodeAddress::getAddress)
+                            .filter(externalIp -> !externalIp.isEmpty())

Review comment:
       Small nit: Would it make more sense to name this `address` rather than 
`externalIp`, since I assume the map above returns an address? As I understand 
it, the address we grab here might be external or internal, depending on what 
the exposed node port address type is set to.
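
To illustrate the rename, here is a minimal, self-contained sketch of the same filtering chain. It does not use the fabric8 client: the `NodeAddress` class and `NodePortAddressType` enum below are hypothetical stand-ins with just enough shape to run, the `pickAddress` helper is made up for the example, and the `findFirst()` terminal step is my assumption, since the hunk above is cut off after the second filter.

```java
import java.util.List;
import java.util.Optional;

public class NodePortAddressSketch {

    // Hypothetical stand-in for the enum in the PR. The constant names are
    // assumed to match the node address type strings they are compared with.
    public enum NodePortAddressType {
        InternalIP,
        ExternalIP
    }

    // Hypothetical stand-in for the client's NodeAddress model (type + address).
    public static final class NodeAddress {
        private final String type;
        private final String address;

        public NodeAddress(String type, String address) {
            this.type = type;
            this.address = address;
        }

        public String getType() {
            return type;
        }

        public String getAddress() {
            return address;
        }
    }

    // Hypothetical helper: returns the first non-empty address whose type
    // matches the configured node port address type.
    public static Optional<String> pickAddress(
            List<NodeAddress> nodeAddresses, NodePortAddressType nodePortAddressType) {
        return nodeAddresses.stream()
                .filter(nodeAddress -> nodePortAddressType.name().equals(nodeAddress.getType()))
                .map(NodeAddress::getAddress)
                // Named `address`, not `externalIp`: depending on the configured
                // type, the value may be an internal or an external IP.
                .filter(address -> !address.isEmpty())
                // Assumption: the terminal operation is not visible in the diff.
                .findFirst();
    }

    public static void main(String[] args) {
        List<NodeAddress> nodeAddresses =
                List.of(
                        new NodeAddress("InternalIP", "10.0.0.5"),
                        new NodeAddress("ExternalIP", ""),
                        new NodeAddress("ExternalIP", "203.0.113.7"));

        System.out.println(
                pickAddress(nodeAddresses, NodePortAddressType.InternalIP).orElse("none"));
        System.out.println(
                pickAddress(nodeAddresses, NodePortAddressType.ExternalIP).orElse("none"));
    }
}
```

With `InternalIP` configured, the lambda parameter holds an internal address, which is why the neutral name reads better to me.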





