Should we move the ZK cross-data-center discussion to a separate thread? This is an interesting problem and probably needs more detailed discussion. What do you guys think?
On Wed, Aug 20, 2014 at 7:44 PM, Kanak Biscuitwala <kana...@hotmail.com> wrote:

Hey Vlad,

In theory, we should be able to plug in a resolver that can resolve ZKs in other datacenters. We already resolve different clusters within the same ZK. Here is the resolver interface:

https://github.com/brandtg/helix-actors/blob/master/helix-ipc/src/main/java/org/apache/helix/resolver/HelixResolver.java

Right now the ZK implementation of the resolver can return a different Helix manager for different clusters, and perhaps it can be adapted to also accept a different ZK address from a different DC. Then we can connect to ZK observers. However, the message scope probably needs an additional resolver-specific metadata field or something that we can use to store the ZK observer addresses (and therefore a corresponding field in a message).

Regarding ZK vs. DNS, push updates via watchers are an advantage, as you pointed out. Also, since Helix persists state in ZK, you have access to the entire cluster metadata in the context of your requests. This can certainly be implemented in a DNS-like scheme, though. Perhaps others can chime in on the tradeoffs here.

Kanak
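To make Kanak's suggestion concrete, here is a rough sketch of a per-datacenter resolver registry, keyed by the ZK (observer) address that would come from the proposed scope metadata field. All names here are hypothetical; the real HelixResolver interface is the one linked above.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.function.Function;

    // Hypothetical sketch: keep one resolver per ZooKeeper address (e.g. a local
    // observer of a remote datacenter's ensemble). HelixResolver is restated as
    // an empty placeholder so this compiles standalone.
    public class CrossDcResolvers {
        interface HelixResolver { /* placeholder for the linked interface */ }

        private final ConcurrentMap<String, HelixResolver> byZkAddress = new ConcurrentHashMap<>();
        private final Function<String, HelixResolver> factory;

        public CrossDcResolvers(Function<String, HelixResolver> factory) {
            this.factory = factory;
        }

        // zkAddress would be read from the proposed resolver-specific
        // metadata field on the message scope.
        public HelixResolver forZk(String zkAddress) {
            return byZkAddress.computeIfAbsent(zkAddress, factory);
        }
    }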
------------------------------
From: brandt.g...@gmail.com
Date: Wed, 20 Aug 2014 18:16:55 -0700
Subject: Re: Add performant IPC (Helix actors)
To: u...@helix.apache.org
CC: dev@helix.apache.org

Hey Vlad,

Correct, ZooKeeper would still be used for the address resolution component.

Cross-datacenter hasn't really been thought of yet, but you do make a good point in that we should address this early, because it usually seems to be an afterthought. However, we are planning to allow cross-cluster messaging within the same ZooKeeper.

Something that might be interesting to do in the cross-datacenter use case is to replicate each datacenter's ZooKeeper logs to each other datacenter in order to maintain a read-only view of that ZooKeeper cluster. One could then resolve addresses with only connections to local ZooKeeper(s), though the replication delay would have to be accounted for during state transitions.

I've got a small project that does generic log change capture (https://github.com/brandtg/switchboard); it handles MySQL binary logs and HDFS namenode edit logs now, and I intend on doing ZooKeeper log replication next, actually. Just an idea.

-Greg

On Wed, Aug 20, 2014 at 6:00 PM, vlad...@gmail.com <vlad...@gmail.com> wrote:

Hi Greg,

Within Turn, a few of us were looking at Helix as a possible solution for service discovery. It seems that the new system would keep the address resolution of the old messaging system but replace ZooKeeper with a direct connection for high throughput. Incidentally, we have a similar system, so we are more interested in the addressing part.

One thing that came up within our discussion was the possibility of accessing services in other datacenters. Are there plans to offer inter-datacenter discovery capabilities? Would that be based on ZK observers or some other import mechanism (someone would need to connect to the remote ZK, preferably not the client)?

Also, especially in the cross-datacenter case, there are clear parallels to using DNS for service discovery (I think Netflix published a blog post where they detailed this as an option). What would we gain and lose by using ZK as the building block as opposed to DNS? One thing that comes to mind is that ZK-based service discovery in Helix can be configured as a push-from-ZK or a poll-by-client service, while DNS entries time out and must always be polled again.

Regards,
Vlad

On Wed, Aug 20, 2014 at 5:46 PM, Greg Brandt <brandt.g...@gmail.com> wrote:

Hey Henry,

Initially, I thought the same thing. But after evaluating Akka Actors, I think they're a fundamentally different approach to developing distributed systems, as opposed to simply a messaging layer implementation. Let me explain...

The primary difference between Akka Actors and Helix is that the former prefers loose, decentralized control, whereas the latter prefers strong, centralized control. Akka applications are expected to manage state by themselves, whereas in Helix, that responsibility is delegated to the controller. And the actual user Akka Actor implementations are analogous to Helix Participant implementations.

So with that perspective, it would be a bit of a kludge to leverage Akka Actors for simple cluster-scope-addressable message passing. Actually, Helix already provides this messaging feature, but it is done via ZooKeeper, which is dangerous and less than performant for a high volume of messages.

The right thing to do seems to be to provide a set of better transport implementations (e.g. Netty, ActiveMQ, Kafka, etc.) for the cluster-scope-addressable messaging system, and better preserve the fundamental concepts in Helix.

Let me know what you think.

-Greg

On Wed, Aug 20, 2014 at 9:55 AM, Henry Saputra <henry.sapu...@gmail.com> wrote:

This seems fitting for the Akka actor system [1]. Maybe we could just use it as a base?

- Henry

[1] http://akka.io

On Mon, Jul 21, 2014 at 9:47 AM, Greg Brandt <brandt.g...@gmail.com> wrote:

We talked about this in a little bit greater detail, and it seems like there are three basic components that we want:

1. Transport - physically moving messages
2. Resolution - map cluster scope to a set of machines
3. Management - message state, errors, retries, etc.

The transport interface should be generic enough to allow many implementations (e.g. Netty, Kafka, etc.). It may be valuable to do multiple implementations if one underlying system supports desired message delivery guarantees better than another. For example, Kafka would be more amenable to at-least-once delivery, whereas something like Netty, with much less overhead than Kafka, would be better for at-most-once delivery.

The goal here, I think, is to provide something that's most similar to Akka Actors, with the following guarantees:

1. at-most-once delivery
2. message ordering per sender-receiver pair

A lightweight messaging primitive with those guarantees would allow implementation of many interesting things like Kishore mentions. Netty seems most suited to this task (it is the underlying transport for Akka as well). The reason to avoid Akka, I think, is that it imposes too much of its own philosophy on the user, and is really bloated. A Netty-based primitive could be provided out of the box, and the interfaces it implements should be easy enough for users to implement on their own. It would be relatively trivial to add a Kafka-based implementation out of the box as well.

(Have a bit of a prototype here: https://github.com/brandtg/helix-actors/blob/master/helix-actors/src/main/java/org/apache/helix/actor/netty/NettyHelixActor.java)

The transport mechanism should be agnostic to anything going on in the cluster: the resolution module is totally separate, and maps cluster scope to sets of nodes. For example, one could send a message to the entire cluster, to specific partitions of a resource, or even from one cluster to another (via composing resolution modules or something).

Then there would be a management component, composed with the other two components, which manages user callbacks associated with varying cluster scopes, tracks messages based on ID in order to perform request/response or feedback-loop kinds of things, manages errors/timeouts/retries, etc.

-Greg
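As a strawman, the three components Greg lists could be expressed as interfaces roughly like the following. Signatures and names are illustrative only, not the actual helix-actors API.

    import java.net.InetSocketAddress;
    import java.util.Set;

    // 1. Transport: physically moves opaque bytes between machines.
    interface Transport {
        void send(InetSocketAddress destination, byte[] message);
        void setListener(MessageListener listener);
    }

    interface MessageListener {
        void onMessage(InetSocketAddress source, byte[] message);
    }

    // 2. Resolution: maps a cluster scope to the set of machines it denotes.
    interface Resolution {
        Set<InetSocketAddress> resolve(String cluster, String resource, int partition);
    }

    // 3. Management (not shown) would compose the two: registering callbacks
    //    per scope, correlating message IDs for request/response, and handling
    //    errors, timeouts, and retries.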
On Sat, Jul 12, 2014 at 11:27 AM, kishore g <g.kish...@gmail.com> wrote:

Hi Vlad,

Yes, the idea is to have a pluggable architecture where Helix provides the messaging channel without serializing/deserializing the data. In the end, I envision having most of the building blocks needed to build a distributed system. One should be able to build datastore/search/OLAP/pub-sub/stream processing systems easily on top of Helix.

I am curious to see what kind of tuning you guys did to handle 1.5M QPS. Our goal is to have 1M QPS between the two servers. In order to achieve this we might need pipelining/batching along with ordering guarantees. This is non-trivial and error-prone, and I see huge value in providing this feature. Most systems have built some form of replication scheme on their own. There are two parts to replication: one is the high-throughput message exchange, and the other is the scheme itself (synchronous replication, async replication, chained replication, consensus, etc.). The first part is the common part that is needed across all the replication schemes.

Of course, getting this right is not easy and the design is tricky, but I feel it's worth a try.

This is the high-level flow I had discussed with Greg. Each server can host P partitions and is listening on one port. Based on how many execution threads are needed, we can create C channels. Every partition maps to a specific channel. On each channel we can guarantee message ordering. We simply provide a callback to handle each message and pass the received bytes without deserializing. It's up to the callback to either process it synchronously or asynchronously.

There are more details that are difficult to go over in email. We should probably write up the design and then deep-dive on the details.

thanks,
Kishore G
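The channel flow Kishore describes can be sketched with single-threaded executors standing in for channels, so per-channel (and therefore per-partition) ordering falls out for free. This is a minimal illustration under those assumptions, not a proposed implementation; a Netty-based version would map partitions to event-loop threads instead.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Minimal sketch: C channels, each backed by a single-threaded executor,
    // so messages for a given partition are handled in arrival order. The
    // callback receives raw bytes; no deserialization happens in the channel.
    public class PartitionChannels {
        public interface MessageCallback {
            void onMessage(int partitionId, byte[] rawBytes);
        }

        private final ExecutorService[] channels;

        public PartitionChannels(int numChannels) {
            channels = new ExecutorService[numChannels];
            for (int i = 0; i < numChannels; i++) {
                channels[i] = Executors.newSingleThreadExecutor();
            }
        }

        // Every partition maps to exactly one channel.
        public void dispatch(int partitionId, byte[] message, MessageCallback callback) {
            ExecutorService channel = channels[Math.floorMod(partitionId, channels.length)];
            channel.execute(() -> callback.onMessage(partitionId, message));
        }
    }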
On Sat, Jul 12, 2014 at 8:08 AM, vlad...@gmail.com <vlad...@gmail.com> wrote:

I agree that having a messaging scheme that is not ZooKeeper-reliant would help in building some synchronization protocols between clients and replicas. I am thinking of cases such as writing to replicated partitions inside Kafka, or in our own application, a replicated key/value store.

For me, service discovery bridges the gap between an application wanting to address a service (in Helix's case, more particularly, a service/resource name/partition tuple) and finding out a list of hostnames that can receive that request. After that, whether the application wants to use Helix itself or another messaging stack is an open problem; perhaps a clear separation of the discovery/messaging layers would allow for quickly swapping different pieces here.

The interesting part (in my view) of providing fast messaging as part of Helix is the ability to use the service topology information that Helix hosts (number of replicas and their placement) together with the messaging layer as sufficient components for realizing a consensus algorithm between replicas; that is, providing, on top of the regular service discovery function of writing to one instance of a service, the function of writing with consensus verification to all replicas of a service.

In terms of capabilities, looking at Kafka and a KV store as use cases, this would mean running a whole protocol between the replicas of the service to achieve a certain synchronization outcome (be it Paxos, two-phase commit, or chain replication; Paxos would probably not be needed, since Helix already provides a resilient master for the partition, so there is a clear hierarchy). Kafka suggests even more cases if we take the interaction between client and service into account, namely specifying the call to return after the master gets the data or after consensus between master and replicas has been achieved, or sending data asynchronously (the combination of the producer.type and request.required.acks parameters of the producer).

The only reason for which I would like to see a pluggable architecture is that in high-volume applications the messaging stack requires a lot of tuning. As I said at the meet-up, in our company we deal with about 1.5M QPS, so having serialization/deserialization overheads per message, or even separate listening/executor threads, can be resource-intensive. Whenever we can, we are tempted to use a staged event-driven architecture, with few threads that handle large message pools. This also raises the question of how to implement the upper-layer callbacks (that would perform the synchronization-related actions) without spawning new threads.

Regards,
Vlad
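Vlad's point about returning after the master ack versus after consensus (cf. Kafka's request.required.acks) boils down to counting acknowledgments. A hypothetical sketch of that accounting; the messaging layer would invoke onReplicaAck as receipts arrive, and nothing here spawns a thread:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicInteger;

    // Completes a future once the required number of replica acks is reached:
    // majority quorum or all replicas, depending on the mode requested.
    public class AckBarrier {
        private final AtomicInteger acks = new AtomicInteger();
        private final int required;
        private final CompletableFuture<Void> done = new CompletableFuture<>();

        public AckBarrier(int replicaCount, boolean quorumOnly) {
            this.required = quorumOnly ? replicaCount / 2 + 1 : replicaCount;
        }

        // Called by the messaging layer for each replica acknowledgment.
        public void onReplicaAck() {
            if (acks.incrementAndGet() == required) {
                done.complete(null);
            }
        }

        // The caller decides whether to block on this or chain work to it.
        public CompletableFuture<Void> whenAcked() {
            return done;
        }
    }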
On Fri, Jul 11, 2014 at 11:40 PM, kishore g <g.kish...@gmail.com> wrote:

Good point, Vlad. I was thinking of defining the right abstraction and possibly providing one implementation. I think synchronous and asynchronous messaging should be covered as part of this.

Also, I think Finagle and the likes of it cater more towards client-server communication, but what we lack today is a good solution for peer-to-peer transport. For example, if someone has to build a consensus layer, they have to build everything from the ground up. Providing a base layer that exchanges messages between peers on a per-partition basis can be a great building block for different replication schemes.

Overall, messaging can be used for two cases: data and control.

CONTROL

This is needed for control messages that occur rarely; for these types of messages latency/throughput is not important, but it is important for them to be reliable.

DATA

This is mainly exchanging data between different roles: controller-participant, participant-participant, spectator-participant. These types of message exchanges occur quite frequently, and having high throughput and low latency is a requirement.

For example, having the following API and guarantees would allow one to build synchronous replication, asynchronous replication, quorum, etc.:

    send(partition, message, ACK_MODE, callback)

ACK_MODE can be ACK from ALL, QUORUM, 1, NONE, etc. The callback is fired when one gets the response back from the receiver.

This simple API can allow both synchronous and asynchronous modes of communication. We don't have to do the serialization/deserialization. The hard part here would be to guarantee ordering between the messages. One should be able to specify the message ordering requirement: FIFO on a per-partition level, or don't care about ordering. Having this makes it easy for one to implement replication schemes.

thanks,
Kishore G
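A hedged sketch of the API shape Kishore proposes, with the ACK modes as an enum; all names are illustrative, not a committed design:

    // Illustrative only: one possible signature for the send primitive above.
    public interface ReplicationChannel {
        enum AckMode { ALL, QUORUM, ONE, NONE }

        interface AckCallback {
            void onAck(int partitionId);      // required acks were received
            void onTimeout(int partitionId);  // gave up waiting for acks
        }

        // Fire-and-forget when ackMode == NONE; otherwise the callback fires
        // once the requested number of receivers have acknowledged. Messages
        // are raw bytes, so no serialization is imposed by this layer.
        void send(int partitionId, byte[] message, AckMode ackMode, AckCallback callback);
    }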
On Fri, Jul 11, 2014 at 12:49 PM, Greg Brandt <brandt.g...@gmail.com> wrote:

Vlad,

I'm not sure that the goal here is to cover all possible use cases. This is intended as basically a replacement for the current Helix cluster messaging service, one which doesn't rely on ZooKeeper. I would argue that Helix is already that general framework for discovering the service endpoint, and that this is just one implementation of an underlying messaging stack (for which there is currently only the ZooKeeper-based implementation).

Moreover, keeping it too general (i.e. providing the APIs outlined in the JIRA, but no implementation) puts a much greater onus on the user. It would be nice for someone just starting out with Helix to have a pretty good messaging service readily available, as well as the APIs to implement more specific solutions if he or she so chooses.

-Greg

On Fri, Jul 11, 2014 at 11:07 AM, vlad...@gmail.com <vlad...@gmail.com> wrote:

This sounds like service discovery for a messaging solution. At this point there would be some overlap with message buses such as Finagle. Perhaps this should be a general framework for discovering the service endpoint that could leave the actual underlying messaging stack open (be it Finagle, Netty-based, or ZeroMQ, for example). My only fear is that if the messaging framework is encapsulated completely into Helix, it would be hard to compete with tuned messaging bus solutions and cover all possible use cases (for example, in my company, we use a large number of sub-cases on the synchronous-call to asynchronous-call spectrum).

Regards,
Vlad

On Fri, Jul 11, 2014 at 10:56 AM, Greg Brandt <brandt.g...@gmail.com> wrote:

(copied from HELIX-470)

Helix is missing a high-performance way to exchange messages among resource partitions, with a user-friendly API.

Currently, the Helix messaging service relies on creating many nodes in ZooKeeper, which can lead to ZooKeeper outages if messages are sent too frequently.

In order to avoid this, high-performance NIO-based HelixActors should be implemented (in rough accordance with the actor model). HelixActors exchange messages asynchronously without waiting for a response, and are partition/state-addressable.

The API would look something like this:

    public interface HelixActor<T> {
        void send(Partition partition, String state, T message);
        void register(String resource, HelixActorCallback<T> callback);
    }

    public interface HelixActorCallback<T> {
        void onMessage(Partition partition, State state, T message);
    }

#send should likely support wildcards for partition number and state, or its method signature might need to be massaged a little bit for more flexibility. But that's the basic idea.

Nothing is inferred about the format of the messages; the only metadata we need to be able to interpret is (1) partition name and (2) state. The user provides a codec to encode/decode messages, so it's nicer to implement HelixActor#send and HelixActorCallback#onMessage.

    public interface HelixActorMessageCodec<T> {
        byte[] encode(T message);
        T decode(byte[] message);
    }

Actors should support somewhere around 100k to 1M messages per second. The Netty framework is a potential implementation candidate, but should be thoroughly evaluated w.r.t. performance.
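To illustrate the codec contract, here is a minimal UTF-8 string codec written against the proposed interface (restated so the sketch stands alone); Partition and State are left out since they are not yet specified:

    import java.nio.charset.StandardCharsets;

    // The codec interface as proposed above, restated for compilation.
    interface HelixActorMessageCodec<T> {
        byte[] encode(T message);
        T decode(byte[] message);
    }

    // A minimal codec: the actor layer moves raw bytes, the codec gives them meaning.
    class Utf8StringCodec implements HelixActorMessageCodec<String> {
        @Override
        public byte[] encode(String message) {
            return message.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String decode(byte[] bytes) {
            return new String(bytes, StandardCharsets.UTF_8);
        }
    }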