Andrey, Vladimir,

ZooKeeper does provide the required ordering guarantees, but the ZooKeeper
watcher API does not provide the functionality required for DiscoverySpi out
of the box. To detect node join/failure we need to watch for creation/deletion
of znodes, but if a client disconnects from ZooKeeper and reconnects after
some time, it can miss some change events: after reconnect it only sees the
current state. For example, suppose there are two znodes (corresponding to
Ignite cluster nodes): A and B. One of the ZooKeeper clients disconnects and
tries to reconnect; at this moment a new cluster node joins and creates znode
C, then immediately fails and znode C is removed. When the disconnected client
restores its connection, it still sees znodes A and B and is not aware of node
C at all. This means that different clients can see different sequences of
ZooKeeper events.

To overcome this issue, ZookeeperDiscoverySpi has a single coordinator node
which listens for ZooKeeper notifications and transforms them into
DiscoverySpi events (the scenario where znode C is created and removed while
the coordinator is disconnected is still possible, but it is not an issue: it
simply means cluster node C failed before it finished joining). With this
single-coordinator approach I don't see a scenario where different nodes can
see different events or where an event can be missed.
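
To illustrate the approach, here is a rough sketch (not the actual
ZookeeperDiscoverySpi code) of how the coordinator can watch the children of
a single znode and diff the current state against the last seen one, so that
missed intermediate events do not matter. The '/ignite/alive' path and the
fire* methods are placeholders:

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

class CoordinatorWatcher implements Watcher {
    private final ZooKeeper zk;

    /** Znodes seen on the previous notification. */
    private Set<String> lastSeen = new HashSet<>();

    CoordinatorWatcher(ZooKeeper zk) {
        this.zk = zk;
    }

    void start() throws KeeperException, InterruptedException {
        // Initial read also registers the watch.
        processChildren(zk.getChildren("/ignite/alive", this));
    }

    @Override public void process(WatchedEvent evt) {
        if (evt.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
            try {
                // Re-register the watch and read the full current state.
                processChildren(zk.getChildren("/ignite/alive", this));
            }
            catch (Exception e) {
                // Reconnect/retry handling is omitted in this sketch.
            }
        }
    }

    private void processChildren(List<String> children) {
        Set<String> cur = new HashSet<>(children);

        for (String znode : cur)
            if (!lastSeen.contains(znode))
                fireNodeJoined(znode); // Translate into a DiscoverySpi join event.

        for (String znode : lastSeen)
            if (!cur.contains(znode))
                fireNodeFailed(znode); // Translate into a DiscoverySpi fail event.

        lastSeen = cur;
    }

    private void fireNodeJoined(String znode) { /* ... */ }

    private void fireNodeFailed(String znode) { /* ... */ }
}

Since only the coordinator interprets the notifications, all other cluster
nodes receive the resulting DiscoverySpi events in the order the coordinator
produced them.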

Semyon

On Wed, Jan 10, 2018 at 10:38 AM, Vladimir Ozerov <voze...@gridgain.com>
wrote:

> Hi Andrey,
>
> Could you please share detail of this API mismatch? AFAIK, the main
> guarantee we need for disco SPI is total message ordering. Zookeeper
> provides this guarantee. Moreover, Zookeeper is proven to be correct and
> reliable coordinator service by many users and Jepsen tests, as opposed to
> various in-house implementations (e.g. Zen of Elasticsearch).
>
> On Tue, Jan 9, 2018 at 9:53 PM, Andrey Kornev <andrewkor...@hotmail.com> wrote:
>
>> Semyon,
>>
>> Not to discourage you or anything, just an interesting fact from recent
>> history.
>>
>> I vaguely remember already trying to implement DiscoverySpi on top of
>> Zookeeper back in 2012. After a few failed attempts and a lot of help from
>> Zookeeper's original developers (Flavio Junqueira and Ben Reed), we (Dmitriy
>> S. and I) concluded that it's not possible to implement DiscoverySpi on top
>> of Zookeeper due to the strict(er) semantics of DiscoverySpi. Unfortunately
>> I don't remember the details, but essentially, in some cases it was not
>> possible to establish a total ordering of watcher events, and under certain
>> circumstances loss of such events was possible.
>>
>> It's not to say that Zookeeper can't be used to implement the cluster
>> membership tracking in general. The problem is rather with DiscoverySpi
>> semantics that require a different set of APIs than what Zookeeper provides.
>>
>> Regards
>> Andrey
>>
>> ________________________________
>> From: Semyon Boikov <sboi...@apache.org>
>> Sent: Tuesday, January 9, 2018 3:39 AM
>> To: dev@ignite.apache.org
>> Subject: DiscoverySpi based on Apache ZooKeeper
>>
>> Hi all,
>>
>> Currently I'm working on an implementation of DiscoverySpi based on Apache
>> ZooKeeper (ZookeeperDiscoverySpi) and want to share the results of this work.
>>
>> In very large clusters (>1000 nodes) the current default implementation of
>> DiscoverySpi - TcpDiscoverySpi - has some significant drawbacks:
>> - TcpDiscoverySpi organizes nodes in a ring, and all messages are passed
>> sequentially around the ring. The more nodes there are in the ring, the more
>> time it takes to pass a message. In very large clusters such an architecture
>> can slow down important operations (node join, node stop, cache start, etc).
>> - in TcpDiscoverySpi's protocol each node in the ring is able to fail the
>> next one in case of network issues, and it is possible that two nodes 'kill'
>> each other (this can happen in complex scenarios when the network is broken
>> and then restored after some time; such problems were observed in real
>> environments), and with the current TcpDiscoverySpi protocol there is no
>> easy way to completely fix such problems.
>> - when some node in the ring fails, the previous node tries to restore the
>> ring and sequentially tries to connect to the next nodes. If a large part of
>> the ring fails, it takes a long time to sequentially detect the failure of
>> all those nodes.
>> - with TcpDiscoverySpi split brain is possible (one ring can split into two
>> independent parts), so a separate mechanism is needed to protect against
>> split brain when TcpDiscoverySpi is used
>>
>> Even though some of these problems can most probably be fixed in
>> TcpDiscoverySpi, it seems a more robust and faster DiscoverySpi can be
>> implemented on top of an existing coordination service.
>>
>> Apache ZooKeeper is a well-known reliable service and it provides all the
>> mechanisms and consistency guarantees required for Ignite's DiscoverySpi.
>> Some technical details of the ZookeeperDiscoverySpi implementation can be
>> found in the description prepared by Sergey Puchnin:
>> https://cwiki.apache.org/confluence/display/IGNITE/Discovery+SPI+by+ZooKeeper
>>
>> In our preliminary tests we were able to successfully start 4000+ nodes
>> with ZookeeperDiscoverySpi. The new implementation works faster than
>> TcpDiscoverySpi and does not have the TcpDiscoverySpi drawbacks mentioned
>> above:
>> - nodes' alive status is controlled by ZooKeeper, so nodes can never kill
>> each other
>> - ZooKeeper has protection against split brain
>> - with ZooKeeper it is possible to detect node joins/failures in batches, so
>> the time to detect the join/failure of 1 vs 100+ nodes is almost the same
>>
>> I'm going to finalize the implementation of ZookeeperDiscoverySpi in the
>> next few days. But in Ignite there is one more issue, caused by the fact
>> that DiscoverySpi and CommunicationSpi are two independent components: it is
>> possible that DiscoverySpi considers some node alive, but at the same time
>> CommunicationSpi is not able to send a message to this node (this issue
>> exists not only for ZookeeperDiscoverySpi but for TcpDiscoverySpi too).
>> Such a case is very critical since all of Ignite's internal code assumes
>> that if a node is alive then CommunicationSpi is able to send/receive
>> messages to/from this node. If that is not the case, then any Ignite
>> operation can hang (with ZooKeeper such a situation is possible when, due to
>> network failures, nodes have a connection with ZooKeeper but cannot
>> establish a TCP connection to each other).
>>
>> If such a case arises, Ignite should have some mechanism to forcibly kill
>> faulty nodes to keep the cluster operational. But note that in 'split brain'
>> scenarios each independent part of the cluster will consider the others
>> 'failed', and there should be some way to choose which part should be
>> killed. It would be good to provide a generic solution for this problem as
>> part of the work on the new DiscoverySpi.
>>
>> We discussed this with Yakov Zhdanov and suggest the following: when
>> communication fails to send a message to some node and this node is
>> considered alive, Ignite should trigger a global 'communication error
>> resolve' process (obviously, this process should use the internal discovery
>> mechanisms for messaging). As part of this process CommunicationSpi on each
>> node should try to establish a connection to all other alive nodes
>> (TcpCommunicationSpi can do this efficiently using async NIO) and send the
>> results of this connection test to some coordinator node (e.g. the oldest
>> cluster node). When the coordinator receives the results of the connection
>> test from all nodes, it calls a user-defined CommunicationProblemResolver to
>> choose which nodes should be killed (CommunicationProblemResolver should be
>> set in IgniteConfiguration):
>>
>> public interface CommunicationProblemResolver {
>>     public void resolve(CommunicationProblemContext ctx);
>> }
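>>
>> A minimal wiring sketch, assuming the setter on IgniteConfiguration ends up
>> being called setCommunicationProblemResolver (the exact name is still to be
>> decided) and MyCommunicationProblemResolver is a user-defined implementation:
>>
>> IgniteConfiguration cfg = new IgniteConfiguration();
>>
>> // Hypothetical setter name for the proposed API.
>> cfg.setCommunicationProblemResolver(new MyCommunicationProblemResolver());
>>
>> Ignite ignite = Ignition.start(cfg);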
>>
>> CommunicationProblemResolver receives a CommunicationProblemContext which
>> provides the results of the CommunicationSpi connection test. It can also be
>> useful to have information about started caches and the current cache data
>> distribution to decide which part of the cluster should be killed:
>>
>> public interface CommunicationProblemContext {
>>     /**
>>      * @return Current topology snapshot.
>>      */
>>     public List<ClusterNode> topologySnapshot();
>>
>>     /**
>>      * @param node1 First node.
>>      * @param node2 Second node.
>>      * @return {@code True} if {@link CommunicationSpi} is able to
>> establish connection from first node to second node.
>>      */
>>     public boolean connectionAvailable(ClusterNode node1, ClusterNode
>> node2);
>>
>>     /**
>>      * @return List of currently started caches.
>>      */
>>     public List<String> startedCaches();
>>
>>     /**
>>      * @param cacheName Cache name.
>>      * @return Cache partitions affinity assignment.
>>      */
>>     public List<List<ClusterNode>> cacheAffinity(String cacheName);
>>
>>     /**
>>      * @param cacheName Cache name.
>>      * @return Cache partitions owners.
>>      */
>>     public List<List<ClusterNode>> cachePartitionOwners(String cacheName);
>>
>>     /**
>>      * @param node Node to kill after the communication error is resolved.
>>      */
>>     public void killNode(ClusterNode node);
>> }
>>
>> The default implementation of CommunicationProblemResolver provided as part
>> of Ignite can keep alive the largest sub-cluster where all nodes are able to
>> connect to each other.
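>>
>> As an illustration of how the proposed API could be used, here is a rough
>> sketch of a custom resolver (not the default implementation) that greedily
>> groups nodes into components by bidirectional connectivity and kills
>> everything outside the largest group:
>>
>> import java.util.ArrayList;
>> import java.util.List;
>>
>> import org.apache.ignite.cluster.ClusterNode;
>>
>> public class LargestComponentResolver implements CommunicationProblemResolver {
>>     @Override public void resolve(CommunicationProblemContext ctx) {
>>         List<ClusterNode> top = ctx.topologySnapshot();
>>
>>         List<List<ClusterNode>> groups = new ArrayList<>();
>>
>>         // Assign each node to the first group it is fully connected with.
>>         for (ClusterNode node : top) {
>>             List<ClusterNode> target = null;
>>
>>             for (List<ClusterNode> grp : groups) {
>>                 boolean connected = true;
>>
>>                 for (ClusterNode other : grp) {
>>                     if (!ctx.connectionAvailable(node, other) ||
>>                         !ctx.connectionAvailable(other, node)) {
>>                         connected = false;
>>
>>                         break;
>>                     }
>>                 }
>>
>>                 if (connected) {
>>                     target = grp;
>>
>>                     break;
>>                 }
>>             }
>>
>>             if (target == null) {
>>                 target = new ArrayList<>();
>>
>>                 groups.add(target);
>>             }
>>
>>             target.add(node);
>>         }
>>
>>         if (groups.isEmpty())
>>             return;
>>
>>         // Keep the largest group alive, kill everything else.
>>         List<ClusterNode> keep = groups.get(0);
>>
>>         for (List<ClusterNode> grp : groups)
>>             if (grp.size() > keep.size())
>>                 keep = grp;
>>
>>         for (ClusterNode node : top)
>>             if (!keep.contains(node))
>>                 ctx.killNode(node);
>>     }
>> }
>>
>> This is only a greedy approximation (finding the truly largest fully
>> connected sub-cluster is harder), but it shows how topologySnapshot(),
>> connectionAvailable() and killNode() fit together.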
>>
>> In addition to CommunicationProblemResolver we can fire a new local
>> org.apache.ignite.events.Event when CommunicationSpi fails to send a message
>> to an alive node (this can be useful for monitoring):
>>
>> class CommunicationProblemEvent extends EventAdapter {
>>      ClusterNode eventNode();
>>
>>      Exception connectionError();
>> }
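>>
>> For example, a monitoring component could subscribe to this event with a
>> local listener (EVT_COMMUNICATION_PROBLEM below is a placeholder for the
>> event type constant that would have to be added to EventType, and 'ignite'
>> is an already started Ignite instance):
>>
>> ignite.events().localListen(new IgnitePredicate<CommunicationProblemEvent>() {
>>     @Override public boolean apply(CommunicationProblemEvent evt) {
>>         // Log the problematic node and the connection error.
>>         System.out.println("Connection problem with node " + evt.eventNode() +
>>             ", error: " + evt.connectionError());
>>
>>         return true; // Keep listening.
>>     }
>> }, EVT_COMMUNICATION_PROBLEM);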
>>
>>
>> Since this is a pretty large change in the public API, I would be grateful
>> for your thoughts on CommunicationProblemResolver.
>>
>> Thank you,
>> Semyon
>>
>
