wangyang0918 commented on a change in pull request #18119:
URL: https://github.com/apache/flink/pull/18119#discussion_r781848906



##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
##########
@@ -373,6 +375,62 @@ public KubernetesPod loadPodFromTemplateFile(File file) {
         return new KubernetesPod(this.internalClient.pods().load(file).get());
     }
 
+    @Override
+    public CompletableFuture<Void> updateServiceTargetPort(
+            KubernetesService.ServiceType serviceType,
+            String clusterId,
+            String portName,
+            int targetPort) {
+        LOG.debug("update {} target port value to {}", portName, targetPort);

Review comment:
       ```suggestion
           LOG.debug("Update {} target port to {}", portName, targetPort);
   ```

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
##########
@@ -486,6 +487,26 @@ public static String 
tryToGetPrettyPrintYaml(KubernetesResource kubernetesResour
         }
     }
 
+    /** Checks if hostNetwork is enabled. */
+    public static boolean isHostNetwork(Configuration configuration) {
+        return 
configuration.getBoolean(KubernetesConfigOptions.KUBERNETES_HOSTNETWORK_ENABLED);
+    }
+
+    /** Parse port from webUrl. */
+    public static Integer gerRestBindPort(String webUrl) {

Review comment:
       I believe this is duplicated with 
`YarnResourceManagerDriver#registerApplicationMaster`. Could you please 
deduplicate this by introducing a new method.
   
   After then, this change could be splitted into a separate `hotfix` commit 
before all the concrete changes. Adding some corresponding tests will be great.
   
   
https://flink.apache.org/contributing/code-style-and-quality-pull-requests.html#commit-naming-conventions

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
##########
@@ -452,6 +452,14 @@
                                     + "It will help to achieve faster 
recovery. "
                                     + "Notice that high availability should be 
enabled when starting standby JobManagers.");
 
+    public static final ConfigOption<Boolean> KUBERNETES_HOSTNETWORK_ENABLED =
+            key("kubernetes.hostnetwork.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to enable HostNetwork mode. "
+                                    + "The HostNetwork allows the pod could 
use the node network namespace instead of the individual pod network namespace. 
");

Review comment:
       ```suggestion
                                       + "The HostNetwork allows the pod could 
use the node network namespace instead of the individual pod network namespace. 
Please note that the JobManager service account should have the permission to 
update Kubernetes service.");
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
##########
@@ -263,6 +264,7 @@ private void runCluster(Configuration configuration, 
PluginManager pluginManager
                             new RpcMetricQueryServiceRetriever(
                                     
metricRegistry.getMetricQueryServiceRpcService()),
                             this);
+            configuration.setString(RestOptions.BIND_PORT, 
clusterComponent.getRestPort());

Review comment:
       I believe we do not need this line of change after introducing the 
`webInterfaceUrl` in `KubernetesResourceManagerDriver`.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
##########
@@ -201,6 +203,20 @@ KubernetesLeaderElector createLeaderElector(
      */
     KubernetesPod loadPodFromTemplateFile(File podTemplateFile);
 
+    /**
+     * Update the target ports of the given Kubernetes service.
+     *
+     * @param serviceType The ServiceType serviceType that needs to be modified
+     * @param portName The port name which to modified
+     * @param targetPort Real targetPort value

Review comment:
       ```suggestion
        * @param serviceType The ServiceType which needs to be updated
        * @param portName The port name which needs to be updated
        * @param targetPort The updated target port
   ```

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
##########
@@ -132,9 +132,8 @@ private Pod decoratePod(Pod pod) {
         for (File file : localLogFiles) {
             data.put(file.getName(), Files.toString(file, 
StandardCharsets.UTF_8));
         }
-
-        final Map<String, String> propertiesMap =
-                
getClusterSidePropertiesMap(kubernetesComponentConf.getFlinkConfiguration());
+        Configuration configuration = 
kubernetesComponentConf.getFlinkConfiguration();

Review comment:
       Unnecessary change.

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java
##########
@@ -1,3 +1,4 @@
+/*

Review comment:
       ```suggestion
   ```
   
   Unnecessary change.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
##########
@@ -104,6 +104,7 @@
     private ServerBootstrap bootstrap;
     private Channel serverChannel;
     private String restBaseUrl;
+    private String restPort;

Review comment:
       I believe we do not need the changes in class.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
##########
@@ -240,6 +248,31 @@ private void recoverWorkerNodesFromPreviousAttempts() 
throws ResourceManagerExce
         
getResourceEventHandler().onPreviousAttemptWorkersRecovered(recoveredWorkers);
     }
 
+    private void updateKubernetesServiceTargetPortIfNecessary() throws 
Exception {
+        if (!KubernetesUtils.isHostNetwork(flinkConfig)) {
+            return;
+        }
+        flinkKubeClient.updateServiceTargetPort(
+                KubernetesService.ServiceType.REST_SERVICE,
+                clusterId,
+                Constants.REST_PORT_NAME,
+                KubernetesUtils.gerRestBindPort(webInterfaceUrl));
+        if 
(!HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig)) {
+            flinkKubeClient
+                    .updateServiceTargetPort(
+                            KubernetesService.ServiceType.INTERNAL_SERVICE,
+                            clusterId,
+                            Constants.BLOB_SERVER_PORT_NAME,
+                            
Integer.valueOf(flinkConfig.getString(BlobServerOptions.PORT)))

Review comment:
       ```suggestion
                               
Integer.parseInt(flinkConfig.getString(BlobServerOptions.PORT)))
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
##########
@@ -73,6 +73,8 @@
 
     private final DispatcherOperationCaches dispatcherOperationCaches;
 
+    private String restPort;

Review comment:
       I believe we do not need the changes in class.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to