Dmitriy, no problem, I'll rename it.

Semyon

On Tue, Jan 9, 2018 at 11:20 PM, Dmitriy Setrakyan <[email protected]>
wrote:

> Thanks Semyon!
>
> I have a naming nitpick. Can we rename Problem to Failure, e.g.
> CommunicationProblemResolver to CommunicationFailureResolver?
>
> D.
>
> On Tue, Jan 9, 2018 at 3:39 AM, Semyon Boikov <[email protected]> wrote:
>
> > Hi all,
> >
> > Currently I'm working on an implementation of DiscoverySpi based on Apache
> > ZooKeeper (ZookeeperDiscoverySpi) and want to share the results of this work.
> >
> > In very large clusters (>1000 nodes) the current default implementation of
> > DiscoverySpi - TcpDiscoverySpi - has some significant drawbacks:
> > - TcpDiscoverySpi organizes nodes in a ring, and all messages are passed
> > sequentially around the ring. The more nodes there are in the ring, the
> > more time it takes to pass a message. In very large clusters such an
> > architecture can slow down important operations (node join, node stop,
> > cache start, etc.).
> > - in TcpDiscoverySpi's protocol each node in the ring is able to fail the
> > next one in case of network issues, so two nodes can 'kill' each other
> > (this is possible in complex scenarios when the network is broken and then
> > restored after some time; such problems were observed in real
> > environments), and with the current TcpDiscoverySpi protocol there is no
> > easy way to completely fix such problems.
> > - when some node in the ring fails, the previous node tries to restore the
> > ring by sequentially connecting to the next nodes. If a large part of the
> > ring fails, it takes a long time to sequentially detect the failure of all
> > nodes.
> > - with TcpDiscoverySpi split brain is possible (one ring can split into two
> > independent parts), so a separate mechanism is needed to protect from split
> > brain when TcpDiscoverySpi is used.
> >
> > Even though some of these problems can most probably be fixed in
> > TcpDiscoverySpi somehow, it seems that a more robust and faster
> > DiscoverySpi can be implemented on top of an existing coordination service.
> >
> > Apache ZooKeeper is a well-known reliable service and it provides all the
> > mechanisms and consistency guarantees required for Ignite's DiscoverySpi.
> > Some technical details of the ZookeeperDiscoverySpi implementation can be
> > found in the description prepared by Sergey Puchnin:
> > https://cwiki.apache.org/confluence/display/IGNITE/Discovery+SPI+by+ZooKeeper
> >
> > In our preliminary tests we were able to successfully start 4000+ nodes
> > with ZookeeperDiscoverySpi. The new implementation works faster than
> > TcpDiscoverySpi and does not have the TcpDiscoverySpi drawbacks mentioned
> > above (a rough configuration sketch is shown right after the list):
> > - nodes' alive status is controlled by ZooKeeper, so nodes can never kill
> > each other
> > - ZooKeeper has protection from split brain
> > - with ZooKeeper it is possible to detect node joins/failures in batches,
> > so the time to detect the join/failure of 1 vs 100+ nodes is almost the same
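> >
> > To give an idea of how it would be used, configuration could look roughly
> > like the sketch below (just a sketch: the SPI setter names, such as the
> > connection string property, are assumptions since the implementation is
> > not finalized yet):
> >
> > IgniteConfiguration cfg = new IgniteConfiguration();
> >
> > ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi();
> >
> > // ZooKeeper ensemble to connect to (assumed setter name).
> > zkSpi.setZkConnectionString("zk1:2181,zk2:2181,zk3:2181");
> >
> > // Replace the default TcpDiscoverySpi with the ZooKeeper-based one.
> > cfg.setDiscoverySpi(zkSpi);
> >
> > Ignition.start(cfg);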
> >
> > I'm going to finalize the implementation of ZookeeperDiscoverySpi in the
> > next few days. But in Ignite there is one more issue caused by the fact
> > that DiscoverySpi and CommunicationSpi are two independent components: it
> > is possible that DiscoverySpi considers some node alive, but at the same
> > time CommunicationSpi is not able to send a message to this node (this
> > issue exists not only for ZookeeperDiscoverySpi but for TcpDiscoverySpi
> > too). Such a case is very critical since all internal Ignite code assumes
> > that if a node is alive then CommunicationSpi is able to send/receive
> > messages to/from this node. If that is not the case, then any Ignite
> > operation can hang (with ZooKeeper such a situation is possible when, due
> > to network failures, nodes have a connection with ZooKeeper but cannot
> > establish TCP connections to each other).
> >
> > If such a case arises, Ignite should have some mechanism to forcibly kill
> > faulty nodes to keep the cluster operational. But note that in 'split
> > brain' scenarios each independent part of the cluster will consider the
> > others 'failed', and there should be some way to choose which part should
> > be killed. It would be good to provide a generic solution for this problem
> > as part of the work on the new DiscoverySpi.
> >
> > We discussed this with Yakov Zhdanov and suggest the following: when
> > communication fails to send a message to some node while this node is
> > considered alive, Ignite should trigger a global 'communication error
> > resolve' process (obviously, this process should use the internal
> > discovery mechanisms for messaging). As part of this process,
> > CommunicationSpi on each node should try to establish a connection to all
> > other alive nodes (TcpCommunicationSpi can do this efficiently using async
> > NIO) and send the results of this connection test to some coordinator node
> > (e.g. the oldest cluster node). When the coordinator receives the
> > connection test results from all nodes, it calls a user-defined
> > CommunicationProblemResolver to choose which nodes should be killed (the
> > CommunicationProblemResolver should be set in IgniteConfiguration):
> >
> > public interface CommunicationProblemResolver {
> >     public void resolve(CommunicationProblemContext ctx);
> > }
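> >
> > For example, a user implementation could be plugged in roughly like this
> > (the setter name and the MyCommunicationProblemResolver class are
> > hypothetical, the final configuration API is to be decided):
> >
> > IgniteConfiguration cfg = new IgniteConfiguration();
> >
> > // Hypothetical setter on IgniteConfiguration.
> > cfg.setCommunicationProblemResolver(new MyCommunicationProblemResolver());
> >
> > Ignition.start(cfg);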
> >
> > CommunicationProblemResolver receives a CommunicationProblemContext which
> > provides the results of the CommunicationSpi connection test. It can also
> > be useful to have information about started caches and the current cache
> > data distribution to decide which part of the cluster should be killed:
> >
> > public interface CommunicationProblemContext {
> >     /**
> >      * @return Current topology snapshot.
> >      */
> >     public List<ClusterNode> topologySnapshot();
> >
> >     /**
> >      * @param node1 First node.
> >      * @param node2 Second node.
> >      * @return {@code True} if {@link CommunicationSpi} is able to
> >      * establish a connection from the first node to the second node.
> >      */
> >     public boolean connectionAvailable(ClusterNode node1, ClusterNode node2);
> >
> >     /**
> >      * @return List of currently started caches.
> >      */
> >     public List<String> startedCaches();
> >
> >     /**
> >      * @param cacheName Cache name.
> >      * @return Cache partitions affinity assignment.
> >      */
> >     public List<List<ClusterNode>> cacheAffinity(String cacheName);
> >
> >     /**
> >      * @param cacheName Cache name.
> >      * @return Cache partitions owners.
> >      */
> >     public List<List<ClusterNode>> cachePartitionOwners(String cacheName);
> >
> >     /**
> >      * @param node Node to kill after the communication error resolve
> >      * process completes.
> >      */
> >     public void killNode(ClusterNode node);
> > }
> >
> > The default implementation of CommunicationProblemResolver provided as
> > part of Ignite can keep alive the largest sub-cluster in which all nodes
> > are able to connect to each other.
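> >
> > To illustrate how the proposed interfaces could be used, below is a rough
> > sketch of such a resolver (just an illustration, not the actual default
> > implementation): it keeps the largest group of nodes that are connected to
> > each other, directly or transitively, through working communication links
> > and kills the rest.
> >
> > import java.util.HashMap;
> > import java.util.List;
> > import java.util.Map;
> >
> > import org.apache.ignite.cluster.ClusterNode;
> >
> > public class KeepLargestSubClusterResolver implements CommunicationProblemResolver {
> >     @Override public void resolve(CommunicationProblemContext ctx) {
> >         List<ClusterNode> nodes = ctx.topologySnapshot();
> >
> >         int n = nodes.size();
> >
> >         // Union-find over node indexes; two nodes are linked only if the
> >         // connection test succeeded in both directions. Connected
> >         // components are used here as a simple approximation of "all
> >         // nodes can connect to each other".
> >         int[] parent = new int[n];
> >
> >         for (int i = 0; i < n; i++)
> >             parent[i] = i;
> >
> >         for (int i = 0; i < n; i++) {
> >             for (int j = i + 1; j < n; j++) {
> >                 if (ctx.connectionAvailable(nodes.get(i), nodes.get(j)) &&
> >                     ctx.connectionAvailable(nodes.get(j), nodes.get(i)))
> >                     parent[find(parent, i)] = find(parent, j);
> >             }
> >         }
> >
> >         // Find the largest connected group.
> >         Map<Integer, Integer> sizes = new HashMap<>();
> >
> >         int bestRoot = -1;
> >         int bestSize = 0;
> >
> >         for (int i = 0; i < n; i++) {
> >             int root = find(parent, i);
> >             int size = sizes.merge(root, 1, Integer::sum);
> >
> >             if (size > bestSize) {
> >                 bestSize = size;
> >                 bestRoot = root;
> >             }
> >         }
> >
> >         // Kill every node outside the largest group.
> >         for (int i = 0; i < n; i++) {
> >             if (find(parent, i) != bestRoot)
> >                 ctx.killNode(nodes.get(i));
> >         }
> >     }
> >
> >     /** Finds the root of a node's group using path halving. */
> >     private static int find(int[] parent, int i) {
> >         while (parent[i] != i)
> >             i = parent[i] = parent[parent[i]];
> >
> >         return i;
> >     }
> > }
> >
> > A smarter implementation could additionally use the cache affinity and
> > partition owner information from the context to prefer the part of the
> > cluster that keeps the data available.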
> >
> > In addition to CommunicationProblemResolver, we can fire a new local
> > org.apache.ignite.events.Event when CommunicationSpi fails to send a
> > message to an alive node (this can be useful for monitoring):
> >
> > class CommunicationProblemEvent extends EventAdapter {
> >      ClusterNode eventNode();
> >
> >      Exception connectionError();
> > }
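> >
> > For monitoring, the event could be consumed with a regular local listener,
> > e.g. as below (the EVT_COMMUNICATION_PROBLEM constant is a placeholder,
> > the final event type name is to be decided; 'ignite' is a started Ignite
> > instance):
> >
> > IgnitePredicate<CommunicationProblemEvent> lsnr = evt -> {
> >     // Log which node could not be reached and why.
> >     System.out.println("Failed to connect to node " + evt.eventNode().id() +
> >         ": " + evt.connectionError());
> >
> >     return true; // Continue listening.
> > };
> >
> > ignite.events().localListen(lsnr, EventType.EVT_COMMUNICATION_PROBLEM);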
> >
> >
> > Since this is a pretty large change in the public API, I would be grateful
> > if you could provide your thoughts about CommunicationProblemResolver.
> >
> > Thank you,
> > Semyon
> >
>
