Thanks Semyon! I have a naming nitpick. Can we rename Problem to Failure, e.g. CommunicationProblemResolver to CommunicationFailureResolver?
D.

On Tue, Jan 9, 2018 at 3:39 AM, Semyon Boikov <sboi...@apache.org> wrote:

> Hi all,
>
> Currently I'm working on an implementation of DiscoverySpi based on
> Apache ZooKeeper (ZookeeperDiscoverySpi) and want to share the results
> of this work.
>
> In very large clusters (>1000 nodes) the current default implementation
> of DiscoverySpi - TcpDiscoverySpi - has some significant drawbacks:
>
> - TcpDiscoverySpi organizes nodes in a ring, and all messages are
>   passed sequentially around the ring. The more nodes there are in the
>   ring, the longer it takes to pass a message. In very large clusters
>   this architecture can slow down important operations (node join,
>   node stop, cache start, etc.).
> - In TcpDiscoverySpi's protocol each node in the ring is able to fail
>   the next one in case of network issues, so two nodes can 'kill' each
>   other (this is possible in complex scenarios when the network is
>   broken and then restored after some time; such problems have been
>   observed in real environments), and with the current TcpDiscoverySpi
>   protocol there is no easy way to completely fix this.
> - When some node in the ring fails, the previous node tries to restore
>   the ring by sequentially connecting to the next nodes. If a large
>   part of the ring fails, it takes a long time to detect the failure
>   of all those nodes one by one.
> - With TcpDiscoverySpi split brain is possible (one ring can split
>   into two independent parts), so a separate mechanism is needed to
>   protect against split brain when TcpDiscoverySpi is used.
>
> Even though some of these problems can most probably be fixed in
> TcpDiscoverySpi, it seems a more robust and fast DiscoverySpi can be
> implemented on top of an existing coordination service.
>
> Apache ZooKeeper is a well-known reliable service, and it provides all
> the mechanisms and consistency guarantees required for Ignite's
> DiscoverySpi. Some technical details of the ZookeeperDiscoverySpi
> implementation can be found in the description prepared by Sergey
> Puchnin:
> https://cwiki.apache.org/confluence/display/IGNITE/Discovery+SPI+by+ZooKeeper
>
> In our preliminary tests we were able to successfully start 4000+
> nodes with ZookeeperDiscoverySpi. The new implementation works faster
> than TcpDiscoverySpi and does not have the drawbacks mentioned above:
>
> - Nodes' alive status is controlled by ZooKeeper, so nodes can never
>   kill each other.
> - ZooKeeper has protection against split brain.
> - With ZooKeeper it is possible to detect node joins/failures in
>   batches, so the time to detect the join/failure of 1 vs 100+ nodes
>   is almost the same.
>
> I'm going to finalize the implementation of ZookeeperDiscoverySpi in
> the next few days (a configuration sketch is shown below). But in
> Ignite there is one more issue, caused by the fact that DiscoverySpi
> and CommunicationSpi are two independent components: it is possible
> that DiscoverySpi considers some node alive while at the same time
> CommunicationSpi is not able to send a message to this node (this
> issue exists not only for ZookeeperDiscoverySpi but for
> TcpDiscoverySpi too). Such a case is very critical, since all of
> Ignite's internal code assumes that if a node is alive then
> CommunicationSpi is able to send/receive messages to/from this node.
> If that is not the case, any Ignite operation can hang (with ZooKeeper
> such a situation is possible when, due to network failures, nodes have
> a connection with ZooKeeper but cannot establish a TCP connection to
> each other).
>
> If such a case arises, Ignite should have some mechanism to forcibly
> kill faulty nodes to keep the cluster operational.
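>
> As promised above, here is a rough sketch of switching a node to the
> new SPI. This is only an illustration: the setter names
> (setZkConnectionString, setSessionTimeout) and the package are
> assumptions, since the API is not finalized yet.
>
>     import org.apache.ignite.Ignition;
>     import org.apache.ignite.configuration.IgniteConfiguration;
>     import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi;
>
>     public class ZkDiscoveryStartup {
>         public static void main(String[] args) {
>             IgniteConfiguration cfg = new IgniteConfiguration();
>
>             ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi();
>
>             // Comma-separated ZooKeeper connection string (assumed setter).
>             zkSpi.setZkConnectionString("zk1:2181,zk2:2181,zk3:2181");
>
>             // ZooKeeper session timeout in milliseconds: how long a node
>             // may be unreachable before ZooKeeper considers it failed
>             // (assumed setter).
>             zkSpi.setSessionTimeout(30_000);
>
>             // Use ZooKeeper-based discovery instead of the default
>             // TcpDiscoverySpi.
>             cfg.setDiscoverySpi(zkSpi);
>
>             Ignition.start(cfg);
>         }
>     }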
> Note, however, that in 'split brain' scenarios each independent part
> of the cluster will consider the others 'failed', so there should be
> some way to choose which part should be killed. It would be good to
> provide a generic solution for this problem as part of the work on the
> new DiscoverySpi.
>
> We discussed this with Yakov Zhdanov and suggest the following: when
> communication fails to send a message to some node and this node is
> considered alive, Ignite should trigger a global 'communication error
> resolve' process (obviously, this process should use internal
> discovery mechanisms for its own messaging). As part of this process,
> CommunicationSpi on each node should try to establish a connection to
> all other alive nodes (TcpCommunicationSpi can do this efficiently
> using async NIO) and send the results of this connection test to some
> coordinator node (e.g. the oldest cluster node). When the coordinator
> receives the connection test results from all nodes, it calls a
> user-defined CommunicationProblemResolver to choose which nodes should
> be killed (CommunicationProblemResolver should be set in
> IgniteConfiguration):
>
> public interface CommunicationProblemResolver {
>     public void resolve(CommunicationProblemContext ctx);
> }
>
> CommunicationProblemResolver receives a CommunicationProblemContext
> which provides the results of the CommunicationSpi connection test. It
> can also be useful to have information about started caches and the
> current cache data distribution to decide which part of the cluster
> should be killed:
>
> public interface CommunicationProblemContext {
>     /**
>      * @return Current topology snapshot.
>      */
>     public List<ClusterNode> topologySnapshot();
>
>     /**
>      * @param node1 First node.
>      * @param node2 Second node.
>      * @return {@code True} if {@link CommunicationSpi} is able to
>      *     establish a connection from the first node to the second node.
>      */
>     public boolean connectionAvailable(ClusterNode node1, ClusterNode node2);
>
>     /**
>      * @return List of currently started caches.
>      */
>     public List<String> startedCaches();
>
>     /**
>      * @param cacheName Cache name.
>      * @return Cache partitions affinity assignment.
>      */
>     public List<List<ClusterNode>> cacheAffinity(String cacheName);
>
>     /**
>      * @param cacheName Cache name.
>      * @return Cache partitions owners.
>      */
>     public List<List<ClusterNode>> cachePartitionOwners(String cacheName);
>
>     /**
>      * @param node Node to kill after the communication error is resolved.
>      */
>     public void killNode(ClusterNode node);
> }
>
> A default implementation of CommunicationProblemResolver provided as
> part of Ignite can keep alive the largest sub-cluster in which all
> nodes are able to connect to each other (a sketch of such a resolver
> is at the end of this message).
>
> In addition to CommunicationProblemResolver, we can fire a new local
> org.apache.ignite.events.Event when CommunicationSpi fails to send a
> message to an alive node (this can be useful for monitoring; a
> listener sketch is also at the end of this message):
>
> class CommunicationProblemEvent extends EventAdapter {
>     ClusterNode eventNode();
>
>     Exception connectionError();
> }
>
> Since this is a pretty large change in the public API, I would be
> grateful for your thoughts on CommunicationProblemResolver.
>
> Thank you,
> Semyon
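>
> P.S. To make the intended usage more concrete, here is a sketch of a
> resolver that keeps the largest group of mutually connected nodes
> alive and kills the rest, written against the proposed API. The greedy
> grouping below is a simplification (finding the true largest fully
> connected group is NP-hard), not a final implementation:
>
>     import java.util.ArrayList;
>     import java.util.Collections;
>     import java.util.Comparator;
>     import java.util.List;
>
>     import org.apache.ignite.cluster.ClusterNode;
>
>     public class LargestPartResolver implements CommunicationProblemResolver {
>         @Override public void resolve(CommunicationProblemContext ctx) {
>             List<ClusterNode> top = ctx.topologySnapshot();
>
>             // Greedily collect groups of nodes that can all reach each
>             // other in both directions.
>             List<List<ClusterNode>> groups = new ArrayList<>();
>
>             for (ClusterNode node : top) {
>                 List<ClusterNode> grp = null;
>
>                 for (List<ClusterNode> candidate : groups) {
>                     boolean connected = true;
>
>                     for (ClusterNode member : candidate) {
>                         if (!ctx.connectionAvailable(member, node) ||
>                             !ctx.connectionAvailable(node, member)) {
>                             connected = false;
>
>                             break;
>                         }
>                     }
>
>                     if (connected) {
>                         grp = candidate;
>
>                         break;
>                     }
>                 }
>
>                 if (grp == null)
>                     groups.add(grp = new ArrayList<>());
>
>                 grp.add(node);
>             }
>
>             // Keep the largest group, kill every node outside it.
>             List<ClusterNode> keep = Collections.max(groups,
>                 Comparator.comparingInt(List::size));
>
>             for (ClusterNode node : top) {
>                 if (!keep.contains(node))
>                     ctx.killNode(node);
>             }
>         }
>     }
>
> The resolver would then be plugged into the node configuration; the
> setter name below is an assumption, since the configuration API is not
> fixed yet:
>
>     cfg.setCommunicationProblemResolver(new LargestPartResolver());
>
> And a local listener for the proposed event could be used for
> monitoring (EVT_COMMUNICATION_PROBLEM is a placeholder - the proposal
> does not define an event type constant yet):
>
>     ignite.events().localListen(evt -> {
>         CommunicationProblemEvent e = (CommunicationProblemEvent)evt;
>
>         // Log the unreachable-but-alive node and the connection error.
>         System.err.println("Cannot send message to alive node " +
>             e.eventNode().id() + ": " + e.connectionError());
>
>         return true; // Keep the listener registered.
>     }, EVT_COMMUNICATION_PROBLEM);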