Hi devs, I'd like to start a discussion thread on how we provide retrieval services in non-high-availability scenarios. To clarify terminology, "non-high-availability scenario" here refers to StandaloneHaServices and EmbeddedHaServices.
***The problem***

We notice that the retrieval services of the current StandaloneHaServices (pre-configured) and EmbeddedHaServices (in-memory) each have their own problems.

For the pre-configured case, we now have a getJobManagerLeaderRetriever(JobID, defaultJMAddress) method to work around the fact that the JobManager address cannot be configured in advance. The defaultJMAddress parameter is unused in every other high-availability mode. Also, in the MiniCluster scenario, and anywhere else where pre-configuring the leader address is impossible, StandaloneHaServices cannot be used.

For the in-memory case, it clearly does not fit any distributed scenario.

***The proposal***

To address the inconsistency between the pre-configured retrieval services and the ZooKeeper-based ones, we reconsider the promises of "non-high-availability" and regard it as a service similar to the ZooKeeper-based one, except that it does not tolerate node failures. Thus, we implement a service that acts like a standalone ZooKeeper cluster, named LeaderServer. A leader server is an actor running on the JobManager actor system that reacts to leader contender registrations and leader retriever requests. If the JobManager fails, the associated leader server fails too, which is exactly what "non-high-availability" stands for.

To communicate with the leader server, we start a leader client per high-availability services instance (JM, TM, ClusterClient). When a leader election service starts, it registers its contender with the leader server via the leader client (by Akka communication); when a leader retriever starts, it registers itself with the leader server via the leader client. The leader server handles leader election internally, just like the embedded implementation, and notifies retrievers with the new leader information whenever a new leader is elected.

In this way, we unify the view of retrieval services in all scenarios:

1. Configure a name service to communicate with.
In ZooKeeper mode this is ZooKeeper; in non-high-availability mode it is the leader server.

2. Any retrieval request is sent to that name service and handled by it.

Apart from a unified view, there are other advantages:

+ We no longer need the special method getJobManagerLeaderRetriever(JobID, defaultJMAddress); instead, we use getJobManagerLeaderRetriever(JobID). Consequently, we no longer need to include the JobManager address in the slot request, where it might become stale during transmission.

+ Launch and retrieval configuration concerns are separated. The JobManager address & port and REST address & port are configured only when launching a cluster (and in the YARN scenario there is no need to configure them at all). When a retrieval is requested, only the connection info of the name service (ZooKeeper or leader server) needs to be configured.

+ The embedded implementation can also be included in this abstraction, with no regression on simulating multiple leaders for testing purposes. Actually, the leader server acts as a limited standalone ZooKeeper cluster.

Thus, and this is where the proposal comes from, when we refactor metadata storage with the transaction store proposed in FLINK-10333, we only have to take care of the ZooKeeper implementation and one unified non-high-availability implementation.

***Clean up***

It is also noticed that there are several stale & unimplemented high-availability services implementations, which I'd like to remove to get a clean codebase to work on in this thread and FLINK-10333. They are:

- YarnHighAvailabilityServices
- AbstractYarnNonHaServices
- YarnIntraNonHaMasterServices
- YarnPreConfiguredMasterNonHaServices
- SingleLeaderElectionService
- FsNegativeRunningJobsRegistry

Any feedback is appreciated.

Best,
tison.
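PS: to make the registration/notification flow concrete, here is a minimal single-process sketch in plain Java. All class and method names below are hypothetical illustrations, and the sketch deliberately leaves out the Akka transport, leader session fencing on messages, and failure handling; the real leader server would be an actor reached through a leader client.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/**
 * Hypothetical sketch of the proposed leader server: it performs
 * embedded-style leader election per component and pushes the elected
 * leader's address to every registered retriever.
 */
public class LeaderServer {

    /** Callback used both to grant leadership and to notify retrievers. */
    public interface LeaderListener {
        void notifyLeader(String leaderAddress, UUID leaderSessionId);
    }

    private static final class Component {
        String leaderAddress; // null until a leader is elected
        UUID leaderSessionId;
        final List<LeaderListener> retrievers = new ArrayList<>();
    }

    private final Map<String, Component> components = new HashMap<>();

    /**
     * A contender registers; the first contender per component wins,
     * mirroring the embedded election. It is granted leadership and all
     * retrievers registered so far are notified.
     */
    public synchronized void registerContender(
            String componentId, String address, LeaderListener contender) {
        Component c = components.computeIfAbsent(componentId, k -> new Component());
        if (c.leaderAddress == null) {
            c.leaderAddress = address;
            c.leaderSessionId = UUID.randomUUID();
            contender.notifyLeader(c.leaderAddress, c.leaderSessionId);
            for (LeaderListener retriever : c.retrievers) {
                retriever.notifyLeader(c.leaderAddress, c.leaderSessionId);
            }
        }
    }

    /**
     * A retriever registers; it is told the current leader immediately if
     * one exists, and will be notified on every future change.
     */
    public synchronized void registerRetriever(
            String componentId, LeaderListener retriever) {
        Component c = components.computeIfAbsent(componentId, k -> new Component());
        c.retrievers.add(retriever);
        if (c.leaderAddress != null) {
            retriever.notifyLeader(c.leaderAddress, c.leaderSessionId);
        }
    }
}
```

Note that a retriever never needs to know the leader's address up front, which is the whole point: the only thing every process has to be configured with is how to reach the name service itself.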