zhengcanbin commented on a change in pull request #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint
URL: https://github.com/apache/flink/pull/11715#discussion_r409319298
 
 

 ##########
 File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
 ##########
 @@ -264,17 +263,32 @@ private void setOwnerReference(Deployment deployment, List<HasMetadata> resource
        }
 
        /**
-        * To get nodePort of configured ports.
+        * Get rest port from the external Service.
         */
-       private int getServiceNodePort(Service service, ConfigOption<Integer> configPort) {
-               final int port = this.flinkConfig.getInteger(configPort);
-               if (service.getSpec() != null && service.getSpec().getPorts() != null) {
-                       for (ServicePort p : service.getSpec().getPorts()) {
-                               if (p.getPort() == port) {
-                                       return p.getNodePort();
-                               }
-                       }
+       private int getRestPortFromExternalService(Service externalService) {
+               final List<ServicePort> servicePortCandidates = externalService.getSpec().getPorts()
+                       .stream()
+                       .filter(x -> x.getName().equals(Constants.REST_PORT_NAME))
+                       .collect(Collectors.toList());
+
+               if (servicePortCandidates.isEmpty()) {
+                       throw new RuntimeException("Failed to find port \"" + Constants.REST_PORT_NAME + "\" in Service \"" +
+                               KubernetesUtils.getRestServiceName(this.clusterId) + "\"");
+               }
+
+               final ServicePort externalServicePort = servicePortCandidates.get(0);
+
+               final KubernetesConfigOptions.ServiceExposedType externalServiceType =
+                       KubernetesConfigOptions.ServiceExposedType.valueOf(externalService.getSpec().getType());
+
+               switch (externalServiceType) {
+                       case ClusterIP:
+                       case LoadBalancer:
 
 Review comment:
   > Since `LoadBalancer` is a superset of `NodePort`, so when the service 
exposed type is `LoadBalancer`, K8s will always create `NodePort`.
   
   Yes, you are right!
   
   > In my opinion, returning a node port url when the `LoadBalancer` is not 
available is a reasonable solution. And i believe that many users are running 
in unmanaged K8s cluster without load balancer. Now they have to explicitly set 
the `kubernetes.rest-service.exposed.type=NodePort`.
   
   I tend to think differently. If people use the LB Service, then the LB
   should work as expected; otherwise, we should throw an exception when
   retrieving the Endpoint to tell users that the LB is not ready or is abnormal.
   
   Imagine what could happen if we fell back to the NodePort in such a
   situation: the job might never be submitted to the cluster, because the
   Kubernetes node IPs/addresses are often not reachable from the submission
   client due to network security policies. This is a common case, especially
   on cloud providers.
   
   Such tolerance or fallback could therefore confuse users, who might need to
   dive into the code to find out what really happened.
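   
   To make the fail-fast behaviour argued for above concrete, here is a minimal
   sketch. It is not the code in this PR: `ExposedType`, `RestEndpointSketch`
   and `resolve` are hypothetical stand-ins so that the example compiles
   without the Fabric8 or Flink classes, and the `address` parameter is
   assumed to already hold whatever address matches the exposed type (empty or
   null while the LB has not been provisioned).

```java
// Hypothetical stand-in for KubernetesConfigOptions.ServiceExposedType.
enum ExposedType { ClusterIP, NodePort, LoadBalancer }

// Illustrative only: a simplified endpoint resolver that never falls back
// from LoadBalancer to NodePort.
final class RestEndpointSketch {

    /**
     * @param type     exposed type of the external Service
     * @param address  address matching the type (LB ingress, node address, or
     *                 cluster-internal name); null or empty if the LB is not ready
     * @param restPort Service port named "rest"
     * @param nodePort node port allocated for the rest port
     */
    static String resolve(ExposedType type, String address, int restPort, int nodePort) {
        switch (type) {
            case LoadBalancer:
                if (address == null || address.isEmpty()) {
                    // Fail fast instead of silently returning a NodePort URL
                    // that the submission client may not be able to reach.
                    throw new IllegalStateException(
                        "LoadBalancer is not ready or is abnormal; set "
                            + "kubernetes.rest-service.exposed.type=NodePort "
                            + "if the cluster has no load balancer");
                }
                return address + ":" + restPort;
            case NodePort:
                return address + ":" + nodePort;
            case ClusterIP:
                return address + ":" + restPort;
            default:
                throw new IllegalArgumentException("Unknown exposed type: " + type);
        }
    }
}
```

   With this shape, a user on a cluster without an LB gets an explicit error
   that names the option to change, rather than an endpoint that may be
   unreachable from the client.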
   
   If users run their workloads on K8s clusters without an LB, it is
   reasonable for them to explicitly set
   `kubernetes.rest-service.exposed.type=NodePort`.
   
   Another concern: maybe we should change the default value of
   `kubernetes.rest-service.exposed.type` to NodePort or ClusterIP.
