wangyang0918 commented on a change in pull request #18119:
URL: https://github.com/apache/flink/pull/18119#discussion_r776621969
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
##########
@@ -83,6 +87,12 @@
private FlinkPod taskManagerPodTemplate;
+ private final Integer restPort;
Review comment:
These variables could be local in `updateServicePort`.
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesService.java
##########
@@ -26,4 +26,10 @@
public KubernetesService(Service internalResource) {
super(internalResource);
}
+
+ /** The flink service type. */
+ public enum ServiceType {
+ RestService,
+ InternalService,
Review comment:
```suggestion
REST_SERVICE,
INTERNAL_SERVICE,
```
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
##########
@@ -240,6 +257,40 @@ private void recoverWorkerNodesFromPreviousAttempts()
throws ResourceManagerExce
getResourceEventHandler().onPreviousAttemptWorkersRecovered(recoveredWorkers);
}
+ /**
+ * Update all service target port.
+ *
+ * @throws Exception
+ */
+ private void updateServicePort() throws Exception {
+ if (!isHostNetwork) {
+ return;
+ }
+ log.debug("update {} value to {}", Constants.REST_PORT_NAME, restPort);
+
+ flinkKubeClient.updateServicePort(
+ KubernetesService.ServiceType.RestService,
+ clusterId,
+ Constants.REST_PORT_NAME,
+ restPort);
+ if (!isHA) {
Review comment:
```suggestion
if
(!HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
```
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
##########
@@ -240,6 +257,40 @@ private void recoverWorkerNodesFromPreviousAttempts()
throws ResourceManagerExce
getResourceEventHandler().onPreviousAttemptWorkersRecovered(recoveredWorkers);
}
+ /**
+ * Update all service target port.
+ *
+ * @throws Exception
+ */
+ private void updateServicePort() throws Exception {
+ if (!isHostNetwork) {
+ return;
+ }
+ log.debug("update {} value to {}", Constants.REST_PORT_NAME, restPort);
+
+ flinkKubeClient.updateServicePort(
+ KubernetesService.ServiceType.RestService,
+ clusterId,
+ Constants.REST_PORT_NAME,
+ restPort);
+ if (!isHA) {
+ log.debug("update {} value to {}",
Constants.BLOB_SERVER_PORT_NAME, blobPort);
+ flinkKubeClient
+ .updateServicePort(
+ KubernetesService.ServiceType.InternalService,
+ clusterId,
+ Constants.BLOB_SERVER_PORT_NAME,
+ blobPort)
+ .get();
+ log.debug("update {} value to {}",
Constants.JOB_MANAGER_RPC_PORT_NAME, jmPort);
Review comment:
The log could be moved in to `updateServicePort`.
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
##########
@@ -373,6 +375,62 @@ public KubernetesPod loadPodFromTemplateFile(File file) {
return new KubernetesPod(this.internalClient.pods().load(file).get());
}
+ @Override
+ public CompletableFuture<Void> updateServicePort(
+ KubernetesService.ServiceType serviceType,
+ String clusterId,
+ String portName,
+ int targetPort) {
+ return CompletableFuture.runAsync(
+ () ->
+ getService(serviceType, clusterId)
+ .ifPresent(
+ service -> {
+ final Service updatedService =
+ new ServiceBuilder(
+
service.getInternalResource())
+ .editSpec()
+ .editMatchingPort(
+
servicePortBuilder ->
+
servicePortBuilder
+
.build()
+
.getName()
+
.equals(
+
portName))
+ .withTargetPort(
+ new
IntOrString(targetPort))
+ .endPort()
+ .endSpec()
+ .build();
+ this.internalClient
+ .services()
+ .withName(
+
getServiceName(serviceType, clusterId))
+ .replace(updatedService);
+ }),
+ kubeClientExecutorService);
+ }
+
+ /**
+ * Get real service name.
+ *
+ * @param serviceType serviceType
+ * @param clusterId clusterId
+ * @return serviceName
+ */
+ private String getServiceName(KubernetesService.ServiceType serviceType,
String clusterId) {
+ switch (serviceType) {
+ case RestService:
+ return
ExternalServiceDecorator.getExternalServiceName(clusterId);
+ case InternalService:
+ return
InternalServiceDecorator.getInternalServiceName(clusterId);
+ default:
+ throw new RuntimeException(
+ "Can not get ServiceName from
KubernetesService.ServiceType:"
+ + serviceType.name());
Review comment:
```suggestion
throw new IllegalArgumentException(
"Unrecognized service type: " + serviceType.name());
```
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
##########
@@ -373,6 +375,62 @@ public KubernetesPod loadPodFromTemplateFile(File file) {
return new KubernetesPod(this.internalClient.pods().load(file).get());
}
+ @Override
+ public CompletableFuture<Void> updateServicePort(
+ KubernetesService.ServiceType serviceType,
+ String clusterId,
+ String portName,
+ int targetPort) {
+ return CompletableFuture.runAsync(
+ () ->
+ getService(serviceType, clusterId)
+ .ifPresent(
+ service -> {
+ final Service updatedService =
+ new ServiceBuilder(
+
service.getInternalResource())
+ .editSpec()
+ .editMatchingPort(
+
servicePortBuilder ->
+
servicePortBuilder
+
.build()
+
.getName()
+
.equals(
+
portName))
+ .withTargetPort(
+ new
IntOrString(targetPort))
+ .endPort()
+ .endSpec()
+ .build();
+ this.internalClient
+ .services()
+ .withName(
+
getServiceName(serviceType, clusterId))
+ .replace(updatedService);
+ }),
+ kubeClientExecutorService);
+ }
+
+ /**
+ * Get real service name.
+ *
+ * @param serviceType serviceType
+ * @param clusterId clusterId
+ * @return serviceName
Review comment:
```suggestion
* Get the Kubernetes service name.
*
* @param serviceType The service type
* @param clusterId The cluster id
* @return Return the Kubernetes service name if the service type is
known.
```
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesSessionClusterEntrypoint.java
##########
@@ -36,6 +36,8 @@ public KubernetesSessionClusterEntrypoint(Configuration
configuration) {
super(configuration);
}
+ private int blobPort;
Review comment:
Useless variable.
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
##########
@@ -240,6 +257,40 @@ private void recoverWorkerNodesFromPreviousAttempts()
throws ResourceManagerExce
getResourceEventHandler().onPreviousAttemptWorkersRecovered(recoveredWorkers);
}
+ /**
+ * Update all service target port.
+ *
+ * @throws Exception
+ */
+ private void updateServicePort() throws Exception {
Review comment:
```suggestion
private void updateKubernetesServiceTargetPortIfNecessary() throws
Exception {
```
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesEntrypointUtils.java
##########
@@ -56,13 +58,18 @@ static Configuration loadConfiguration(Configuration
dynamicParameters) {
GlobalConfiguration.loadConfiguration(configDir,
dynamicParameters);
if
(HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
- final String ipAddress =
System.getenv().get(Constants.ENV_FLINK_POD_IP_ADDRESS);
- Preconditions.checkState(
- ipAddress != null,
- "JobManager ip address environment variable %s not set",
- Constants.ENV_FLINK_POD_IP_ADDRESS);
- configuration.setString(JobManagerOptions.ADDRESS, ipAddress);
- configuration.setString(RestOptions.ADDRESS, ipAddress);
+ if (KubernetesUtils.isHostNetwork(configuration)) {
Review comment:
We also need to override the `RestOptions.BIND_PORT`,
`BlobServerOptions.PORT`.
##########
File path:
docs/layouts/shortcodes/generated/kubernetes_config_configuration.html
##########
@@ -26,6 +26,12 @@
<td>String</td>
<td>The cluster-id, which should be no more than 45 characters, is
used for identifying a unique Flink cluster. The id must only contain lowercase
alphanumeric characters and "-". The required format is <code
class="highlighter-rouge">[a-z]([-a-z0-9]*[a-z0-9])</code>. If not set, the
client will automatically generate it with a random ID.</td>
</tr>
+ <tr>
Review comment:
The doc should be generated again.
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
##########
@@ -113,7 +156,7 @@ public static void checkAndUpdatePortConfigOption(
* @param port port config option
* @return valid port
*/
- public static Integer parsePort(Configuration flinkConfig,
ConfigOption<String> port) {
+ public static Integer parseFixedPort(Configuration flinkConfig,
ConfigOption<String> port) {
Review comment:
Why do we have to do the renaming?
##########
File path:
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java
##########
@@ -59,6 +59,10 @@
KubernetesConfigOptions.ImagePullPolicy.IfNotPresent;
protected static final int JOB_MANAGER_MEMORY = 768;
+ protected static final int DYNAMIC_REST_PORT = 9031;
Review comment:
Useless.
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
##########
@@ -213,17 +217,15 @@ public void stopAndCleanupCluster(String clusterId) {
}
@Override
- public Optional<KubernetesService> getRestService(String clusterId) {
- final String serviceName =
ExternalServiceDecorator.getExternalServiceName(clusterId);
-
+ public Optional<KubernetesService> getService(
+ KubernetesService.ServiceType serviceType, String clusterId) {
+ String serviceName = getServiceName(serviceType, clusterId);
Review comment:
`serviceName` could be `final`.
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
##########
@@ -201,6 +203,20 @@ KubernetesLeaderElector createLeaderElector(
*/
KubernetesPod loadPodFromTemplateFile(File podTemplateFile);
+ /**
+ * Update service to real targetPort value.
Review comment:
```suggestion
* Update the target ports of the given Kubernetes service
```
##########
File path:
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java
##########
@@ -177,6 +187,49 @@ protected Service buildExternalServiceWithClusterIP() {
KubernetesConfigOptions.ServiceExposedType.ClusterIP,
servicePort, null);
}
+ protected Service buildEditedExternalServiceWithClusterIP() {
Review comment:
Useless.
##########
File path:
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java
##########
@@ -92,6 +95,13 @@ protected void mockExpectedServiceFromServerSide(Service
expectedService) {
server.expect().get().withPath(path).andReturn(200,
expectedService).always();
}
+ protected void mockEditedServiceFromServerSide(Service expectedService) {
Review comment:
Useless.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
##########
@@ -86,6 +88,9 @@
this.dispatcherLeaderRetrievalService =
dispatcherLeaderRetrievalService;
this.resourceManagerRetrievalService = resourceManagerRetrievalService;
this.webMonitorEndpoint = webMonitorEndpoint;
+ if (webMonitorEndpoint instanceof WebMonitorEndpoint) {
Review comment:
TBH, This change is not very good :)
I think we could pass the `webInterfaceUrl` to
`KubernetesResourceManagerDriver` and then parse the rest port. You could find
the similar implementation in the
`YarnResourceManager#registerApplicationMaster`
##########
File path:
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
##########
@@ -76,8 +78,6 @@
/** Tests for Fabric implementation of {@link FlinkKubeClient}. */
public class Fabric8FlinkKubeClientTest extends KubernetesClientTestBase {
- private static final int RPC_PORT = 7123;
Review comment:
After removing the useless methods in `KubernetesClientTestBase`, we do
not need to move these two variables there.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
##########
@@ -209,7 +209,6 @@ public BlobServer(Configuration config, BlobStore
blobStore) throws IOException
// start the server thread
setName("BLOB Server listener at " + getPort());
setDaemon(true);
-
Review comment:
Unnecessary change.
##########
File path:
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java
##########
@@ -208,6 +210,15 @@ public KubernetesPod loadPodFromTemplateFile(File file) {
throw new UnsupportedOperationException();
}
+ @Override
+ public CompletableFuture<Void> updateServicePort(
+ KubernetesService.ServiceType serviceType,
+ String clusterId,
+ String portName,
+ int targetPort) {
+ return null;
Review comment:
```suggestion
throw new UnsupportedOperationException();
```
##########
File path:
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
##########
@@ -172,6 +172,74 @@ public void testCreateFlinkMasterComponent() throws
Exception {
});
}
+ @Test
+ public void testUpdateServicePort() throws Exception {
+
this.flinkKubeClient.createJobManagerComponent(this.kubernetesJobManagerSpecification);
+ flinkKubeClient
+ .updateServicePort(
+ KubernetesService.ServiceType.RestService,
+ CLUSTER_ID,
+ Constants.REST_PORT_NAME,
+ 9081)
+ .get();
+ assertTrue(
+ flinkKubeClient
+
.getService(KubernetesService.ServiceType.RestService, CLUSTER_ID)
+ .get()
+ .getInternalResource()
+ .getSpec()
+ .getPorts()
+ .get(0)
+ .getTargetPort()
+ .getIntVal()
+ == 9081);
+
+ flinkKubeClient
+ .updateServicePort(
+ KubernetesService.ServiceType.InternalService,
+ CLUSTER_ID,
+ Constants.BLOB_SERVER_PORT_NAME,
+ 9082)
+ .get();
+ assertTrue(
+ flinkKubeClient
+ .getService(
+
KubernetesService.ServiceType.InternalService, CLUSTER_ID)
+
.get().getInternalResource().getSpec().getPorts().stream()
+ .filter(
+ port ->
+ port.getName()
+
.equals(Constants.BLOB_SERVER_PORT_NAME))
+ .findFirst()
+ .get()
+ .getTargetPort()
+ .getIntVal()
+ == 9082);
+ flinkKubeClient
+ .updateServicePort(
+ KubernetesService.ServiceType.InternalService,
+ CLUSTER_ID,
+ Constants.JOB_MANAGER_RPC_PORT_NAME,
+ 9083)
+ .get();
+ assertTrue(
+ flinkKubeClient
+ .getService(
+
KubernetesService.ServiceType.InternalService, CLUSTER_ID)
+
.get().getInternalResource().getSpec().getPorts().stream()
+ .filter(
+ port ->
+ port.getName()
+ .equals(
+ Constants
+
.JOB_MANAGER_RPC_PORT_NAME))
+ .findFirst()
+ .get()
+ .getTargetPort()
+ .getIntVal()
+ == 9083);
+ }
+
Review comment:
```
@Test
public void testUpdateInternalServicePort() throws Exception {
this.flinkKubeClient.createJobManagerComponent(this.kubernetesJobManagerSpecification);
final int expectedBlobPort = 9082;
flinkKubeClient
.updateServicePort(
KubernetesService.ServiceType.InternalService,
CLUSTER_ID,
Constants.BLOB_SERVER_PORT_NAME,
expectedBlobPort)
.get();
final int updatedBlobPort =
getServiceTargetPort(CLUSTER_ID,
Constants.BLOB_SERVER_PORT_NAME);
assertThat(updatedBlobPort, is(expectedBlobPort));
}
@Test
public void testUpdateRestServicePort() throws Exception {
this.flinkKubeClient.createJobManagerComponent(this.kubernetesJobManagerSpecification);
final int expectedRestPort = 9081;
flinkKubeClient
.updateServicePort(
KubernetesService.ServiceType.RestService,
CLUSTER_ID,
Constants.REST_PORT_NAME,
expectedRestPort)
.get();
final int updatedRestPort =
getServiceTargetPort(
CLUSTER_ID + Constants.FLINK_REST_SERVICE_SUFFIX,
Constants.REST_PORT_NAME);
assertThat(updatedRestPort, is(expectedRestPort));
}
private int getServiceTargetPort(String serviceName, String portName) {
final List<Integer> ports =
kubeClient.services().withName(serviceName).get().getSpec().getPorts().stream()
.filter(servicePort ->
servicePort.getName().equalsIgnoreCase(portName))
.map(servicePort ->
servicePort.getTargetPort().getIntVal())
.collect(Collectors.toList());
assertThat(ports.size(), is(1));
return ports.get(0);
}
```
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
##########
@@ -132,9 +133,11 @@ private Pod decoratePod(Pod pod) {
for (File file : localLogFiles) {
data.put(file.getName(), Files.toString(file,
StandardCharsets.UTF_8));
}
-
- final Map<String, String> propertiesMap =
-
getClusterSidePropertiesMap(kubernetesComponentConf.getFlinkConfiguration());
+ Configuration configuration =
kubernetesComponentConf.getFlinkConfiguration();
Review comment:
We could override the `BlobServerOptions.PORT` and
`RestOptions.BIND_PORT` in `KubernetesEntrypointUtils#loadConfiguration`.
For `TaskManagerOptions.RPC_PORT`, we do not need to do the
`checkAndUpdatePortConfigOption` in `KubernetesClusterDescriptor`.
After then, we could remove `KubernetesUtils.updateDynamicPorts`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]