wangyang0918 commented on a change in pull request #18119:
URL: https://github.com/apache/flink/pull/18119#discussion_r781848906
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
##########
@@ -373,6 +375,62 @@ public KubernetesPod loadPodFromTemplateFile(File file) {
return new KubernetesPod(this.internalClient.pods().load(file).get());
}
+ @Override
+ public CompletableFuture<Void> updateServiceTargetPort(
+ KubernetesService.ServiceType serviceType,
+ String clusterId,
+ String portName,
+ int targetPort) {
+ LOG.debug("update {} target port value to {}", portName, targetPort);
Review comment:
```suggestion
LOG.debug("Update {} target port to {}", portName, targetPort);
```
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
##########
@@ -486,6 +487,26 @@ public static String
tryToGetPrettyPrintYaml(KubernetesResource kubernetesResour
}
}
+ /** Checks if hostNetwork is enabled. */
+ public static boolean isHostNetwork(Configuration configuration) {
+ return
configuration.getBoolean(KubernetesConfigOptions.KUBERNETES_HOSTNETWORK_ENABLED);
+ }
+
+ /** Parse port from webUrl. */
+ public static Integer gerRestBindPort(String webUrl) {
Review comment:
I believe this is duplicated with
`YarnResourceManagerDriver#registerApplicationMaster`. Could you please
deduplicate this by introducing a new method.
After then, this change could be splitted into a separate `hotfix` commit
before all the concrete changes. Adding some corresponding tests will be great.
https://flink.apache.org/contributing/code-style-and-quality-pull-requests.html#commit-naming-conventions
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
##########
@@ -452,6 +452,14 @@
+ "It will help to achieve faster
recovery. "
+ "Notice that high availability should be
enabled when starting standby JobManagers.");
+ public static final ConfigOption<Boolean> KUBERNETES_HOSTNETWORK_ENABLED =
+ key("kubernetes.hostnetwork.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to enable HostNetwork mode. "
+ + "The HostNetwork allows the pod could
use the node network namespace instead of the individual pod network namespace.
");
Review comment:
```suggestion
+ "The HostNetwork allows the pod could
use the node network namespace instead of the individual pod network namespace.
Please note that the JobManager service account should have the permission to
update Kubernetes service.");
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
##########
@@ -263,6 +264,7 @@ private void runCluster(Configuration configuration,
PluginManager pluginManager
new RpcMetricQueryServiceRetriever(
metricRegistry.getMetricQueryServiceRpcService()),
this);
+ configuration.setString(RestOptions.BIND_PORT,
clusterComponent.getRestPort());
Review comment:
I believe we do not need this line of change after introducing the
`webInterfaceUrl` in `KubernetesResourceManagerDriver`.
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
##########
@@ -201,6 +203,20 @@ KubernetesLeaderElector createLeaderElector(
*/
KubernetesPod loadPodFromTemplateFile(File podTemplateFile);
+ /**
+ * Update the target ports of the given Kubernetes service.
+ *
+ * @param serviceType The ServiceType serviceType that needs to be modified
+ * @param portName The port name which to modified
+ * @param targetPort Real targetPort value
Review comment:
```suggestion
* @param serviceType The ServiceType which needs to be updated
* @param portName The port name which needs to be updated
* @param targetPort The updated target port
```
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
##########
@@ -132,9 +132,8 @@ private Pod decoratePod(Pod pod) {
for (File file : localLogFiles) {
data.put(file.getName(), Files.toString(file,
StandardCharsets.UTF_8));
}
-
- final Map<String, String> propertiesMap =
-
getClusterSidePropertiesMap(kubernetesComponentConf.getFlinkConfiguration());
+ Configuration configuration =
kubernetesComponentConf.getFlinkConfiguration();
Review comment:
Unnecessary change.
##########
File path:
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java
##########
@@ -1,3 +1,4 @@
+/*
Review comment:
```suggestion
```
Unnecessary change.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
##########
@@ -104,6 +104,7 @@
private ServerBootstrap bootstrap;
private Channel serverChannel;
private String restBaseUrl;
+ private String restPort;
Review comment:
I believe we do not need the changes in class.
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
##########
@@ -240,6 +248,31 @@ private void recoverWorkerNodesFromPreviousAttempts()
throws ResourceManagerExce
getResourceEventHandler().onPreviousAttemptWorkersRecovered(recoveredWorkers);
}
+ private void updateKubernetesServiceTargetPortIfNecessary() throws
Exception {
+ if (!KubernetesUtils.isHostNetwork(flinkConfig)) {
+ return;
+ }
+ flinkKubeClient.updateServiceTargetPort(
+ KubernetesService.ServiceType.REST_SERVICE,
+ clusterId,
+ Constants.REST_PORT_NAME,
+ KubernetesUtils.gerRestBindPort(webInterfaceUrl));
+ if
(!HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig)) {
+ flinkKubeClient
+ .updateServiceTargetPort(
+ KubernetesService.ServiceType.INTERNAL_SERVICE,
+ clusterId,
+ Constants.BLOB_SERVER_PORT_NAME,
+
Integer.valueOf(flinkConfig.getString(BlobServerOptions.PORT)))
Review comment:
```suggestion
Integer.parseInt(flinkConfig.getString(BlobServerOptions.PORT)))
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
##########
@@ -73,6 +73,8 @@
private final DispatcherOperationCaches dispatcherOperationCaches;
+ private String restPort;
Review comment:
I believe we do not need the changes in class.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]