This seems fitting for the Akka actor system [1]. Maybe we could just use it as a base?

- Henry

[1] http://akka.io
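For reference, the kind of primitive Henry is pointing at looks roughly like this with the Akka 2.x Java API. This is only a minimal sketch for comparison with the APIs proposed further down the thread, not a recommendation:

    import akka.actor.ActorRef;
    import akka.actor.ActorSystem;
    import akka.actor.Props;
    import akka.actor.UntypedActor;

    // One actor per partition; Akka delivers messages at most once and preserves
    // ordering per sender-receiver pair.
    public class PartitionActor extends UntypedActor {
        @Override
        public void onReceive(Object message) throws Exception {
            if (message instanceof byte[]) {
                // handle the raw payload addressed to this partition
            } else {
                unhandled(message);
            }
        }

        public static void main(String[] args) {
            ActorSystem system = ActorSystem.create("helix-messaging");
            ActorRef partition0 = system.actorOf(Props.create(PartitionActor.class), "partition-0");
            partition0.tell("hello".getBytes(), ActorRef.noSender());
        }
    }

Those delivery and ordering guarantees are the same ones discussed below; the counter-argument later in the thread is that Akka brings a lot of runtime and philosophy along with them.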
On Mon, Jul 21, 2014 at 9:47 AM, Greg Brandt <brandt.g...@gmail.com> wrote:

We talked about this in a little more detail, and it seems like there are three basic components that we want:

1. Transport - physically moving messages
2. Resolution - mapping cluster scope to a set of machines
3. Management - message state, errors, retries, etc.

The transport interface should be generic enough to allow many implementations (e.g. Netty, Kafka, etc.). It may be valuable to do multiple implementations if one underlying system supports the desired message delivery guarantees better than another. For example, Kafka would be more amenable to at-least-once delivery, whereas something like Netty, with much less overhead than Kafka, would be better for at-most-once delivery.

The goal here, I think, is to provide something most similar to Akka Actors, with the following guarantees:

1. at-most-once delivery
2. message ordering per sender-receiver pair

A lightweight messaging primitive with those guarantees would allow implementation of many interesting things like Kishore mentions. Netty seems most suited to this task (it is the underlying transport for Akka as well). The reason to avoid Akka, I think, is that it imposes too much of its own philosophy on the user, and is really bloated. A Netty-based primitive could be provided out of the box, and the interfaces it implements should be easy enough for users to implement on their own. It would be relatively trivial to add a Kafka-based implementation out of the box as well.

(I have a bit of a prototype here: https://github.com/brandtg/helix-actors/blob/master/helix-actors/src/main/java/org/apache/helix/actor/netty/NettyHelixActor.java)

The transport mechanism should be agnostic to anything going on in the cluster: the resolution module is totally separate, and maps cluster scope to sets of nodes. For example, one could send a message to the entire cluster, to specific partitions of a resource, or even from one cluster to another (via composing resolution modules or something).

Then there would be a management component, composed with the other two, which manages user callbacks associated with various cluster scopes, tracks messages by ID in order to support request/response or feedback-loop kinds of things, and handles errors, timeouts, retries, etc.

-Greg
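To make that layering concrete, here is a rough sketch of how the three components might be expressed as interfaces. All names below (Transport, ScopeResolver, MessageManager, ClusterScope) are hypothetical, invented for illustration, and are not existing Helix APIs.

    import java.net.InetSocketAddress;
    import java.util.Set;

    // 1. Transport: physically moves opaque bytes; could be backed by Netty, Kafka, etc.
    interface Transport {
        void send(InetSocketAddress destination, byte[] payload);
        void listen(int port, TransportCallback callback);
    }

    interface TransportCallback {
        void onReceive(InetSocketAddress source, byte[] payload);
    }

    // Hypothetical value object describing an addressing scope
    // (whole cluster, a resource, a partition, optionally a state).
    class ClusterScope {
        final String cluster;
        final String resource;   // null to address the whole cluster
        final String partition;  // null to address a whole resource
        final String state;      // null to ignore state

        ClusterScope(String cluster, String resource, String partition, String state) {
            this.cluster = cluster;
            this.resource = resource;
            this.partition = partition;
            this.state = state;
        }
    }

    // 2. Resolution: maps a cluster scope to the machines currently serving it.
    interface ScopeResolver {
        Set<InetSocketAddress> resolve(ClusterScope scope);
    }

    // 3. Management: composed with the other two; registers user callbacks per scope,
    //    tracks message IDs for request/response, and handles errors, timeouts, retries.
    interface MessageManager {
        void send(ClusterScope scope, byte[] payload, ResponseCallback callback);
        void register(ClusterScope scope, TransportCallback userCallback);
    }

    interface ResponseCallback {
        void onResponse(byte[] response);
        void onError(Throwable error);
    }

Under a split like this, a Netty-backed Transport and a Kafka-backed Transport could be swapped beneath the same MessageManager, matching the at-most-once vs. at-least-once trade-off described above.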
On Sat, Jul 12, 2014 at 11:27 AM, kishore g <g.kish...@gmail.com> wrote:

Hi Vlad,

Yes, the idea is to have a pluggable architecture where Helix provides the messaging channel without serializing/deserializing the data. In the end, I envision having most of the building blocks needed to build a distributed system. One should be able to build datastore/search/OLAP/pub-sub/stream processing systems easily on top of Helix.

I am curious to see what kind of tuning you guys did to handle 1.5M QPS. Our goal is to have 1M QPS between two servers. In order to achieve this we might need pipelining/batching along with ordering guarantees. This is non-trivial and error prone, and I see huge value in providing this feature. Most systems have built some form of replication scheme on their own. There are two parts to replication: one is the high-throughput message exchange, and the other is the scheme itself - synchronous replication, asynchronous replication, chained replication, consensus, etc. The first part is the common piece that is needed across all replication schemes.

Of course, getting this right is not easy and the design is tricky, but I feel it's worth a try.

This is the high-level flow I had discussed with Greg.

Each server can host P partitions and is listening on one port. Based on how many execution threads are needed, we can create C channels. Every partition maps to a specific channel, and on each channel we can guarantee message ordering. We simply provide a callback to handle each message and pass the received bytes without deserializing; it's up to the callback to process the message either synchronously or asynchronously.

There are more details that are difficult to go over in email. We should probably write up the design and then deep-dive on the details.

thanks,
Kishore G
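As a rough illustration of that flow - P partitions multiplexed over C ordered channels, with callbacks receiving raw bytes - something like the sketch below could work. The class and method names are made up for the example; the point is only the fixed partition-to-channel mapping and the single thread per channel that preserves per-partition ordering.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Callback receives raw bytes; deserialization is left entirely to the user.
    interface PartitionMessageCallback {
        void onMessage(String partition, byte[] payload);
    }

    class ChannelDispatcher {
        private final ExecutorService[] channels;
        private final PartitionMessageCallback callback;

        ChannelDispatcher(int numChannels, PartitionMessageCallback callback) {
            this.channels = new ExecutorService[numChannels];
            for (int i = 0; i < numChannels; i++) {
                // One thread per channel: all partitions mapped to this channel
                // see their messages in arrival order.
                channels[i] = Executors.newSingleThreadExecutor();
            }
            this.callback = callback;
        }

        void dispatch(final String partition, final byte[] payload) {
            // A partition always maps to the same channel, so per-partition
            // ordering is preserved while the thread count stays fixed at C.
            int channel = Math.abs(partition.hashCode() % channels.length);
            channels[channel].submit(new Runnable() {
                public void run() {
                    callback.onMessage(partition, payload);
                }
            });
        }
    }

This also lines up with Vlad's concern below about thread count: C stays small and fixed while P grows, and no new threads are spawned per message.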
On Sat, Jul 12, 2014 at 8:08 AM, vlad...@gmail.com <vlad...@gmail.com> wrote:

I agree that having a messaging scheme that is not ZooKeeper-reliant would help in building some synchronization protocols between clients and replicas. I am thinking of cases such as writing inside Kafka to replicated partitions, or our own application, a replicated key/value store.

For me, service discovery bridges the gap between an application wanting to address a service (in Helix's case more particularly, a service/resource name/partition tuple) and finding out a list of hostnames that can receive that request. After that, whether the application wants to use Helix itself or another messaging stack is an open problem; perhaps a clear separation of the discovery/messaging layers would allow for quickly swapping different pieces here.

The interesting part (in my view) of providing fast messaging as part of Helix is the ability to use the service topology information that Helix hosts (the number of replicas and their placement) together with the messaging layer as sufficient components for realizing a consensus algorithm between replicas - that is, providing, on top of the regular service discovery function of writing to one instance of a service, the function of writing with consensus verification to all replicas of a service.

In terms of capabilities, looking at Kafka and a key/value store as use cases, this would mean running a whole protocol between the replicas of the service to achieve a certain synchronization outcome (be it Paxos, 2-phase commit, or chain replication - Paxos would probably not be needed since Helix already provides a resilient master for the partition, so there is a clear hierarchy). Kafka suggests even more cases if we take the interaction between client and service into account, namely specifying whether the call returns after the master gets the data or after consensus between master and replicas has been achieved, or sending data asynchronously (the combination of the producer.type and request.required.acks parameters of the producer).

The only reason for which I would like to see a pluggable architecture is that in high-volume applications the messaging stack requires a lot of tuning. As I said at the meet-up, in our company we deal with about 1.5M QPS, so having serialization/deserialization overheads per message, or even separate listening/executor threads, can be resource intensive. Whenever we can, we are tempted to use a staged event-driven architecture, with few threads that handle large message pools. This also raises the question of how to implement the upper-layer callbacks (that would perform the synchronization-related actions) without spawning new threads.

Regards,
Vlad

On Fri, Jul 11, 2014 at 11:40 PM, kishore g <g.kish...@gmail.com> wrote:

Good point Vlad. I was thinking of defining the right abstraction and possibly providing one implementation. I think synchronous and asynchronous messaging should both be covered as part of this.

Also, I think Finagle and the likes of it cater more towards client-server communication, but what we lack today is a good solution for peer-to-peer transport. For example, if someone has to build a consensus layer, they have to build everything from the ground up. Providing a base layer that exchanges messages between peers on a per-partition basis can be a great building block for different replication schemes.

Overall, messaging can be used for two cases -- data and control.

CONTROL

This is needed for control messages that occur rarely. For these types of messages latency/throughput is not important, but it is important for them to be reliable.

DATA

This is mainly exchanging data between different roles: controller-participant, participant-participant, spectator-participant. These types of message exchanges occur quite frequently, and high throughput and low latency are a requirement.

For example, having the following API and guarantees would allow one to build synchronous replication, asynchronous replication, quorum, etc.:

    send(partition, message, ACK_MODE, callback)

ACK_MODE can be ACK from ALL, QUORUM, 1, NONE, etc. The callback is fired when one gets the response back from the receiver.

This simple API allows both synchronous and asynchronous modes of communication, and we don't have to do the serialization/deserialization. The hard part here would be to guarantee ordering between the messages. One should be able to specify the message ordering requirement: FIFO at the per-partition level, or no ordering at all. Having this makes it easy to implement replication schemes.

thanks,
Kishore G
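To sketch what that send() signature could look like in Java, with the ACK modes listed above: the types and names here are illustrative only, not an agreed-upon Helix interface. The QuorumWriter at the end shows how a synchronous-looking call can be layered on the asynchronous primitive.

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;

    // Acknowledgment levels matching "ACK from ALL, QUORUM, 1, NONE".
    enum AckMode {
        ALL,     // every replica must acknowledge
        QUORUM,  // a majority of replicas must acknowledge
        ONE,     // any single replica is enough
        NONE     // fire and forget
    }

    interface AckCallback {
        void onAck(String partition);
        void onFailure(String partition, Throwable cause);
    }

    // The per-partition send primitive; bytes are passed through untouched.
    interface ReplicationChannel {
        void send(String partition, byte[] message, AckMode ackMode, AckCallback callback);
    }

    // Example: a synchronous-looking quorum write built on the asynchronous primitive.
    class QuorumWriter {
        private final ReplicationChannel channel;

        QuorumWriter(ReplicationChannel channel) {
            this.channel = channel;
        }

        boolean write(String partition, byte[] payload, long timeout, TimeUnit unit)
                throws InterruptedException {
            final CountDownLatch latch = new CountDownLatch(1);
            final AtomicBoolean success = new AtomicBoolean(false);
            channel.send(partition, payload, AckMode.QUORUM, new AckCallback() {
                public void onAck(String p) {
                    success.set(true);
                    latch.countDown();
                }
                public void onFailure(String p, Throwable cause) {
                    latch.countDown();
                }
            });
            // Block until the quorum acknowledges, fails, or the timeout expires.
            return latch.await(timeout, unit) && success.get();
        }
    }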
On Fri, Jul 11, 2014 at 12:49 PM, Greg Brandt <brandt.g...@gmail.com> wrote:

Vlad,

I'm not sure that the goal here is to cover all possible use cases. This is intended as basically a replacement for the current Helix cluster messaging service, one that doesn't rely on ZooKeeper. I would argue that Helix is already that general framework for discovering the service endpoint, and that this is just one implementation of an underlying messaging stack (for which there is currently only the ZooKeeper-based implementation).

Moreover, keeping it too general (i.e. providing the APIs outlined in the JIRA, but no implementation) puts a much greater onus on the user. It would be nice for someone just starting out with Helix to have a pretty good messaging service readily available, as well as the APIs to implement more specific solutions if he or she so chooses.

-Greg

On Fri, Jul 11, 2014 at 11:07 AM, vlad...@gmail.com <vlad...@gmail.com> wrote:

This sounds like service discovery for a messaging solution. At this point there would be some overlap with message buses such as Finagle. Perhaps this should be a general framework for discovering the service endpoint that leaves the actual underlying messaging stack open (be it Finagle, Netty-based, or ZeroMQ, for example). My only fear is that if the messaging framework is encapsulated completely into Helix, it would be hard to compete with tuned messaging bus solutions and to cover all possible use cases (for example, in my company we use a large number of sub-cases on the spectrum from synchronous to asynchronous calls).

Regards,
Vlad

On Fri, Jul 11, 2014 at 10:56 AM, Greg Brandt <brandt.g...@gmail.com> wrote:

(copied from HELIX-470)

Helix is missing a high-performance way to exchange messages among resource partitions, with a user-friendly API.

Currently, the Helix messaging service relies on creating many nodes in ZooKeeper, which can lead to ZooKeeper outages if messages are sent too frequently.

In order to avoid this, high-performance NIO-based HelixActors should be implemented (in rough accordance with the actor model). HelixActors exchange messages asynchronously without waiting for a response, and are partition/state-addressable.

The API would look something like this:

    public interface HelixActor<T> {
        void send(Partition partition, String state, T message);
        void register(String resource, HelixActorCallback<T> callback);
    }

    public interface HelixActorCallback<T> {
        void onMessage(Partition partition, State state, T message);
    }

#send should likely support wildcards for partition number and state, or its method signature might need to be massaged a little for more flexibility. But that's the basic idea.

Nothing is inferred about the format of the messages - the only metadata we need to be able to interpret is (1) partition name and (2) state. The user provides a codec to encode/decode messages, so it's nicer to implement HelixActor#send and HelixActorCallback#onMessage.

    public interface HelixActorMessageCodec<T> {
        byte[] encode(T message);
        T decode(byte[] message);
    }

Actors should support somewhere around 100k to 1M messages per second. The Netty framework is a potential implementation candidate, but it should be thoroughly evaluated w.r.t. performance.
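To make the codec idea concrete, here is a minimal sketch of a String codec and a callback implemented against the interfaces proposed above (HelixActorMessageCodec, HelixActorCallback, and the Partition/State types they reference). The UTF-8 encoding and the class names are assumptions made for the example, not part of the proposal.

    import java.nio.charset.Charset;

    // Encodes/decodes String messages; the actor implementation would call this
    // on the send and receive paths so users never touch raw bytes.
    class StringCodec implements HelixActorMessageCodec<String> {
        private static final Charset UTF_8 = Charset.forName("UTF-8");

        public byte[] encode(String message) {
            return message.getBytes(UTF_8);
        }

        public String decode(byte[] message) {
            return new String(message, UTF_8);
        }
    }

    // A trivial callback that logs each message with its partition and state.
    class LoggingCallback implements HelixActorCallback<String> {
        public void onMessage(Partition partition, State state, String message) {
            System.out.println(partition + " (" + state + "): " + message);
        }
    }

Usage would then presumably look like actor.register("myResource", new LoggingCallback()) on the receiving side and actor.send(partition, "MASTER", "hello") on the sending side, with the codec plugged into whatever NIO implementation (e.g. the Netty prototype linked earlier in the thread) sits underneath.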