tillrohrmann commented on a change in pull request #17053:
URL: https://github.com/apache/flink/pull/17053#discussion_r701377710
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
##########
@@ -213,7 +234,16 @@ private RestClusterClient(
this.waitStrategy = checkNotNull(waitStrategy);
this.clusterId = checkNotNull(clusterId);
- this.clientHAServices = checkNotNull(clientHAServices);
+ this.clientHAServices =
+ clientHAServicesFactory.create(
+ configuration,
+ new FatalErrorHandler() {
+ @Override
+ public void onFatalError(Throwable exception) {
+ webMonitorLeaderRetriever.handleError(
+ new FlinkException(exception));
+ }
Review comment:
```suggestion
exception ->
webMonitorLeaderRetriever.handleError(
new FlinkException(exception))
```
A bit less boilerplate.
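
To illustrate why the lambda is a drop-in replacement, here is a minimal, self-contained sketch. It assumes `FatalErrorHandler` has a single abstract method, as the anonymous class in the diff suggests. The interface below is a simplified stand-in, `RecordingRetriever` is a hypothetical substitute for `webMonitorLeaderRetriever`, and `java.lang.Exception` stands in for `FlinkException`.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for Flink's FatalErrorHandler: one abstract method,
// so a lambda can implement it.
@FunctionalInterface
interface FatalErrorHandler {
    void onFatalError(Throwable exception);
}

// Hypothetical stand-in for the leader retriever that receives the error.
class RecordingRetriever {
    final List<Exception> errors = new ArrayList<>();

    void handleError(Exception e) {
        errors.add(e);
    }
}

class LambdaHandlerDemo {
    // Anonymous-class form, mirroring the original diff.
    static FatalErrorHandler anonymous(RecordingRetriever retriever) {
        return new FatalErrorHandler() {
            @Override
            public void onFatalError(Throwable exception) {
                // Exception stands in for FlinkException here.
                retriever.handleError(new Exception(exception));
            }
        };
    }

    // Lambda form, mirroring the suggestion; it wraps and forwards the same way.
    static FatalErrorHandler lambda(RecordingRetriever retriever) {
        return exception -> retriever.handleError(new Exception(exception));
    }

    public static void main(String[] args) {
        RecordingRetriever retriever = new RecordingRetriever();
        Throwable cause = new IllegalStateException("boom");
        anonymous(retriever).onFatalError(cause);
        lambda(retriever).onFatalError(cause);
        System.out.println("errors recorded: " + retriever.errors.size());
    }
}
```

Both forms wrap the cause and forward it to the retriever; only the amount of syntax differs.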
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
##########
@@ -110,10 +113,18 @@ public String getClusterDescription() {
// Flink client will always use Kubernetes service to contact with jobmanager. So we
// have a pre-configured web monitor address. Using StandaloneClientHAServices to
// create RestClusterClient is reasonable.
- return new RestClusterClient<>(
- configuration,
- clusterId,
- new StandaloneClientHAServices(getWebMonitorAddress(configuration)));
+ String webMonitorAddress = getWebMonitorAddress(configuration);
Review comment:
I would move this call into the factory, because we cannot be sure that
`configuration` won't be changed before the factory is invoked.
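
A small sketch of the concern, under stated assumptions: the configuration is modelled as a plain `Map`, `AddressFactory` is a hypothetical one-method interface, and `demo.address` is a made-up key. None of these are Flink APIs. It contrasts resolving the address once outside the factory with resolving it inside `create`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a factory that receives the configuration.
@FunctionalInterface
interface AddressFactory {
    String create(Map<String, String> configuration);
}

class LazyAddressDemo {
    // Made-up key, used for illustration only.
    static final String KEY = "demo.address";

    // Eager: the address is read once, outside the factory, and captured.
    static AddressFactory eager(Map<String, String> configuration) {
        String address = configuration.get(KEY);
        return ignored -> address;
    }

    // Lazy: the address is read from the configuration passed to create().
    static AddressFactory lazy() {
        return configuration -> configuration.get(KEY);
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put(KEY, "http://old:8081");

        AddressFactory eagerFactory = eager(config);
        AddressFactory lazyFactory = lazy();

        // The configuration changes after the factories were built.
        config.put(KEY, "http://new:8081");

        System.out.println("eager: " + eagerFactory.create(config));
        System.out.println("lazy:  " + lazyFactory.create(config));
    }
}
```

The eager variant keeps returning the value captured at construction time, while the lazy variant reflects whatever the configuration holds when `create` runs.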
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
##########
@@ -168,13 +170,24 @@
.isPresent();
public RestClusterClient(Configuration config, T clusterId) throws Exception {
- this(config, clusterId, HighAvailabilityServicesUtils.createClientHAService(config));
+ this(
+ config,
+ clusterId,
+ new ClientHighAvailabilityServicesFactory() {
+ @Override
+ public ClientHighAvailabilityServices create(
+ Configuration configuration, FatalErrorHandler fatalErrorHandler)
+ throws Exception {
+ return HighAvailabilityServicesUtils.createClientHAService(
+ configuration, fatalErrorHandler);
+ }
Review comment:
Let's create a `DefaultClientHighAvailabilityServicesFactory` with a
singleton `INSTANCE` that does exactly what this anonymous class does. Then
we don't need an anonymous class here.
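
One possible shape for such a class, as a sketch only. The interfaces below are simplified stand-ins for the Flink types named in the diff, `java.util.Properties` stands in for `Configuration`, `HaUtilsStandIn` is a hypothetical placeholder for `HighAvailabilityServicesUtils`, and the `throws Exception` clause from the diff is omitted for brevity.

```java
import java.util.Properties;

// Simplified stand-ins for the Flink types that appear in the diff.
interface FatalErrorHandler {
    void onFatalError(Throwable exception);
}

interface ClientHighAvailabilityServices {
    // Hypothetical method, present only so the sketch has something to observe.
    String describe();
}

interface ClientHighAvailabilityServicesFactory {
    // The signature in the diff also declares "throws Exception"; omitted here.
    ClientHighAvailabilityServices create(
            Properties configuration, FatalErrorHandler fatalErrorHandler);
}

// Hypothetical placeholder for HighAvailabilityServicesUtils.createClientHAService.
final class HaUtilsStandIn {
    private HaUtilsStandIn() {}

    static ClientHighAvailabilityServices createClientHAService(
            Properties configuration, FatalErrorHandler fatalErrorHandler) {
        String mode = configuration.getProperty("high-availability", "NONE");
        return () -> "client HA services for mode " + mode;
    }
}

// Stateless factory exposed through a single shared instance.
final class DefaultClientHighAvailabilityServicesFactory
        implements ClientHighAvailabilityServicesFactory {

    static final DefaultClientHighAvailabilityServicesFactory INSTANCE =
            new DefaultClientHighAvailabilityServicesFactory();

    // Private constructor so INSTANCE is the only way to obtain the factory.
    private DefaultClientHighAvailabilityServicesFactory() {}

    @Override
    public ClientHighAvailabilityServices create(
            Properties configuration, FatalErrorHandler fatalErrorHandler) {
        return HaUtilsStandIn.createClientHAService(configuration, fatalErrorHandler);
    }
}
```

With something like this in place, the constructor could pass `DefaultClientHighAvailabilityServicesFactory.INSTANCE` instead of building an anonymous class on every call.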
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
##########
@@ -110,10 +113,18 @@ public String getClusterDescription() {
// Flink client will always use Kubernetes service to contact with jobmanager. So we
// have a pre-configured web monitor address. Using StandaloneClientHAServices to
// create RestClusterClient is reasonable.
- return new RestClusterClient<>(
- configuration,
- clusterId,
- new StandaloneClientHAServices(getWebMonitorAddress(configuration)));
+ String webMonitorAddress = getWebMonitorAddress(configuration);
+ ClientHighAvailabilityServicesFactory factory =
+ new ClientHighAvailabilityServicesFactory() {
+ @Override
+ public ClientHighAvailabilityServices create(
+ Configuration configuration,
+ FatalErrorHandler fatalErrorHandler)
+ throws Exception {
+ return new StandaloneClientHAServices(webMonitorAddress);
+ }
+ };
+ return new RestClusterClient<>(configuration, clusterId, factory);
Review comment:
Let us either use a lambda here or introduce a concrete class.
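
Both options could look roughly like the sketch below. It uses simplified stand-ins rather than the real Flink types: the configuration is a plain `Map`, `getWebMonitorAddress` is a hypothetical helper with made-up keys, and `StandaloneClientHAServicesFactory` is an illustrative name for the concrete class, not an existing one.

```java
import java.util.Map;

// Simplified stand-ins for the Flink types that appear in the diff.
interface FatalErrorHandler {
    void onFatalError(Throwable exception);
}

interface ClientHighAvailabilityServices {}

final class StandaloneClientHAServices implements ClientHighAvailabilityServices {
    final String webMonitorAddress;

    StandaloneClientHAServices(String webMonitorAddress) {
        this.webMonitorAddress = webMonitorAddress;
    }
}

@FunctionalInterface
interface ClientHighAvailabilityServicesFactory {
    ClientHighAvailabilityServices create(
            Map<String, String> configuration, FatalErrorHandler fatalErrorHandler);
}

final class StandaloneFactories {
    private StandaloneFactories() {}

    // Hypothetical helper; the keys are made up for this sketch.
    static String getWebMonitorAddress(Map<String, String> configuration) {
        return "http://" + configuration.get("demo.host") + ":" + configuration.get("demo.port");
    }

    // Option 1: a lambda, possible if the factory interface has one abstract method.
    static ClientHighAvailabilityServicesFactory asLambda() {
        return (configuration, fatalErrorHandler) ->
                new StandaloneClientHAServices(getWebMonitorAddress(configuration));
    }

    // Option 2: a small named class (illustrative name).
    static final class StandaloneClientHAServicesFactory
            implements ClientHighAvailabilityServicesFactory {
        @Override
        public ClientHighAvailabilityServices create(
                Map<String, String> configuration, FatalErrorHandler fatalErrorHandler) {
            return new StandaloneClientHAServices(getWebMonitorAddress(configuration));
        }
    }
}
```

The lambda is shorter, while the named class is easier to find in stack traces and to reuse elsewhere. In both variants the address is resolved inside `create`, from the configuration passed to it.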
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.