wangyang0918 commented on a change in pull request #13644:
URL: https://github.com/apache/flink/pull/13644#discussion_r508488975
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
##########
@@ -102,146 +95,73 @@
/** The ZooKeeper client to use. */
private final CuratorFramework client;
- /** The executor to run ZooKeeper callbacks on. */
- private final Executor executor;
-
- /** The runtime configuration. */
- private final Configuration configuration;
-
- /** The zookeeper based running jobs registry. */
- private final RunningJobsRegistry runningJobsRegistry;
-
- /** Store for arbitrary blobs. */
- private final BlobStoreService blobStoreService;
-
public ZooKeeperHaServices(
CuratorFramework client,
Executor executor,
Configuration configuration,
BlobStoreService blobStoreService) {
+ super(executor, configuration, blobStoreService);
this.client = checkNotNull(client);
- this.executor = checkNotNull(executor);
- this.configuration = checkNotNull(configuration);
- this.runningJobsRegistry = new
ZooKeeperRunningJobsRegistry(client, configuration);
-
- this.blobStoreService = checkNotNull(blobStoreService);
- }
-
- //
------------------------------------------------------------------------
- // Services
- //
------------------------------------------------------------------------
-
- @Override
- public LeaderRetrievalService getResourceManagerLeaderRetriever() {
- return ZooKeeperUtils.createLeaderRetrievalService(client,
configuration, RESOURCE_MANAGER_LEADER_PATH);
- }
-
- @Override
- public LeaderRetrievalService getDispatcherLeaderRetriever() {
- return ZooKeeperUtils.createLeaderRetrievalService(client,
configuration, DISPATCHER_LEADER_PATH);
}
@Override
- public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID)
{
- return ZooKeeperUtils.createLeaderRetrievalService(client,
configuration, getPathForJobManager(jobID));
+ public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
+ return new ZooKeeperCheckpointRecoveryFactory(client,
configuration, executor);
}
@Override
- public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID,
String defaultJobManagerAddress) {
- return getJobManagerLeaderRetriever(jobID);
+ public JobGraphStore getJobGraphStore() throws Exception {
+ return ZooKeeperUtils.createJobGraphs(client, configuration);
}
@Override
- public LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
- return ZooKeeperUtils.createLeaderRetrievalService(client,
configuration, REST_SERVER_LEADER_PATH);
+ public RunningJobsRegistry getRunningJobsRegistry() {
+ return new ZooKeeperRunningJobsRegistry(client, configuration);
Review comment:
I will update this part and keep the old behavior.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]