Hi devs,

I'd like to start a discussion thread on how we provide leader
retrieval services in the non-high-availability scenario. To clarify
terminology, the non-high-availability scenario refers to
StandaloneHaServices and EmbeddedHaServices.

***The problem***

We notice that the retrieval services of the current
StandaloneHaServices (pre-configured) and EmbeddedHaServices
(in-memory) each have their own problems.

For the pre-configured scenario, we currently have a
getJobManagerLeaderRetriever(JobID, defaultJMAddress) method to work
around the fact that the JobManager address cannot be configured in
advance. The defaultJMAddress parameter is not used by any other
high-availability mode. Also, in the MiniCluster scenario and anywhere
else where pre-configuring the leader address is impossible,
StandaloneHaServices cannot be used.
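
For reference, the relevant part of the current
HighAvailabilityServices interface looks roughly like this (a trimmed
excerpt, other services omitted):

  import org.apache.flink.api.common.JobID;
  import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;

  // Trimmed excerpt, roughly the current shape of the interface.
  public interface HighAvailabilityServices {

      // used by every high-availability mode
      LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);

      // exists only to carry the pre-configured address for the
      // non-high-availability case; other modes ignore the parameter
      LeaderRetrievalService getJobManagerLeaderRetriever(
              JobID jobID, String defaultJobManagerAddress);
  }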

For the in-memory case, it clearly does not fit any distributed
scenario.

***The proposal***

In order to address the inconsistency between the pre-configured
retrieval services and the ZooKeeper-based retrieval services, we
reconsider the promises made by "non-high-availability" and regard it
as a service similar to the ZooKeeper-based one, except that it does
not tolerate node failures. Thus, we implement a service that acts
like a standalone ZooKeeper cluster, named LeaderServer.

A leader server is an actor that runs in the JobManager actor system
and reacts to leader contender registrations and leader retrieval
requests. If the JobManager fails, the associated leader server fails
as well, which is where "non-high-availability" stands.

In order to communicate with the leader server, we start one leader
client per high-availability services instance (JM, TM, ClusterClient).
When a leader election service starts, it registers its contender with
the leader server via the leader client (using Akka communication);
when a leader retriever starts, it registers itself with the leader
server via the leader client.

The leader server handles leader election internally, just like the
embedded implementation, and notifies the retrievers with the new
leader information whenever a new leader is elected.
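
Seen from the client side, again with made-up names purely for
illustration, the election and retrieval services would only ever talk
to the leader server through the leader client, roughly like:

  import java.util.UUID;

  // Illustrative only: a hypothetical client-side view of the proposal.
  interface LeaderClient {

      // called when a leader election service starts; forwards the
      // registration to the leader server over Akka
      void registerContender(String componentId, String contenderAddress);

      // called when a leader retriever starts; the listener is invoked
      // whenever the leader server elects a new leader for the component
      void registerRetriever(String componentId, LeaderListener listener);
  }

  interface LeaderListener {
      void notifyLeader(String leaderAddress, UUID leaderSessionId);
  }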

In this way, we unify the view of retrieval services in all scenarios:

1. Configure a name service to communicate with. In ZooKeeper mode it
is ZooKeeper, and in non-high-availability mode it is the leader
server.
2. Any retrieval request is sent to the name service and is handled
by that service.
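
From the caller's point of view nothing changes: a retriever is
obtained from the high-availability services and started with a
listener, regardless of which name service backs it. A rough usage
sketch (the surrounding class is hypothetical, the retrieval
interfaces are the existing ones):

  import java.util.UUID;

  import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
  import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
  import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;

  final class ResourceManagerAddressWatcher {

      // The same code path works whether the HA services are backed by
      // ZooKeeper or by the proposed leader server.
      static void watch(HighAvailabilityServices haServices) throws Exception {
          LeaderRetrievalService retriever =
                  haServices.getResourceManagerLeaderRetriever();

          retriever.start(new LeaderRetrievalListener() {
              @Override
              public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
                  // connect to the current ResourceManager leader
              }

              @Override
              public void handleError(Exception exception) {
                  // escalate the failure to the owning component
              }
          });
      }
  }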

Apart from a unified view, there are other advantages:

+ We no longer need the special method
getJobManagerLeaderRetriever(JobID, defaultJMAddress); instead, we use
getJobManagerLeaderRetriever(JobID). Consequently, we no longer need
to include the JobManager address in the slot request, where it might
become stale during transmission.

+ Configuration concerns for launch and retrieval are separated. The
JobManager address & port and REST address & port are only configured
when launching a cluster (in the YARN scenario there is even no need
to configure them). When a retrieval is requested, only the connection
info of the name service (ZooKeeper or the leader server) needs to be
configured.

+ The embedded implementation can also be included in this abstraction
without any regression in simulating multiple leaders for testing
purposes. In fact, the leader server acts as a limited standalone
ZooKeeper cluster. Thus, and this is where this proposal comes from,
when we refactor the metadata storage with the transaction store
proposed in FLINK-10333, we only need to take care of the ZooKeeper
implementation and a unified non-high-availability implementation.

***Clean up***

It is also noticed that there are several stale & unimplemented
high-availability services implementations which I'd like to remove in
order to get a clean codebase to work on for this thread and
FLINK-10333. They are:

- YarnHighAvailabilityServices
- AbstractYarnNonHaServices
- YarnIntraNonHaMasterServices
- YarnPreConfiguredMasterNonHaServices
- SingleLeaderElectionService
- FsNegativeRunningJobsRegistry

Any feedback is appreciated.

Best,
tison.
