Dmitriy, no problem, I'll rename it.

Semyon
On Tue, Jan 9, 2018 at 11:20 PM, Dmitriy Setrakyan <[email protected]> wrote:

Thanks Semyon!

I have a naming nitpick. Can we rename Problem to Failure, e.g. CommunicationProblemResolver to CommunicationFailureResolver?

D.

On Tue, Jan 9, 2018 at 3:39 AM, Semyon Boikov <[email protected]> wrote:

Hi all,

Currently I'm working on an implementation of DiscoverySpi based on Apache ZooKeeper (ZookeeperDiscoverySpi) and want to share the results of this work.

In very large clusters (>1000 nodes) the current default implementation of DiscoverySpi, TcpDiscoverySpi, has some significant drawbacks:
- TcpDiscoverySpi organizes nodes in a ring, and all messages are passed sequentially around the ring. The more nodes there are in the ring, the more time it takes to pass a message. In very large clusters such an architecture can slow down important operations (node join, node stop, cache start, etc.).
- In TcpDiscoverySpi's protocol each node in the ring is able to fail the next one in case of network issues, and it is possible for two nodes to 'kill' each other (this can happen in complex scenarios where the network is broken and then restored after some time; such problems have been observed in real environments). With the current TcpDiscoverySpi protocol there is no easy way to completely fix such problems.
- When some node in the ring fails, the previous node tries to restore the ring by sequentially connecting to the following nodes. If a large part of the ring fails, it takes a long time to sequentially detect the failure of all those nodes.
- With TcpDiscoverySpi split-brain is possible (one ring can split into two independent parts), so a separate mechanism is needed to protect from split-brain when TcpDiscoverySpi is used.

Even though some of these problems can most probably be fixed in TcpDiscoverySpi, it seems a more robust and faster DiscoverySpi can be implemented on top of an existing coordination service.

Apache ZooKeeper is a well-known, reliable service and it provides all the mechanisms and consistency guarantees required for Ignite's DiscoverySpi. Some technical details of the ZookeeperDiscoverySpi implementation can be found in the description prepared by Sergey Puchnin:
https://cwiki.apache.org/confluence/display/IGNITE/Discovery+SPI+by+ZooKeeper

In our preliminary tests we were able to successfully start 4000+ nodes with ZookeeperDiscoverySpi. The new implementation works faster than TcpDiscoverySpi and does not have the TcpDiscoverySpi drawbacks mentioned above:
- Nodes' alive status is controlled by ZooKeeper, so nodes can never kill each other.
- ZooKeeper has protection from split-brain.
- With ZooKeeper it is possible to detect node joins/failures in batches, so the time to detect the join/failure of 1 vs 100+ nodes is almost the same.

I'm going to finalize the implementation of ZookeeperDiscoverySpi in the next few days.
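For illustration, here is a minimal sketch of how the new SPI might be plugged into IgniteConfiguration. The setter names used here (setZkConnectionString, setSessionTimeout, setZkRootPath) are assumptions at this stage, since the implementation is not finalized:

import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi;

public class ZkDiscoveryExample {
    public static void main(String[] args) {
        // Assumed configuration properties of the new SPI: ZooKeeper connection
        // string, session timeout and the ZooKeeper root path used for discovery data.
        ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi();

        zkSpi.setZkConnectionString("zk-host1:2181,zk-host2:2181,zk-host3:2181");
        zkSpi.setSessionTimeout(30_000);
        zkSpi.setZkRootPath("/apacheIgnite");

        IgniteConfiguration cfg = new IgniteConfiguration();

        // Replace the default TcpDiscoverySpi with the ZooKeeper-based one.
        cfg.setDiscoverySpi(zkSpi);

        try (Ignite ignite = Ignition.start(cfg)) {
            System.out.println("Started node: " + ignite.cluster().localNode().id());
        }
    }
}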
But in Ignite there is one more issue, caused by the fact that DiscoverySpi and CommunicationSpi are two independent components: it is possible that DiscoverySpi considers some node alive while, at the same time, CommunicationSpi is not able to send a message to this node (this issue exists not only for ZookeeperDiscoverySpi but for TcpDiscoverySpi too). Such a case is very critical, since all of Ignite's internal code assumes that if a node is alive then CommunicationSpi is able to send/receive messages to/from this node. If that is not the case, any Ignite operation can hang (with ZooKeeper such a situation is possible when, due to network failures, nodes have a connection to ZooKeeper but cannot establish TCP connections to each other).

If such a case arises, Ignite should have some mechanism to forcibly kill faulty nodes to keep the cluster operational. But note that in 'split brain' scenarios each independent part of the cluster will consider the others as 'failed', and there should be some way to choose which part should be killed. It would be good to provide a generic solution for this problem as part of the work on the new DiscoverySpi.

We discussed this with Yakov Zhdanov and suggest the following: when communication fails to send a message to some node and this node is considered alive, Ignite should trigger a global 'communication error resolve' process (obviously, this process should use internal discovery mechanisms for its messaging). As part of this process, CommunicationSpi on each node should try to establish connections to all other alive nodes (TcpCommunicationSpi can do this efficiently using async NIO) and send the results of this connection test to some coordinator node (e.g. the oldest cluster node). When the coordinator receives the results of the connection test from all nodes, it calls a user-defined CommunicationProblemResolver to choose which nodes should be killed (CommunicationProblemResolver should be set in IgniteConfiguration):

public interface CommunicationProblemResolver {
    public void resolve(CommunicationProblemContext ctx);
}

CommunicationProblemResolver receives a CommunicationProblemContext which provides the results of the CommunicationSpi connection test. It can also be useful to have information about started caches and the current cache data distribution to decide which part of the cluster should be killed:

public interface CommunicationProblemContext {
    /**
     * @return Current topology snapshot.
     */
    public List<ClusterNode> topologySnapshot();

    /**
     * @param node1 First node.
     * @param node2 Second node.
     * @return {@code True} if {@link CommunicationSpi} is able to establish connection from first node to second node.
     */
    public boolean connectionAvailable(ClusterNode node1, ClusterNode node2);

    /**
     * @return List of currently started caches.
     */
    public List<String> startedCaches();

    /**
     * @param cacheName Cache name.
     * @return Cache partitions affinity assignment.
     */
    public List<List<ClusterNode>> cacheAffinity(String cacheName);

    /**
     * @param cacheName Cache name.
     * @return Cache partitions owners.
     */
    public List<List<ClusterNode>> cachePartitionOwners(String cacheName);

    /**
     * @param node Node to kill after communication error resolve.
     */
    public void killNode(ClusterNode node);
}

The default implementation of CommunicationProblemResolver provided as part of Ignite can keep alive the largest sub-cluster where all nodes are able to connect to each other.
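To make the proposal more concrete, here is a naive sketch of a resolver with roughly that behavior, written against the proposed interfaces above. It greedily builds, for each node, a group of mutually connected nodes and kills everything outside the largest group found; this is only an illustration, not the actual default implementation:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.ignite.cluster.ClusterNode;

public class KeepLargestGroupResolver implements CommunicationProblemResolver {
    @Override public void resolve(CommunicationProblemContext ctx) {
        List<ClusterNode> top = ctx.topologySnapshot();

        List<ClusterNode> bestGroup = Collections.emptyList();

        // For each starting node, greedily grow a group of nodes that can all
        // connect to each other (in both directions) and remember the largest one.
        for (ClusterNode seed : top) {
            List<ClusterNode> group = new ArrayList<>();

            group.add(seed);

            for (ClusterNode candidate : top) {
                if (candidate.equals(seed))
                    continue;

                boolean connectedToAll = true;

                for (ClusterNode member : group) {
                    if (!ctx.connectionAvailable(member, candidate) ||
                        !ctx.connectionAvailable(candidate, member)) {
                        connectedToAll = false;

                        break;
                    }
                }

                if (connectedToAll)
                    group.add(candidate);
            }

            if (group.size() > bestGroup.size())
                bestGroup = group;
        }

        // Kill every node that did not make it into the chosen group.
        for (ClusterNode node : top) {
            if (!bestGroup.contains(node))
                ctx.killNode(node);
        }
    }
}

Such a resolver would then be registered in IgniteConfiguration; the exact setter name (e.g. cfg.setCommunicationProblemResolver(new KeepLargestGroupResolver())) is an assumption here, since the proposal only says the resolver should be set in IgniteConfiguration.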
In addition to CommunicationProblemResolver, we can fire a new local org.apache.ignite.events.Event when CommunicationSpi fails to send a message to an alive node (this can be useful for monitoring):

class CommunicationProblemEvent extends EventAdapter {
    ClusterNode eventNode();

    Exception connectionError();
}

Since this is a pretty large change in the public API, I would be grateful if you could provide your thoughts about CommunicationProblemResolver.

Thank you,
Semyon
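For completeness, a sketch of how such an event might be consumed through the standard local event API. The event type constant EVT_COMMUNICATION_PROBLEM used below is hypothetical, since the proposal above does not define one:

import org.apache.ignite.Ignite;
import org.apache.ignite.events.Event;
import org.apache.ignite.lang.IgnitePredicate;

public class CommunicationProblemMonitor {
    // Hypothetical event type constant; not part of the proposal above.
    private static final int EVT_COMMUNICATION_PROBLEM = 1001;

    public static void register(Ignite ignite) {
        IgnitePredicate<Event> lsnr = evt -> {
            CommunicationProblemEvent commEvt = (CommunicationProblemEvent)evt;

            // Log the unreachable node and the connection error for monitoring purposes.
            System.out.println("Communication problem with node " + commEvt.eventNode().id() +
                ": " + commEvt.connectionError());

            return true; // Keep listening for further events.
        };

        ignite.events().localListen(lsnr, EVT_COMMUNICATION_PROBLEM);
    }
}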
