[ 
https://issues.apache.org/jira/browse/FLINK-10333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16923156#comment-16923156
 ] 

TisonKun commented on FLINK-10333:
----------------------------------

Hi [~till.rohrmann] [~xiaogang.shi] [~StephanEwen], I'd like to share some 
progress here, mainly about our perspective on the high-availability services 
and our ideas for the non-HA case, which follow the proposed design of leader 
stores. I'm going to update the document with the corresponding details, but 
I'm posting here first to preview the overall direction.

Following the content layout of the linked document, we regard the 
high-availability services provided by Flink as three parts: leader election 
services, name services (also known as leader retrieval services), and 
metadata storage.
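
To make the terminology concrete, here is how we group the existing 
{{HighAvailabilityServices}} surface into these three parts. The sketch below 
only illustrates the grouping as we see it; the getters mirror the existing 
interface, but the declaration itself is illustrative.

{code:java}
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;

/**
 * Sketch of how we view the high-availability services: the same getters as
 * HighAvailabilityServices, grouped by concern. Not the real declaration.
 */
public interface HighAvailabilityServicesPerspective {

    // 1) Leader election services: contenders (Dispatcher, ResourceManager,
    //    JobManager, ...) compete for leadership.
    LeaderElectionService getResourceManagerLeaderElectionService();
    LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);

    // 2) Name services (a.k.a. leader retrieval services): other components
    //    look up the current leader's address.
    LeaderRetrievalService getResourceManagerLeaderRetriever();
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);

    // 3) Metadata storage: job graphs and completed checkpoints that must
    //    survive leader changes; later to be based on the leader store.
    SubmittedJobGraphStore getSubmittedJobGraphStore();
    CheckpointRecoveryFactory getCheckpointRecoveryFactory();
}
{code}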

In order to contribute the whole rework under the umbrella of FLINK-10333, it 
makes sense to split the initial transaction store implementation from the 
metadata storage, i.e. the job graph and checkpoint stores that make use of 
it. Basically, the first pass includes the reimplemented leader election 
services and name services based on the new store layout, plus a leader store 
implementation that is not yet used outside.

We have internally finished the integration work, and it seems that the new 
implementation works well with the current implementations of the job graph 
store and the checkpoint store, which will later be based on the leader store. 
In other words, it is possible to apply the transaction store in steps.

However, a few concerns came up when the actual integration happened, mainly 
about the non-HA case and the dependencies on high-availability services.

(1) For the non-HA case, we notice that the current 
{{StandaloneHAServices}} (pre-configured) and {{EmbeddedHAServices}} 
(in-memory) each have their own problems.

For the pre-configured case, we now have a {{getJobManagerLeaderRetriever(JobID, 
defaultJMAddress)}} method to work around the fact that the JM address cannot 
be configured in advance; the extra parameter is not used in any other 
high-availability mode. Also, in the MiniCluster case and anywhere else where 
pre-configuring the leader address is impossible, {{StandaloneHAServices}} 
cannot be used. For the in-memory case, it clearly does not fit any 
distributed setup.
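
To illustrate the asymmetry, here is a simplified excerpt of the relevant 
part of the interface; the comments reflect our reading of why the extra 
parameter exists, and this is not a full copy of {{HighAvailabilityServices}}.

{code:java}
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;

// Simplified excerpt, only to show the workaround; not the full interface.
public interface JobManagerRetrieverSketch {

    // Used when the leader address can be discovered at runtime
    // (ZooKeeper-based or in-memory services).
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);

    // The defaultJobManagerAddress parameter only exists to serve the
    // pre-configured StandaloneHAServices, where the address of the JM for a
    // particular job cannot be configured up front; every other
    // implementation ignores it.
    LeaderRetrievalService getJobManagerLeaderRetriever(
            JobID jobID,
            String defaultJobManagerAddress);
}
{code}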

Internally, we introduce a {{LeaderServer}}/{{LeaderClient}} pair which acts 
like a simplified standalone ZooKeeper cluster providing leader election and 
name services. Briefly, we start a {{LeaderServer}} actor with a fixed name in 
the JM group, and {{LeaderClient}} actors in the JM group, the TMs, and the 
cluster client, which know where the {{LeaderServer}} is and register to be 
notified of new leader addresses. In this way, we share a unified view between 
the non-HA and the ZooKeeper-based implementations; the difference is that the 
{{LeaderServer}} runs on a single point and does not tolerate failures. Also, 
both {{StandaloneHAServices}} and {{EmbeddedHAServices}} can be unified under 
this abstraction, so we end up with one implementation for the non-HA case.
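
A rough sketch of the protocol between the two actors is below; all names are 
illustrative assumptions from our internal prototype, not existing Flink APIs.

{code:java}
import java.util.UUID;

// Illustrative sketch of the LeaderServer/LeaderClient protocol; the method
// names are assumptions from our internal prototype, not existing Flink APIs.
public interface LeaderServerGateway {

    // A contender (Dispatcher, ResourceManager, JobManager, ...) registers
    // itself for leadership of the given component.
    void registerContender(String componentId, String contenderAddress, UUID sessionId);

    // A client subscribes to leader changes of the given component; this
    // replaces both the pre-configured addresses of StandaloneHAServices and
    // the in-memory wiring of EmbeddedHAServices.
    void subscribeLeader(String componentId, LeaderClientGateway client);
}

public interface LeaderClientGateway {

    // Pushed by the LeaderServer on every leader change; the client forwards
    // the address to its local LeaderRetrievalListeners.
    void notifyLeaderAddress(String componentId, String leaderAddress, UUID sessionId);
}
{code}

Since the {{LeaderServer}} runs at a fixed actor name known to everyone in 
the cluster, the non-HA and ZooKeeper-based setups only differ in which 
backend the clients talk to.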

(2) For the high-availability dependencies, we find not only that, as 
described in FLINK-13750, the high-availability services required by the 
client *and by the TM* differ from those required by the JM, but also that, in 
the TM, creating the {{RpcService}} depends on working high-availability 
services, which are used to retrieve the RM address, which in turn determines 
the TM bind address. This conflicts with our plan to first start a 
{{LeaderClient}} actor in the TM and then construct the high-availability 
services. We are thinking about configuring an address range that the JM group 
runs on, and thereby eliminating the dependency from the {{RpcService}} on the 
high-availability services (only the name services are involved here).
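
To make the cycle explicit, below is a minimal, self-contained sketch of the 
start-up order on the TM side. Every type is a simplified stand-in rather 
than the actual TaskManagerRunner code, and the pre-configured JM address 
range is the proposal above, not an existing option.

{code:java}
import java.util.function.Supplier;

// Sketch of the ordering conflict; only the order of dependencies matters,
// every type here is a stand-in for the real Flink class.
public final class TaskManagerBootstrapOrderSketch {

    interface NameService { String resourceManagerAddress(); }
    interface RpcService { /* bound to an address that can reach the RM */ }

    // Today: the name service must exist first, because the TM bind address
    // is chosen by probing connectivity towards the RM address it returns.
    static RpcService currentOrder(Supplier<NameService> haServicesFactory) {
        NameService nameService = haServicesFactory.get();
        String rmAddress = nameService.resourceManagerAddress();
        return bindRpcServiceTowards(rmAddress);
    }

    // With a LeaderClient actor, the name service itself runs on the
    // RpcService, so it cannot be consulted before the RpcService exists.
    // Deriving the bind address from a pre-configured JM address range
    // (a hypothetical configuration value) would break the cycle.
    static RpcService proposedOrder(String configuredJmAddressRange) {
        RpcService rpcService = bindRpcServiceTowards(configuredJmAddressRange);
        // ... then start the LeaderClient on rpcService and build the
        // high-availability services on top of it ...
        return rpcService;
    }

    private static RpcService bindRpcServiceTowards(String reachableAddress) {
        // Placeholder: pick a local interface that can reach 'reachableAddress'.
        return new RpcService() {};
    }
}
{code}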

(3) As for the rework path, we'd like to directly replace the implementation, 
backed by solid tests and verification. This is because the internal APIs are 
incompatible, and even if we introduce a switch, both options, either placing 
the two implementations side by side in the contenders and using them 
respectively, or implementing a new series of contenders (Dispatcher/JM/...) 
against the new implementation (just like what we did in FLIP-6), seem to cost 
unreasonably much. (Besides, the first approach is error-prone because we 
would handle different implementations manually in one place for the same 
purpose.)

> Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, 
> CompletedCheckpoints)
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-10333
>                 URL: https://issues.apache.org/jira/browse/FLINK-10333
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.5.3, 1.6.0, 1.7.0
>            Reporter: Till Rohrmann
>            Priority: Major
>         Attachments: screenshot-1.png
>
>
> While going over the ZooKeeper based stores 
> ({{ZooKeeperSubmittedJobGraphStore}}, {{ZooKeeperMesosWorkerStore}}, 
> {{ZooKeeperCompletedCheckpointStore}}) and the underlying 
> {{ZooKeeperStateHandleStore}} I noticed several inconsistencies which were 
> introduced with past incremental changes.
> * Depending on whether {{ZooKeeperStateHandleStore#getAllSortedByNameAndLock}} 
> or {{ZooKeeperStateHandleStore#getAllAndLock}} is called, deserialization 
> problems will either lead to removing the Znode or not
> * {{ZooKeeperStateHandleStore}} leaves inconsistent state in case of 
> exceptions (e.g. {{#getAllAndLock}} won't release the acquired locks in case 
> of a failure)
> * {{ZooKeeperStateHandleStore}} has too many responsibilities. It would be 
> better to move {{RetrievableStateStorageHelper}} out of it for a better 
> separation of concerns
> * {{ZooKeeperSubmittedJobGraphStore}} overwrites a stored {{JobGraph}} even 
> if it is locked. This should not happen since it could leave another system 
> in an inconsistent state (imagine a changed {{JobGraph}} which restores from 
> an old checkpoint)
> * Redundant but also somewhat inconsistent put logic in the different stores
> * Shadowing of ZooKeeper specific exceptions in {{ZooKeeperStateHandleStore}} 
> which were expected to be caught in {{ZooKeeperSubmittedJobGraphStore}}
> * Getting rid of the {{SubmittedJobGraphListener}} would be helpful
> These problems made me wonder how reliably these components actually work. 
> Since these components are very important, I propose to refactor them.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
