xintongsong commented on a change in pull request #13755:
URL: https://github.com/apache/flink/pull/13755#discussion_r510580616
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
##########
@@ -85,10 +86,20 @@
public KubernetesResourceManagerDriver(
Configuration flinkConfig,
- FlinkKubeClient kubeClient,
KubernetesResourceManagerDriverConfiguration configuration) {
super(flinkConfig, GlobalConfiguration.loadConfiguration());
+ this.clusterId = Preconditions.checkNotNull(configuration.getClusterId());
+ this.podCreationRetryInterval = Preconditions.checkNotNull(configuration.getPodCreationRetryInterval());
+ this.kubeClient = KubeClientFactory.fromConfiguration(flinkConfig, getIoExecutor());
Review comment:
I'm afraid this will not work.
`AbstractResourceManagerDriver#ioExecutor` is `null` until the driver is
initialized. The constructor runs before initialization, so the
`getIoExecutor` call here should throw `IllegalStateException`.
I think this also exposes the shortcoming of using different constructors
for production and testing: the `getIoExecutor` call in this constructor is
not covered by any test.
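
To illustrate the ordering problem, here is a minimal sketch. All class names are hypothetical, simplified stand-ins and not the actual Flink classes; the sketch assumes only what is described above, namely that the executor is set during initialization and that the getter throws `IllegalStateException` before then. It also shows one possible fix, which is to create the client in the initialization hook instead of the constructor.

```java
import java.util.concurrent.Executor;

// Hypothetical, simplified stand-in for AbstractResourceManagerDriver.
abstract class SketchDriverBase {
    private Executor ioExecutor; // stays null until initialize() is called

    final void initialize(Executor ioExecutor) {
        this.ioExecutor = ioExecutor;
        initializeInternal();
    }

    // Hook invoked once the executor is available.
    protected abstract void initializeInternal();

    protected final Executor getIoExecutor() {
        if (ioExecutor == null) {
            throw new IllegalStateException("The driver is not initialized.");
        }
        return ioExecutor;
    }
}

// Mirrors the change under review: reads the executor in the constructor.
class EagerSketchDriver extends SketchDriverBase {
    final Executor clientExecutor;

    EagerSketchDriver() {
        // Runs before initialize(), so the getter throws.
        this.clientExecutor = getIoExecutor();
    }

    @Override
    protected void initializeInternal() {}
}

// One possible fix (an assumption, not the PR's final code):
// defer anything that needs the executor to the initialization hook.
class DeferredSketchDriver extends SketchDriverBase {
    Executor clientExecutor;

    @Override
    protected void initializeInternal() {
        this.clientExecutor = getIoExecutor();
    }
}

class DriverInitOrderDemo {
    public static void main(String[] args) {
        try {
            new EagerSketchDriver();
            System.out.println("eager: constructed");
        } catch (IllegalStateException e) {
            System.out.println("eager: " + e.getMessage());
        }
        DeferredSketchDriver driver = new DeferredSketchDriver();
        driver.initialize(Runnable::run); // same-thread executor, for the demo only
        System.out.println("deferred: executor set = " + (driver.clientExecutor != null));
    }
}
```

A test that constructs the driver through the production constructor, as `DriverInitOrderDemo` does for the eager variant, would have caught this.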
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/KubeClientFactory.java
##########
@@ -71,7 +74,8 @@ public static FlinkKubeClient fromConfiguration(Configuration flinkConfig) {
final KubernetesClient client = new DefaultKubernetesClient(config);
- return new Fabric8FlinkKubeClient(flinkConfig, client, KubeClientFactory::createThreadPoolForAsyncIO);
+ return new Fabric8FlinkKubeClient(flinkConfig, client, ioExecutor == null ?
+ KubeClientFactory::createThreadPoolForAsyncIO : () -> ioExecutor);
Review comment:
It might be better to derive `ioExecutor` in
`fromConfiguration(Configuration flinkConfig)`.
Benefits are:
- It avoids passing `null` as an argument.
- It improves readability: `fromConfiguration(Configuration flinkConfig)`
itself shows which executor is used when the caller does not specify one.
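
A rough sketch of that shape, with hypothetical stand-in types (`SketchConfig`, `SketchKubeClient`, `SketchKubeClientFactory`) instead of the real Flink and fabric8 classes. The pool size and the exact signatures are assumptions for illustration only:

```java
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical placeholder for Flink's Configuration.
final class SketchConfig {}

// Hypothetical stand-in for the client; it only records the executor it was given.
final class SketchKubeClient {
    final Executor ioExecutor;

    SketchKubeClient(Executor ioExecutor) {
        this.ioExecutor = ioExecutor;
    }
}

final class SketchKubeClientFactory {
    // Default pool for callers that do not bring their own executor
    // (the pool size of 2 is an arbitrary assumption).
    static ExecutorService createThreadPoolForAsyncIO() {
        return Executors.newFixedThreadPool(2);
    }

    // The single-argument overload names the default executor itself.
    static SketchKubeClient fromConfiguration(SketchConfig flinkConfig) {
        return fromConfiguration(flinkConfig, createThreadPoolForAsyncIO());
    }

    // The two-argument overload never needs to accept null.
    static SketchKubeClient fromConfiguration(SketchConfig flinkConfig, Executor ioExecutor) {
        Objects.requireNonNull(flinkConfig, "flinkConfig");
        return new SketchKubeClient(Objects.requireNonNull(ioExecutor, "ioExecutor"));
    }
}
```

With this layout there is no `ioExecutor == null ? ... : ...` branch, and a reader of the single-argument overload sees directly which executor is the default.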
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
##########
@@ -69,7 +69,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
Review comment:
I'm not sure about replacing `Executor` with `ExecutorService` in all
these places.
Looking into `Fabric8FlinkKubeClient`, I think the only reason
`FlinkKubeClient` uses `ExecutorService` rather than `Executor` is that it
shuts down and releases the thread pool in `close`. That is because the
client uses a dedicated thread pool, and if the client does not shut it
down, no other component will.
Now that the client uses a shared IO executor, it should no longer shut down
the executor on close. Otherwise, all other components using that IO
executor would be affected. That means we can replace `ExecutorService` with
`Executor` for `FlinkKubeClient`.
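
As a sketch of the ownership argument (the class names are hypothetical and not the real `FlinkKubeClient` API): a client that only borrows a shared executor can depend on the narrower `Executor` interface, and its `close` leaves the pool running for whoever owns it.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical client that borrows a shared executor and does not own it.
final class SharedExecutorClient implements AutoCloseable {
    // Executor has no shutdown method, so the client cannot stop the pool by accident.
    private final Executor ioExecutor;

    SharedExecutorClient(Executor ioExecutor) {
        this.ioExecutor = ioExecutor;
    }

    void runAsync(Runnable action) {
        ioExecutor.execute(action);
    }

    @Override
    public void close() {
        // Intentionally no shutdown here: the owner of the executor releases it.
    }
}

class SharedExecutorDemo {
    public static void main(String[] args) {
        ExecutorService sharedIoExecutor = Executors.newSingleThreadExecutor();
        try {
            SharedExecutorClient client = new SharedExecutorClient(sharedIoExecutor);
            client.close();
            // Other components can keep using the pool after the client is closed.
            System.out.println("shut down after client close: " + sharedIoExecutor.isShutdown());
        } finally {
            sharedIoExecutor.shutdown(); // the owner shuts the pool down
        }
    }
}
```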