+1, if there is no additional dependency. But I would like to go through the review process. I don't think we can use rb.
Henry had a couple of ideas:
-- Create a branch and give Greg/Sandeep access to it. Apache allows contributors to work on branches. Not sure how to get this.
-- Greg can fork the Apache Helix GitHub repo and submit a PR, and we can comment on the PR. Apache has done some cool integration with GitHub; Henry can provide more info on this.

On Tue, Aug 26, 2014 at 9:39 AM, Kanak Biscuitwala <kana...@hotmail.com> wrote:

> Are folks OK with the current iteration of this being in the 0.7.1 beta? It would be in a separate module, so no additional dependencies are required, and it could be a good opportunity to get some more feedback.
>
> Date: Fri, 22 Aug 2014 13:42:31 -0700
> Subject: Re: Add performant IPC (Helix actors)
> From: g.kish...@gmail.com
> To: dev@helix.apache.org
> CC: u...@helix.apache.org
>
> Thanks Greg for the explanation. I agree Akka is 2 or 3 layers above what Helix-ipc is trying to provide. Also there are some nuances in order guarantees with Akka. For control messages where reliability is a must and probably more important than latency, I think using ZK as the communication channel is the right solution. Without pushing state transition messages through a consensus system like ZK, there will be a lot of edge cases where ensuring correctness becomes impossible.
>
> On Fri, Aug 22, 2014 at 1:29 PM, Greg Brandt <brandt.g...@gmail.com> wrote:
> > Hey Henry,
> >
> > So Akka actors actually give exactly the same messaging guarantees as the current implementation of Helix IPC: at-most-once message delivery, and ordering per sender-receiver pair. In Helix's case, the latter can be thought of further as ordering per-partition or resource, but basically is guaranteed by Netty's Channel implementation for sender-receiver pair (http://stackoverflow.com/questions/9082101/do-messages-sent-via-nettys-channel-write-preserve-their-order-when-begin-sen).
> >
> > What Akka provides that this IPC layer doesn't is the application life cycle management part. This is why I made the claim that Akka Actors are analogous to Helix Participants. One writes callbacks for different message types in the Actor API to manage various control messages in the same way one writes callbacks in Helix to manage application state transitions.
> >
> > But more importantly, using Akka instead of getting really down into the transport layer might hide too much when we want to use it for performance-critical things like data replication. Continuing with that example, we likely want to use direct IO and minimize data copies in the transport layer, and it seems like some of the niceness involved in Akka's API would get in the way of that.
> >
> > Something that may be really cool to do is write another Akka transport implementation on top of this, so you could basically add partition/state to the Akka API and not have to implement the state machines yourself (as is shown in some Akka docs). In that scenario, users should probably be unaware that they were using Helix and just use the same Akka API.
> >
> > -Greg
> >
> > On Fri, Aug 22, 2014 at 11:13 AM, Henry Saputra <henry.sapu...@gmail.com> wrote:
> > > Hi Greg,
> > >
> > > Thanks for the reply. Sorry for being late to the discussion.
> > >
> > > If the IPC semantics will be used only by Helix internally, I agree that a lower-level transport like Netty should be the way to go.
> > >
> > > For the control plane layer, such as communications between controller and participants, wouldn't Akka actors give the benefits of already-reliable message passing and monitoring without building from the ground up? The Akka actor system has many different routers and allows a callback to the caller to get an async response (in this case, returning the response to the controller).
> > >
> > > I just saw the similarity between what the new API looks like and the Akka actor APIs.
> > >
> > > Thanks,
> > >
> > > - Henry
> > >
> > > On Wed, Aug 20, 2014 at 5:46 PM, Greg Brandt <brandt.g...@gmail.com> wrote:
> > > > Hey Henry,
> > > >
> > > > Initially, I thought the same thing. But after evaluating Akka Actors, I think they're a fundamentally different approach to developing distributed systems as opposed to simply a messaging layer implementation. Let me explain...
> > > >
> > > > The primary difference between Akka Actors and Helix is that the former prefers loose, decentralized control, whereas the latter prefers strong, centralized control. Akka applications are expected to manage state by themselves, whereas in Helix, that responsibility is delegated to the controller. And the actual user Akka Actor implementations are analogous to Helix Participant implementations.
> > > >
> > > > So with that perspective, it would be a bit of a kludge to leverage Akka Actors for simple cluster-scope-addressable message passing. Actually, Helix already provides this messaging feature, but it is done via ZooKeeper, which is dangerous and less-than-performant for a high volume of messages.
> > > >
> > > > The right thing to do seems to be to provide a set of better transport implementations (e.g. Netty, ActiveMQ, Kafka, etc.) for the cluster-scope-addressable messaging system, and better preserve the fundamental concepts in Helix.
> > > >
> > > > Let me know what you think.
> > > >
> > > > -Greg
> > > >
> > > > On Wed, Aug 20, 2014 at 9:55 AM, Henry Saputra <henry.sapu...@gmail.com> wrote:
> > > >
> > > >> This seems fitting for Akka actor system [1]. Maybe we could just use it as base?
> > > >>
> > > >> - Henry
> > > >>
> > > >> [1] http://akka.io
> > > >>
> > > >> On Mon, Jul 21, 2014 at 9:47 AM, Greg Brandt <brandt.g...@gmail.com> wrote:
> > > >> > We talked about this in a little bit greater detail, and it's seeming like there are three basic components that we want:
> > > >> >
> > > >> > 1. Transport - physically moving messages
> > > >> > 2. Resolution - map cluster scope to set of machines
> > > >> > 3. Management - message state, errors, retries, etc.
> > > >> >
> > > >> > The transport interface should be generic enough to allow many implementations (e.g. Netty, Kafka, etc.) It may be valuable to do multiple implementations if one underlying system supports desired message delivery guarantees better than another. For example, Kafka would be more amenable to at-least-once delivery, whereas something like Netty, with much less overhead than Kafka, would be better for at-most-once delivery.
> > > >> >
> > > >> > The goal here I think is to provide something that's most similar to Akka Actors, with the following guarantees:
> > > >> >
> > > >> > 1. at-most-once delivery
> > > >> > 2. message ordering per sender-receiver pair
> > > >> >
> > > >> > A light-weight messaging primitive with those guarantees would allow implementation of many interesting things like Kishore mentions. Netty seems most suited to this task (it is the underlying transport for Akka as well). The reason to avoid Akka I think is that it imposes too much of its own philosophy on the user, and is really bloated. A Netty-based primitive could be provided out-of-the-box, and the interfaces it implements should be easy enough for users to implement on their own.
> > > >> > It would be relatively trivial to add a Kafka-based implementation out-of-the-box as well.
> > > >> >
> > > >> > (Have a bit of a prototype here: https://github.com/brandtg/helix-actors/blob/master/helix-actors/src/main/java/org/apache/helix/actor/netty/NettyHelixActor.java)
> > > >> >
> > > >> > The transport mechanism should be agnostic to anything going on in the cluster: the resolution module is totally separate, and maps cluster scope to sets of nodes. For example, one could send a message to the entire cluster, specific partitions of a resource, or even from one cluster to another (via composing resolution modules or something).
> > > >> >
> > > >> > Then there would be a management component, composed with the other two components, which manages user callbacks associated with varying cluster scopes, tracks messages based on ID in order to perform request / response or feedback-loop kinds of things, manages errors / timeouts / retries, etc.
> > > >> >
> > > >> > -Greg
> > > >> >
> > > >> > On Sat, Jul 12, 2014 at 11:27 AM, kishore g <g.kish...@gmail.com> wrote:
> > > >> >
> > > >> >> Hi Vlad,
> > > >> >>
> > > >> >> Yes, the idea is to have a pluggable architecture where Helix provides the messaging channel without serializing/deserializing the data. In the end, I envision having most of the building blocks needed to build a distributed system. One should be able to build datastore/search/olap/pub-sub/stream processing systems easily on top of Helix.
> > > >> >>
> > > >> >> I am curious to see what kind of tuning you guys did to handle 1.5M QPS. Our goal is to have 1M QPS between the two servers.
> > > >> >> In order to achieve this we might need pipelining/batching along with ordering guarantees. This is non-trivial and error-prone; I see a huge value in providing this feature. Most systems have built some form of replication scheme on their own. There are two parts in the replication: one is the high-throughput message exchange and the other is the scheme, that is, synchronous replication/async replication/chained replication/consensus etc. The first part is the common part that is needed across all the replication schemes.
> > > >> >>
> > > >> >> Of course getting this right is not easy and the design is tricky, but I feel it's worth a try.
> > > >> >>
> > > >> >> This is the high level flow I had discussed with Greg.
> > > >> >>
> > > >> >> Each server can host P partitions and is listening at one port. Based on how many execution threads are needed, we can create C channels. Every partition maps to a specific channel. On each channel we can guarantee message ordering. We simply provide a callback to handle each message and pass the received bytes without deserializing. It's up to the callback to either process it synchronously or asynchronously.
> > > >> >>
> > > >> >> There are more details that are difficult to go over in email. We should probably write up the design and then deep dive on the details.
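The P-partitions-over-C-channels flow described above could be sketched roughly as follows. This is only an illustration under stated assumptions: `PartitionChannels`, `channelFor`, and `onReceive` are hypothetical names (not Helix or Netty APIs), each "channel" is modeled as a single-threaded executor, and the partition-to-channel mapping is a plain modulo.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical sketch: none of these names exist in Helix.
class PartitionChannels {
    private final ExecutorService[] channels;
    private final BiConsumer<Integer, byte[]> callback;

    PartitionChannels(int numChannels, BiConsumer<Integer, byte[]> callback) {
        this.channels = new ExecutorService[numChannels];
        for (int i = 0; i < numChannels; i++) {
            // One thread per channel, so messages on a channel are handled in FIFO order.
            channels[i] = Executors.newSingleThreadExecutor();
        }
        this.callback = callback;
    }

    // Every partition maps to exactly one channel (assumed here: modulo mapping).
    int channelFor(int partition) {
        return Math.floorMod(partition, channels.length);
    }

    // Hand the raw bytes to the callback without deserializing them.
    void onReceive(int partition, byte[] payload) {
        channels[channelFor(partition)].execute(() -> callback.accept(partition, payload));
    }

    // Stop accepting work and wait briefly for queued messages to drain.
    void shutdown() {
        for (ExecutorService channel : channels) {
            channel.shutdown();
        }
        try {
            for (ExecutorService channel : channels) {
                channel.awaitTermination(5, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        PartitionChannels pc = new PartitionChannels(2,
            (partition, bytes) -> System.out.println(partition + " -> " + bytes.length + " bytes"));
        pc.onReceive(0, "hello".getBytes(StandardCharsets.UTF_8));
        pc.onReceive(1, "world!".getBytes(StandardCharsets.UTF_8));
        pc.shutdown();
    }
}
```

Ordering holds per channel (and therefore per partition) only; messages for partitions on different channels may interleave. Whether the callback does its work inline or hands it off is left to the caller, as in the description above.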
> > > >> >>
> > > >> >> thanks,
> > > >> >> Kishore G
> > > >> >>
> > > >> >> On Sat, Jul 12, 2014 at 8:08 AM, vlad...@gmail.com <vlad...@gmail.com> wrote:
> > > >> >>
> > > >> >> > I agree that having a messaging scheme that is not ZooKeeper-reliant would help in building some synchronization protocols between clients and replicas. I am thinking of cases such as writing inside Kafka to replicated partitions or our own application, a replicated Key/Value store.
> > > >> >> >
> > > >> >> > For me, service discovery bridges the gap between an application wanting to address a service (in Helix's case more particularly, a service/resource name/partition tuple) and finding out a list of hostnames that can receive that request. After that, whether the application wants to use Helix itself or another messaging stack is an open problem; perhaps a clear separation of the discovery/messaging layers would allow for quickly swapping different pieces here.
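To make the discovery step concrete, here is a minimal sketch of a resolver that maps a resource name/partition tuple to a list of hosts, kept separate from any messaging stack. Everything in it is an assumption for illustration: `StaticResolver`, `assign`, and `resolve` are hypothetical names, and the placement is a hand-filled map rather than anything read from Helix.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical resolver for illustration; not a Helix class.
class StaticResolver {
    // resource name -> partition number -> hosts serving that partition
    private final Map<String, Map<Integer, List<String>>> placement = new HashMap<>();

    // Record which hosts serve a given partition of a resource.
    void assign(String resource, int partition, List<String> hosts) {
        placement.computeIfAbsent(resource, r -> new HashMap<>()).put(partition, hosts);
    }

    // Returns the hosts that can receive a request for this tuple,
    // or an empty list if the tuple is unknown.
    List<String> resolve(String resource, int partition) {
        return placement.getOrDefault(resource, Collections.emptyMap())
                        .getOrDefault(partition, Collections.emptyList());
    }

    public static void main(String[] args) {
        StaticResolver resolver = new StaticResolver();
        resolver.assign("MyDB", 0, Arrays.asList("host1:9000", "host2:9000"));
        System.out.println(resolver.resolve("MyDB", 0));
    }
}
```

Because the resolver only returns addresses, the caller stays free to hand them to whichever transport it prefers, which is the separation of layers suggested above.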
> > > >> >> >
> > > >> >> > The interesting part (in my view) in providing fast messaging as part of Helix is the ability to use the service topology information that Helix hosts (number of replicas and their placement) together with the messaging layer as sufficient components for realizing a consensus algorithm between replicas, that is providing, on top of the regular service discovery function of writing to one instance of a service, the function of writing with consensus verification to all replicas of a service.
> > > >> >> >
> > > >> >> > In terms of capabilities, looking at Kafka and KV/Store as use cases, this would mean running a whole protocol between the replicas of the service to achieve a certain synchronization outcome (be it Paxos, 2 Phase Commit or Chain Replication - Paxos would probably not be needed since Helix already provides a resilient master for the partition, so there is a clear hierarchy). Kafka suggests even more cases if we take the interaction between client and service into account, namely specifying the call to return after the master gets the data or after consensus between master and replicas has been achieved, or sending data asynchronously (the combination of the producer.type and request.required.acks parameters of the producer).
> > > >> >> >
> > > >> >> > The only reason for which I would like to see a pluggable architecture is that in high volume applications the messaging stack requires a lot of tuning.
> > > >> >> > As I said at the meet-up, in our company we deal with about 1.5M QPS, so having serialization/deserialization overheads per message, or even separate listening/executor threads, can be resource intensive. Whenever we can, we are tempted to use a staged event-driven architecture, with few threads that handle large message pools. This also raises the question of how to implement the upper-layer callbacks (that would perform the synchronization-related actions) without spawning new threads.
> > > >> >> >
> > > >> >> > Regards,
> > > >> >> > Vlad
> > > >> >> >
> > > >> >> > On Fri, Jul 11, 2014 at 11:40 PM, kishore g <g.kish...@gmail.com> wrote:
> > > >> >> >
> > > >> >> >> Good point Vlad, I was thinking of defining the right abstraction and possibly provide one implementation based. I think the synchronous and asynchronous messaging should be covered as part of this.
> > > >> >> >>
> > > >> >> >> Also I think Finagle and the likes of it cater more towards client-server communication, but what we lack today is a good solution for peer-to-peer transport. For example, if someone has to build a consensus layer they have to build everything from the ground up. Providing a base layer that exchanges messages between peers on a per-partition basis can be a great building block to build different replication schemes.
> > > >> >> >>
> > > >> >> >> Overall messaging can be used for two cases -- data and control.
> > > >> >> >>
> > > >> >> >> CONTROL
> > > >> >> >>
> > > >> >> >> This is needed for control messages that occur rarely. For these types of messages latency/throughput is not important, but it's important for it to be reliable.
> > > >> >> >>
> > > >> >> >> DATA
> > > >> >> >>
> > > >> >> >> This is mainly exchanging data between different roles: controller-participant, participant-participant, spectator-participant. These types of message exchanges occur quite frequently, and having high throughput and low latency is a requirement.
> > > >> >> >>
> > > >> >> >> For example, having the following api and guarantee would allow one to build synchronous replication, asynchronous, quorum etc.
> > > >> >> >>
> > > >> >> >> send(partition, message, ACK_MODE, callback)
> > > >> >> >>
> > > >> >> >> ACK_MODE can be like ACK from ALL, QUORUM, 1, NONE etc. callback is fired when one gets the response back from the receiver.
> > > >> >> >>
> > > >> >> >> This simple api can allow both synchronous and asynchronous modes of communication. We don't have to do the serialization/deserialization. The hard part here would be to guarantee ordering between the messages. One should be able to specify the message ordering requirement: FIFO on a per-partition level, or don't care about ordering. Having this makes it easy for one to implement replication schemes.
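One way to read the ACK_MODE idea above is as a count of acknowledgements that must arrive before the callback fires. The sketch below is an interpretation, not the proposed API: `AckMode`, `AckTracker`, and `onAck` are hypothetical names, `ONE` stands in for the "1" mode because enum constants cannot be numeric, and QUORUM is assumed to mean a strict majority of replicas.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical names throughout; an interpretation of ACK_MODE, not a Helix API.
enum AckMode {
    NONE, ONE, QUORUM, ALL;

    // Number of acknowledgements required from `replicas` receivers.
    int required(int replicas) {
        if (replicas < 1) {
            throw new IllegalArgumentException("replicas must be >= 1");
        }
        switch (this) {
            case NONE:   return 0;
            case ONE:    return 1;
            case QUORUM: return replicas / 2 + 1; // assumed: strict majority
            default:     return replicas;         // ALL
        }
    }
}

// Fires the callback exactly once, when enough acknowledgements have arrived.
class AckTracker {
    private final int required;
    private final Runnable callback;
    private final AtomicInteger acks = new AtomicInteger();

    AckTracker(AckMode mode, int replicas, Runnable callback) {
        this.required = mode.required(replicas);
        this.callback = callback;
        if (required == 0) {
            callback.run(); // NONE: fire-and-forget, nothing to wait for
        }
    }

    // Called once per acknowledgement received from a replica.
    void onAck() {
        if (acks.incrementAndGet() == required) {
            callback.run();
        }
    }

    public static void main(String[] args) {
        AckTracker tracker = new AckTracker(AckMode.QUORUM, 3,
            () -> System.out.println("quorum reached"));
        tracker.onAck();
        tracker.onAck(); // second of three replicas: majority reached here
        tracker.onAck();
    }
}
```

With ALL this behaves like synchronous replication, with NONE like fully asynchronous sends, and QUORUM sits in between; per-partition FIFO ordering is a separate concern that this sketch does not address.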
> > > >> >> >>
> > > >> >> >> thanks,
> > > >> >> >> Kishore G
> > > >> >> >>
> > > >> >> >> On Fri, Jul 11, 2014 at 12:49 PM, Greg Brandt <brandt.g...@gmail.com> wrote:
> > > >> >> >>
> > > >> >> >>> Vlad,
> > > >> >> >>>
> > > >> >> >>> I'm not sure that the goal here is to cover all possible use cases. This is intended as basically a replacement for the current Helix cluster messaging service, one that doesn't rely on ZooKeeper. I would argue that Helix is already that general framework for discovering the service endpoint, and that this is just one implementation of an underlying messaging stack (for which there is currently only the ZooKeeper-based implementation).
> > > >> >> >>>
> > > >> >> >>> Moreover, keeping it too general (i.e. providing the APIs outlined in the JIRA, but no implementation) puts much greater onus on the user. It would be nice for someone just starting out with Helix to have a pretty good messaging service readily available, as well as the APIs to implement more specific solutions if he or she so chooses.
> > > >> >> >>>
> > > >> >> >>> -Greg
> > > >> >> >>>
> > > >> >> >>> On Fri, Jul 11, 2014 at 11:07 AM, vlad...@gmail.com <vlad...@gmail.com> wrote:
> > > >> >> >>>
> > > >> >> >>> > This sounds like service discovery for a messaging solution. At this point there would be some overlap with message buses such as Finagle.
> > > >> >> >>> > Perhaps this should be a general framework for discovering the service endpoint that could leave the actual underlying messaging stack open (be it Finagle, Netty-based or ZeroMQ, for example). My only fear is that if the messaging framework is encapsulated completely into Helix, it would be hard to compete with tuned messaging bus solutions and cover all possible use cases (for example, in my company, we use a large number of sub cases on the synchronous call to asynchronous call spectrum).
> > > >> >> >>> >
> > > >> >> >>> > Regards,
> > > >> >> >>> > Vlad
> > > >> >> >>> >
> > > >> >> >>> > On Fri, Jul 11, 2014 at 10:56 AM, Greg Brandt <brandt.g...@gmail.com> wrote:
> > > >> >> >>> >
> > > >> >> >>> >> (copied from HELIX-470)
> > > >> >> >>> >>
> > > >> >> >>> >> Helix is missing a high-performance way to exchange messages among resource partitions, with a user-friendly API.
> > > >> >> >>> >>
> > > >> >> >>> >> Currently, the Helix messaging service relies on creating many nodes in ZooKeeper, which can lead to ZooKeeper outages if messages are sent too frequently.
> > > >> >> >>> >>
> > > >> >> >>> >> In order to avoid this, high-performance NIO-based HelixActors should be implemented (in rough accordance with the actor model). HelixActors exchange messages asynchronously without waiting for a response, and are partition/state-addressable.
> > > >> >> >>> >>
> > > >> >> >>> >> The API would look something like this:
> > > >> >> >>> >>
> > > >> >> >>> >> public interface HelixActor<T> {
> > > >> >> >>> >>   void send(Partition partition, String state, T message);
> > > >> >> >>> >>   void register(String resource, HelixActorCallback<T> callback);
> > > >> >> >>> >> }
> > > >> >> >>> >> public interface HelixActorCallback<T> {
> > > >> >> >>> >>   void onMessage(Partition partition, State state, T message);
> > > >> >> >>> >> }
> > > >> >> >>> >>
> > > >> >> >>> >> #send should likely support wildcards for partition number and state, or its method signature might need to be massaged a little bit for more flexibility. But that's the basic idea.
> > > >> >> >>> >>
> > > >> >> >>> >> Nothing is inferred about the format of the messages - the only metadata we need to be able to interpret is (1) partition name and (2) state. The user provides a codec to encode / decode messages, so it's nicer to implement HelixActor#send and HelixActorCallback#onMessage.
> > > >> >> >>> >>
> > > >> >> >>> >> public interface HelixActorMessageCodec<T> {
> > > >> >> >>> >>   byte[] encode(T message);
> > > >> >> >>> >>   T decode(byte[] message);
> > > >> >> >>> >> }
> > > >> >> >>> >>
> > > >> >> >>> >> Actors should support somewhere around 100k to 1M messages per second. The Netty framework is a potential implementation candidate, but should be thoroughly evaluated w.r.t. performance.
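For readers who want to see how the codec from the HELIX-470 proposal might fit with send and onMessage, here is a small in-process sketch. Only `HelixActorMessageCodec` is taken from the proposal as written; `StringCodec`, `LoopbackCallback`, and `LoopbackActor` are hypothetical, partition and state are simplified to plain strings, there is no network transport, and partition names are assumed to look like `<resource>_<number>`.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Codec interface as written in the proposal above.
interface HelixActorMessageCodec<T> {
    byte[] encode(T message);
    T decode(byte[] message);
}

// Assumption: a UTF-8 string codec, chosen only for illustration.
class StringCodec implements HelixActorMessageCodec<String> {
    public byte[] encode(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }
    public String decode(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }
}

// Hypothetical callback; partition and state are plain strings here.
interface LoopbackCallback<T> {
    void onMessage(String partition, String state, T message);
}

// Hypothetical in-process stand-in for a network transport.
class LoopbackActor<T> {
    private final HelixActorMessageCodec<T> codec;
    private final Map<String, LoopbackCallback<T>> callbacks = new HashMap<>();

    LoopbackActor(HelixActorMessageCodec<T> codec) {
        this.codec = codec;
    }

    void register(String resource, LoopbackCallback<T> callback) {
        callbacks.put(resource, callback);
    }

    // Encodes the message, then decodes it and delivers it locally.
    void send(String partition, String state, T message) {
        byte[] wire = codec.encode(message); // what would go on the wire
        int idx = partition.lastIndexOf('_'); // assumed "<resource>_<number>" naming
        String resource = idx < 0 ? partition : partition.substring(0, idx);
        LoopbackCallback<T> callback = callbacks.get(resource);
        if (callback != null) {
            callback.onMessage(partition, state, codec.decode(wire));
        }
    }

    public static void main(String[] args) {
        LoopbackActor<String> actor = new LoopbackActor<>(new StringCodec());
        actor.register("MyDB", (partition, state, message) ->
            System.out.println(partition + "/" + state + ": " + message));
        actor.send("MyDB_0", "MASTER", "hello");
    }
}
```

The transport only ever sees `byte[]`, so swapping `StringCodec` for another codec changes the message type without touching the actor, which appears to be the point of keeping the codec separate.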