Thanks Semyon!

I have a naming nitpick. Can we rename Problem to Failure, e.g.
CommunicationProblemResolver to CommunicationFailureResolver?

D.

On Tue, Jan 9, 2018 at 3:39 AM, Semyon Boikov <sboi...@apache.org> wrote:

> Hi all,
>
> Currently I'm working on an implementation of DiscoverySpi based on
> Apache ZooKeeper (ZookeeperDiscoverySpi) and want to share the results
> of this work.
>
> In very large clusters (>1000 nodes) the current default implementation
> of DiscoverySpi - TcpDiscoverySpi - has some significant drawbacks:
> - TcpDiscoverySpi organizes nodes in a ring, and all messages are
> passed sequentially around the ring. The more nodes there are in the
> ring, the more time it takes to pass a message. In very large clusters
> such an architecture can slow down important operations (node join,
> node stop, cache start, etc.) - see the rough sketch after this list.
> - in TcpDiscoverySpi's protocol each node in the ring is able to fail
> the next one in case of network issues, and it is possible that two
> nodes 'kill' each other (this can happen in complex scenarios when the
> network is broken and then restored after some time; such problems have
> been observed in real environments), and with the current
> TcpDiscoverySpi protocol there is no easy way to completely fix them.
> - when some node in the ring fails, the previous node tries to restore
> the ring by sequentially trying to connect to the next nodes. If a
> large part of the ring fails, it takes a long time to sequentially
> detect the failure of all those nodes.
> - with TcpDiscoverySpi split brain is possible (one ring can split into
> two independent parts), so a separate mechanism is needed to protect
> against split brain when TcpDiscoverySpi is used
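>
> To illustrate the first point, here is a back-of-the-envelope sketch
> (plain Java, not Ignite code; the 1 ms per-hop latency is an assumed
> figure):
>
> public class RingLatencySketch {
>     public static void main(String[] args) {
>         long perHopMs = 1; // assumed per-hop network latency
>
>         // Worst-case time for a message to traverse the whole ring
>         // grows linearly with the number of nodes.
>         for (int nodes : new int[] {100, 1_000, 4_000})
>             System.out.printf("ring of %d nodes: ~%d ms per pass%n",
>                 nodes, perHopMs * nodes);
>     }
> }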
>
> Even though some of these problems can most probably be fixed in
> TcpDiscoverySpi, it seems a more robust and faster DiscoverySpi can be
> implemented on top of an existing coordination service.
>
> Apache ZooKeeper is a well-known reliable service and it provides all
> the mechanisms and consistency guarantees required for Ignite's
> DiscoverySpi. Some technical details of the ZookeeperDiscoverySpi
> implementation can be found in the description prepared by Sergey
> Puchnin:
> https://cwiki.apache.org/confluence/display/IGNITE/Discovery+SPI+by+ZooKeeper
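>
> A minimal configuration sketch (the setter names below are assumptions;
> the API is not final yet):
>
> import org.apache.ignite.Ignite;
> import org.apache.ignite.Ignition;
> import org.apache.ignite.configuration.IgniteConfiguration;
> import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi;
>
> public class ZkDiscoveryExample {
>     public static void main(String[] args) {
>         ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi();
>
>         // Assumed setters: ZooKeeper connection string and session
>         // timeout.
>         zkSpi.setZkConnectionString("zk1:2181,zk2:2181,zk3:2181");
>         zkSpi.setSessionTimeout(30_000);
>
>         IgniteConfiguration cfg = new IgniteConfiguration();
>         cfg.setDiscoverySpi(zkSpi);
>
>         try (Ignite ignite = Ignition.start(cfg)) {
>             System.out.println("Nodes: " + ignite.cluster().nodes().size());
>         }
>     }
> }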
>
> In our preliminary tests we were able to successfully start 4000+ nodes
> with ZookeeperDiscoverySpi. The new implementation works faster than
> TcpDiscoverySpi and does not have the mentioned drawbacks:
> - the nodes' alive status is controlled by ZooKeeper, so nodes can
> never kill each other
> - ZooKeeper has protection against split brain
> - with ZooKeeper it is possible to detect node joins/failures in
> batches, so the time to detect the join/failure of 1 vs 100+ nodes is
> almost the same
>
> I'm going to finalize the implementation of ZookeeperDiscoverySpi in
> the next few days. But in Ignite there is one more issue, caused by the
> fact that DiscoverySpi and CommunicationSpi are two independent
> components: it is possible that DiscoverySpi considers some node alive,
> but at the same time CommunicationSpi is not able to send a message to
> this node (this issue exists not only for ZookeeperDiscoverySpi but for
> TcpDiscoverySpi too). Such a case is very critical since all internal
> Ignite code assumes that if a node is alive then CommunicationSpi is
> able to send/receive messages to/from this node. If that is not the
> case, then any Ignite operation can hang (with ZooKeeper such a
> situation is possible when, due to network failures, nodes have a
> connection with ZooKeeper but cannot establish a TCP connection to each
> other).
>
> If such a case arises, Ignite should have some mechanism to forcibly
> kill faulty nodes to keep the cluster operational. But note that in
> 'split brain' scenarios each independent part of the cluster will
> consider the others 'failed', and there should be some way to choose
> which part should be killed. It would be good to provide a generic
> solution for this problem as part of the work on the new DiscoverySpi.
>
> We discussed this with Yakov Zhdanov and suggest the following: when
> communication fails to send a message to some node and this node is
> considered alive, Ignite should trigger a global 'communication error
> resolve' process (obviously, this process should use internal discovery
> mechanisms for messaging). As part of this process, CommunicationSpi on
> each node should try to establish a connection to all other alive nodes
> (TcpCommunicationSpi can do this efficiently using async NIO) and send
> the results of this connection test to some coordinator node (e.g. the
> oldest cluster node). When the coordinator receives the results of the
> connection test from all nodes, it calls a user-defined
> CommunicationProblemResolver to choose which nodes should be killed
> (CommunicationProblemResolver should be set in IgniteConfiguration):
>
> public interface CommunicationProblemResolver {
>     public void resolve(CommunicationProblemContext ctx);
> }
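>
> For example, it could be plugged in like this (the setter name
> setCommunicationProblemResolver is an assumption):
>
> IgniteConfiguration cfg = new IgniteConfiguration();
>
> // Assumed setter for plugging in a user-defined resolver.
> cfg.setCommunicationProblemResolver(ctx -> {
>     // Inspect connection test results and kill nodes via ctx.killNode().
> });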
>
> CommunicationProblemResolver receives a CommunicationProblemContext
> which provides the results of the CommunicationSpi connection test. It
> can also be useful to have information about started caches and the
> current cache data distribution in order to decide which part of the
> cluster should be killed:
>
> public interface CommunicationProblemContext {
>     /**
>      * @return Current topology snapshot.
>      */
>     public List<ClusterNode> topologySnapshot();
>
>     /**
>      * @param node1 First node.
>      * @param node2 Second node.
>      * @return {@code True} if {@link CommunicationSpi} is able to
>      *     establish a connection from the first node to the second one.
>      */
>     public boolean connectionAvailable(ClusterNode node1, ClusterNode node2);
>
>     /**
>      * @return List of currently started caches.
>      */
>     public List<String> startedCaches();
>
>     /**
>      * @param cacheName Cache name.
>      * @return Cache partitions affinity assignment.
>      */
>     public List<List<ClusterNode>> cacheAffinity(String cacheName);
>
>     /**
>      * @param cacheName Cache name.
>      * @return Cache partition owners.
>      */
>     public List<List<ClusterNode>> cachePartitionOwners(String cacheName);
>
>     /**
>      * @param node Node to kill after the communication error resolve
>      *     process.
>      */
>     public void killNode(ClusterNode node);
> }
>
> The default implementation of CommunicationProblemResolver provided as
> part of Ignite can keep alive the largest sub-cluster in which all
> nodes are able to connect to each other, as in the sketch below.
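>
> For example, a sketch of such a resolver using only the methods
> proposed above (a greedy scan is used for brevity; a real
> implementation would need a more careful grouping algorithm):
>
> import java.util.ArrayList;
> import java.util.List;
>
> import org.apache.ignite.cluster.ClusterNode;
>
> public class LargestConnectedPartResolver
>     implements CommunicationProblemResolver {
>     @Override public void resolve(CommunicationProblemContext ctx) {
>         List<ClusterNode> top = ctx.topologySnapshot();
>         List<ClusterNode> best = new ArrayList<>();
>
>         // Greedily grow a fully-connected group from each seed node.
>         for (ClusterNode seed : top) {
>             List<ClusterNode> group = new ArrayList<>();
>
>             group.add(seed);
>
>             for (ClusterNode n : top) {
>                 if (n.equals(seed))
>                     continue;
>
>                 boolean connected = true;
>
>                 for (ClusterNode member : group) {
>                     if (!ctx.connectionAvailable(member, n)
>                         || !ctx.connectionAvailable(n, member)) {
>                         connected = false;
>
>                         break;
>                     }
>                 }
>
>                 if (connected)
>                     group.add(n);
>             }
>
>             if (group.size() > best.size())
>                 best = group;
>         }
>
>         // Kill every node outside the largest fully-connected group.
>         for (ClusterNode n : top) {
>             if (!best.contains(n))
>                 ctx.killNode(n);
>         }
>     }
> }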
>
> In addition to CommunicationProblemResolver, we can fire a new local
> org.apache.ignite.events.Event when CommunicationSpi fails to send a
> message to an alive node (this can be useful for monitoring):
>
> class CommunicationProblemEvent extends EventAdapter {
>     /** @return Node that CommunicationSpi failed to reach. */
>     ClusterNode eventNode();
>
>     /** @return Error from the failed connection attempt. */
>     Exception connectionError();
> }
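>
> A monitoring sketch (the event type constant EVT_COMMUNICATION_PROBLEM
> is hypothetical; the actual constant would be defined together with the
> new event class):
>
> // Assumes a started Ignite instance 'ignite' and the hypothetical
> // EVT_COMMUNICATION_PROBLEM constant.
> ignite.events().localListen((IgnitePredicate<Event>)evt -> {
>     CommunicationProblemEvent e = (CommunicationProblemEvent)evt;
>
>     System.err.println("Failed to connect to alive node " +
>         e.eventNode().id() + ": " + e.connectionError());
>
>     return true; // keep listening for further events
> }, EVT_COMMUNICATION_PROBLEM);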
>
>
> Since this is a pretty large change in the public API, I would be
> grateful if you could share your thoughts on CommunicationProblemResolver.
>
> Thank you,
> Semyon
>
