[ 
https://issues.apache.org/jira/browse/FLINK-25235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17503628#comment-17503628
 ] 

Niklas Semmler edited comment on FLINK-25235 at 3/9/22, 4:56 PM:
-----------------------------------------------------------------

After staring at this problem for some time and playing through different 
solutions (each of which usually failed at a different test), I want to 
document the concepts and issues at play here. 

*Background*
- We recently changed the implementation of the leader election from a separate 
{{LeaderElectionDriver}} per JobManager component (e.g., dispatcher, rest 
server, etc.) to a combined {{LeaderElectionDriver}} for the JobManager as a 
whole. (For ZooKeeper-based leader election we previously used a 
{{ZooKeeperLeaderElectionDriver}}. Now we use a 
{{MultipleComponentLeaderElectionDriverAdapter}}, which multiplexes the 
leader election for all JobManager components via a single 
{{MultipleComponentLeaderElectionService}} onto a single 
{{ZooKeeperMultipleComponentLeaderElectionDriver}} underneath; see the sketch 
after this list.) This changed the mapping between {{HighAvailabilityServices}} 
and {{LeaderElectionDriver}} from a one-to-many to a one-to-one relationship.
- {{HighAvailabilityServices}} is the class responsible for high availability 
(e.g., leader fail-over). However, even though the JobManager and the 
{{TaskManager}} have a dependency on this class, not all scenarios require 
high availability. The {{AbstractNonHaServices}} implementation and its 
subclasses {{EmbeddedHaServices}} (single-JVM setup) and 
{{StandaloneHaServices}} (no support for JobManager failures) are used for 
these scenarios.
- {{MiniCluster}} is the class responsible for managing a single-node Flink 
cluster. It contains a single {{HighAvailabilityServices}} object that is 
shared by the JobManager and multiple {{TaskManager}}s.
- {{TestingMiniCluster}} extends the {{MiniCluster}} for test purposes. Among 
other things, it is used for tests of leader election between multiple 
JobManagers. Currently, the implementation re-uses the same 
{{HighAvailabilityServices}} object for multiple JobManagers.
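
To make the multiplexing in the first bullet concrete, here is a minimal, 
self-contained Java sketch. The names and interfaces are simplified stand-ins, 
not the actual Flink classes: many components register with one service, and 
leadership events from a single underlying driver fan out to all of them.

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Simplified stand-in for a per-component leader election listener. */
interface LeaderContender {
    void grantLeadership();
    void revokeLeadership();
}

/**
 * Hypothetical multiplexer: all JobManager components register here, but
 * only one underlying election (e.g., one ZooKeeper driver) runs for all
 * of them.
 */
final class MultiplexedLeaderElectionService {
    private final Map<String, LeaderContender> contenders = new ConcurrentHashMap<>();

    void registerComponent(String componentId, LeaderContender contender) {
        contenders.put(componentId, contender);
    }

    /** Invoked once by the single underlying driver on leadership grant. */
    void onLeadershipAcquired() {
        contenders.values().forEach(LeaderContender::grantLeadership);
    }

    /** Invoked once by the single underlying driver on leadership loss. */
    void onLeadershipLost() {
        contenders.values().forEach(LeaderContender::revokeLeadership);
    }
}
{code}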

*Issues with MiniCluster*
- The {{MiniCluster}} is meant "to execute Flink jobs locally". To me this 
means that the {{MiniCluster}} _should_ only use {{EmbeddedHaServices}}, as it 
does not need high availability on a single JVM. However, the class does not 
put any constraints on the type of high availability services used.
- Although the {{MiniCluster}} is production code, it contains code that is 
meant exclusively for testing. {{MiniCluster#getHaLeadershipControl}} is used 
to give a test explicit control over the leader election of an 
{{EmbeddedHaServices}}. Note that this method only works when the 
{{HighAvailabilityServices}} object is an 
{{EmbeddedHaServicesWithLeadershipControl}} (see the sketch after this list).

- The code to create the {{HighAvailabilityServices}} object seems needlessly 
complex. In {{MiniCluster#createHighAvailabilityServices}} we differentiate 
between {{EmbeddedWithControlHighAvailabilityServices}} and everything else. In 
{{HighAvailabilityServicesUtils}} we distinguish between 
{{EmbeddedHaServices}}, {{ZooKeeperHaServices}} & 
{{ZooKeeperMultipleComponentLeaderElectionHaServices}}, and services that are 
produced by a {{HighAvailabilityServicesFactory}}. It would be nice if we 
could flatten this into one method, especially given the additional option to 
configure it via the {{TestingMiniCluster}} (see below). (I also don't 
understand why there is no Kubernetes option, even though there is a 
{{KubernetesHaServicesFactory}}.) 
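
To illustrate the subtype coupling in the second bullet, here is a 
hypothetical sketch (simplified names, not the real Flink signatures) of how 
a test hook like {{getHaLeadershipControl}} ends up depending on the concrete 
{{HighAvailabilityServices}} implementation:

{code:java}
import java.util.Optional;

// Simplified stand-ins for illustration only.
interface HighAvailabilityServices {}
interface HaLeadershipControl {}
class EmbeddedHaServicesWithLeadershipControl
        implements HighAvailabilityServices, HaLeadershipControl {}

class MiniClusterSketch {
    private final HighAvailabilityServices haServices;

    MiniClusterSketch(HighAvailabilityServices haServices) {
        this.haServices = haServices;
    }

    /**
     * The test hook only works if the HA services happen to be of one
     * specific subtype; any other implementation yields an empty result.
     */
    Optional<HaLeadershipControl> getHaLeadershipControl() {
        return haServices instanceof HaLeadershipControl
                ? Optional.of((HaLeadershipControl) haServices)
                : Optional.empty();
    }
}
{code}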

*Issues with TestingMiniCluster*
- The name TestingSomething is usually used for mock objects. In contrast, the 
{{TestingMiniCluster}} does not mock anything.
- Its doc string says it is used "to set a custom {@link 
HighAvailabilityServices}", but the {{MiniCluster}} already allows this.
- The purpose of the {{TestingMiniCluster}} seems to be to configure the 
number of JobManagers, to configure the {{HighAvailabilityServices}} (which 
looks redundant, see below), and to configure the {{TaskManager}} to use only 
local communication even when more than one {{TaskManager}} exists. Why can 
these not be optional settings on the {{MiniCluster}} itself?
- The {{TestingMiniCluster}} is used by tests that need multiple JobManagers 
with separate leader election (e.g., {{ZooKeeperLeaderElectionITCase}}) and by 
some that require that all parts including the {{TaskExecutor}} share the same 
{{HighAvailabilityServices}} (e.g., {{JobExecutionITCase}}).
- {{TestingMiniCluster}} allows overriding the method 
{{MiniCluster#createHighAvailabilityServices}} for creating a 
{{HighAvailabilityServices}}. However, that method already has a two-step 
process for creating the {{HighAvailabilityServices}}, and the existing 
process even includes the option of using a custom factory. Again, this 
redundancy makes the code hard to understand (see the sketch below).
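
The redundancy in the last bullet can be illustrated with a hypothetical 
sketch (simplified, not the actual Flink code): there are two competing 
extension points, a factory consulted inside the creation method and a 
subclass override that bypasses the factory entirely.

{code:java}
import java.util.function.Supplier;

// Simplified stand-in for illustration only.
interface HighAvailabilityServices {}

class BaseMiniClusterSketch {
    // Extension point 1: an optional factory, consulted inside the method.
    private final Supplier<HighAvailabilityServices> haServicesFactory;

    BaseMiniClusterSketch(Supplier<HighAvailabilityServices> haServicesFactory) {
        this.haServicesFactory = haServicesFactory;
    }

    protected HighAvailabilityServices createHighAvailabilityServices() {
        // Two-step process: first try the custom factory, then fall back.
        if (haServicesFactory != null) {
            return haServicesFactory.get();
        }
        return new HighAvailabilityServices() {}; // default services
    }
}

// Extension point 2: a subclass overriding the same method wholesale,
// silently ignoring any factory passed to the base class.
class TestingMiniClusterSketch extends BaseMiniClusterSketch {
    TestingMiniClusterSketch() {
        super(null);
    }

    @Override
    protected HighAvailabilityServices createHighAvailabilityServices() {
        return new HighAvailabilityServices() {}; // test-specific services
    }
}
{code}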

*Conclusion*
- The {{MiniCluster}} and {{TestingMiniCluster}} cover a large variety of 
use cases that differ in which implementation of {{HighAvailabilityServices}} 
is used. This makes the classes hard to understand. Splitting these classes 
into one per use case would greatly improve the readability.
- The reason for using inheritance is not clear to me. First, the interaction 
between the subclass and parent class is hard to follow. Second, I don't see in 
what scenarios you would want to replace an instance of {{MiniCluster}} with 
{{TestingMiniCluster}}, so there should be no need to implement the same 
methods. (Could be wrong on this though.) Third, having a single non-abstract 
parent class with a single subclass on its own sounds like a degenerate 
inheritance tree.

*Refactoring options*
- We can strongly couple the {{MiniCluster}} with the type of 
{{HighAvailabilityServices}} and have one {{EmbeddedMiniCluster}} and one 
{{HighlyAvailableMiniCluster}}.
- We can split the creation of the {{HighAvailabilityServices}} object into 
a separate factory (and hopefully make the creation process less deeply 
nested). With some additional settings, the {{MiniCluster}} could then take 
over the role of the {{TestingMiniCluster}} (see the sketch below).
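
As a sketch of the second option (hypothetical names, under the assumption 
that the supported variants can be enumerated), such a factory could flatten 
the nested creation paths into a single, explicit dispatch:

{code:java}
// Simplified stand-ins for illustration only.
interface HighAvailabilityServices {}

enum HaMode { EMBEDDED, EMBEDDED_WITH_LEADERSHIP_CONTROL, ZOOKEEPER }

final class HaServicesFactorySketch {
    static HighAvailabilityServices create(HaMode mode) {
        switch (mode) {
            case EMBEDDED:
                // would return an EmbeddedHaServices
                return new HighAvailabilityServices() {};
            case EMBEDDED_WITH_LEADERSHIP_CONTROL:
                // would return an EmbeddedHaServicesWithLeadershipControl
                return new HighAvailabilityServices() {};
            case ZOOKEEPER:
                // would return the ZooKeeper-based services
                return new HighAvailabilityServices() {};
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }
}
{code}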


PS: To be exact, I should have used the term 
{{DispatcherResourceManagerComponent}} instead of JobManager, but this would've 
made the text even harder to read.



> Re-enable 
> ZooKeeperLeaderElectionITCase#testJobExecutionOnClusterWithLeaderChange
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-25235
>                 URL: https://issues.apache.org/jira/browse/FLINK-25235
>             Project: Flink
>          Issue Type: Technical Debt
>          Components: Runtime / Coordination
>            Reporter: David Morávek
>            Assignee: Niklas Semmler
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.15.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)
