I'll add version and type fields to the message format, and pull the message
type decoding out of the HelixIPCService itself.

What do you think about composing the HelixIPCService with a
HelixIPCCodecRegistry or something similar? It would accept a message type
and return the appropriate codec, which the HelixIPCService could then use
to decode the message on receipt and route it to a typed callback. So we
would have one callback registered per message type.
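
Roughly, a sketch of what that registry could look like (HelixIPCCodecRegistry,
HelixIPCCodec, and the method names here are only illustrative, not from any
existing patch):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// One codec per message type; the service looks up the codec by the
// type field carried in the message header.
public interface HelixIPCCodec<T> {
    byte[] encode(T message);
    T decode(byte[] bytes);
}

public class HelixIPCCodecRegistry {
    private final Map<Integer, HelixIPCCodec<?>> codecs =
        new ConcurrentHashMap<Integer, HelixIPCCodec<?>>();

    public <T> void register(int messageType, HelixIPCCodec<T> codec) {
        codecs.put(messageType, codec);
    }

    @SuppressWarnings("unchecked")
    public <T> HelixIPCCodec<T> get(int messageType) {
        return (HelixIPCCodec<T>) codecs.get(messageType);
    }
}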


On Wed, Jul 23, 2014 at 10:17 AM, kishore g <g.kish...@gmail.com> wrote:

> Thanks Greg, this looks great. I see how you have defined the interfaces so
> that we can add other impls such as ZeroMQ, etc. The Netty impl looks good
> as a starting point.
>
> A couple of things:
>
> -- HelixIPC seems to be templatized on T Message. It also contains
> start/stop/register methods. This means we will have to open one socket per
> message type, which may not be a good idea.
> -- It might be better to separate HelixIPC into HelixIPCService and
> HelixIPC. The service will have start/stop, and one can get a concrete impl
> of HelixIPC<MessageType> from the service.
> -- I am not sure where the callback should be; at this layer I think we will
> have only one callback that we provide, which simply dispatches (async) the
> message to the appropriate user callback.
> -- MessageFormat might need additional fields like
> ----- version of the message format, which allows us to enhance the message
> format later
> ----- message type field; Helix might have some message types that the app
> users may not care about. This allows us to intercept control messages
> (see the sketch just below).
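>
> A rough sketch of the HelixIPCService/HelixIPC split and the extra header
> fields (names here are purely illustrative, not actual Helix code):
>
> // Illustrative only.
> public interface HelixIPCService {
>     void start();
>     void stop();
>     // one typed channel per message type, multiplexed over a single socket
>     <T> HelixIPC<T> getIPC(int messageType, HelixIPCCodec<T> codec);
> }
>
> public interface HelixIPC<T> {
>     void send(Partition partition, String state, T message);
>     void register(HelixIPCCallback<T> callback);
> }
>
> // Proposed wire header, preceding the opaque message bytes:
> //   version      - allows evolving the format later
> //   messageType  - lets Helix intercept its own control messages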
>
> thanks
> Kishore G
>
> On Mon, Jul 21, 2014 at 9:47 AM, Greg Brandt <brandt.g...@gmail.com>
> wrote:
>
> > We talked about this in a little more detail, and it seems like there are
> > three basic components that we want:
> >
> > 1. Transport - physically moving messages
> > 2. Resolution - mapping cluster scope to a set of machines
> > 3. Management - message state, errors, retries, etc.
> >
> > The transport interface should be generic enough to allow many
> > implementations (e.g. Netty, Kafka, etc.). It may be valuable to do
> > multiple implementations if one underlying system supports the desired
> > message delivery guarantees better than another. For example, Kafka would
> > be more amenable to at-least-once delivery, whereas something like Netty,
> > with much less overhead than Kafka, would be better for at-most-once
> > delivery.
> >
> > The goal here I think is to provide something that's most similar to Akka
> > Actors, with the following guarantees:
> >
> > 1. at-most-once delivery
> > 2. message ordering per sender-receiver pair
> >
> > A lightweight messaging primitive with those guarantees would allow
> > implementation of many interesting things like the ones Kishore mentions.
> > Netty seems most suited to this task (it is the underlying transport for
> > Akka as well). The reason to avoid Akka, I think, is that it imposes too
> > much of its own philosophy on the user, and is really bloated. A
> > Netty-based primitive could be provided out-of-the-box, and the interfaces
> > it implements should be easy enough for users to implement on their own.
> > It would be relatively trivial to add a Kafka-based implementation
> > out-of-the-box as well.
> >
> > (Have a bit of a prototype here:
> > https://github.com/brandtg/helix-actors/blob/master/helix-actors/src/main/java/org/apache/helix/actor/netty/NettyHelixActor.java
> > )
> >
> > The transport mechanism should be agnostic to anything going on in the
> > cluster: the resolution module is totally separate, and maps cluster scope
> > to sets of nodes. For example, one could send a message to the entire
> > cluster, specific partitions of a resource, or even from one cluster to
> > another (via composing resolution modules or something).
> >
> > Then there would be a management component, composed with the other two
> > components, which manages user callbacks associated with varying cluster
> > scopes, tracks messages by ID in order to perform request/response or
> > feedback-loop kinds of things, manages errors/timeouts/retries, etc.
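> >
> > As a very rough sketch (all names below are illustrative, not actual Helix
> > interfaces), the three components might look like:
> >
> > import java.net.InetSocketAddress;
> > import java.util.Set;
> >
> > // 1. Transport - physically moves opaque bytes, knows nothing about the cluster
> > public interface Transport {
> >     void start();
> >     void stop();
> >     void send(InetSocketAddress destination, byte[] message);
> >     void setHandler(TransportHandler handler); // single low-level receive hook
> > }
> >
> > // 2. Resolution - maps a cluster scope (cluster, resource, partition, state)
> > //    to the set of machines currently holding it
> > public interface Resolver {
> >     Set<InetSocketAddress> resolve(ClusterScope scope);
> > }
> >
> > // 3. Management - user callbacks per scope, message IDs, errors, retries
> > public interface MessageManager {
> >     void register(ClusterScope scope, MessageCallback callback);
> >     void send(ClusterScope scope, byte[] message, MessageCallback replyCallback);
> > }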
> >
> > -Greg
> >
> >
> >
> > On Sat, Jul 12, 2014 at 11:27 AM, kishore g <g.kish...@gmail.com> wrote:
> >
> >> Hi Vlad,
> >>
> >> Yes, the idea is to have a pluggable architecture where Helix provides the
> >> messaging channel without serializing/deserializing the data. In the end,
> >> I envision having most of the building blocks needed to build a
> >> distributed system. One should be able to build
> >> datastore/search/OLAP/pub-sub/stream processing systems easily on top of
> >> Helix.
> >>
> >> I am curious to see what kind of tuning you guys did to handle 1.5M QPS.
> >> Our goal is to have 1M QPS between two servers. In order to achieve this
> >> we might need pipelining/batching along with ordering guarantees. This is
> >> non-trivial and error-prone, and I see huge value in providing this
> >> feature. Most systems have built some form of replication scheme on their
> >> own. There are two parts to replication: one is the high-throughput
> >> message exchange, and the other is the scheme itself, i.e. synchronous
> >> replication/async replication/chained replication/consensus, etc. The
> >> first part is the common part that is needed across all replication
> >> schemes.
> >>
> >> Of course, getting this right is not easy and the design is tricky, but I
> >> feel it's worth a try.
> >>
> >> This is the high level flow I had discussed with Greg.
> >>
> >> Each server can host P partitions and listens on one port. Based on how
> >> many execution threads are needed, we can create C channels. Every
> >> partition maps to a specific channel, and on each channel we can guarantee
> >> message ordering. We simply provide a callback to handle each message and
> >> pass the received bytes without deserializing. It's up to the callback to
> >> process the message either synchronously or asynchronously.
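> >>
> >> A sketch of that partition-to-channel mapping (purely illustrative):
> >>
> >> // Each of the P partitions is pinned to one of C channels, so messages
> >> // for a given partition always flow over the same ordered channel.
> >> int channelFor(String partitionName, int numChannels) {
> >>     return (partitionName.hashCode() & 0x7fffffff) % numChannels;
> >> }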
> >>
> >> There are more details that are difficult to go over in email. We should
> >> probably write up the design and then deep-dive on the details.
> >>
> >> thanks,
> >> Kishore G
> >>
> >> On Sat, Jul 12, 2014 at 8:08 AM, vlad...@gmail.com <vlad...@gmail.com>
> >> wrote:
> >>
> >> > I agree that having a messaging scheme that is not ZooKeeper-reliant
> >> > would help in building some synchronization protocols between clients
> >> > and replicas. I am thinking of cases such as writing inside Kafka to
> >> > replicated partitions, or our own application, a replicated key/value
> >> > store.
> >> >
> >> > For me, service discovery bridges the gap between an application wanting
> >> > to address a service (in Helix's case, more particularly, a
> >> > service/resource name/partition tuple) and finding out a list of
> >> > hostnames that can receive that request. After that, whether the
> >> > application wants to use Helix itself or another messaging stack is an
> >> > open problem; perhaps a clear separation of the discovery/messaging
> >> > layers would allow for quickly swapping different pieces here.
> >> >
> >> > The interesting part (in my view) of providing fast messaging as part of
> >> > Helix is the ability to use the service topology information that Helix
> >> > hosts (the number of replicas and their placement) together with the
> >> > messaging layer as sufficient components for realizing a consensus
> >> > algorithm between replicas; that is, providing, on top of the regular
> >> > service discovery function of writing to one instance of a service, the
> >> > function of writing with consensus verification to all replicas of a
> >> > service.
> >> >
> >> > In terms of capabilities, looking at Kafka and a KV store as use cases,
> >> > this would mean running a whole protocol between the replicas of the
> >> > service to achieve a certain synchronization outcome (be it Paxos,
> >> > 2-phase commit, or chain replication; Paxos would probably not be needed
> >> > since Helix already provides a resilient master for the partition, so
> >> > there is a clear hierarchy). Kafka suggests even more cases if we take
> >> > the interaction between client and service into account, namely
> >> > specifying the call to return after the master gets the data or after
> >> > consensus between master and replicas has been achieved, or sending data
> >> > asynchronously (the combination of the producer.type and
> >> > request.required.acks parameters of the producer).
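> >> >
> >> > (For reference, the two Kafka 0.8 producer settings mentioned above;
> >> > the values shown are just examples:)
> >> >
> >> > import java.util.Properties;
> >> >
> >> > Properties props = new Properties();
> >> > props.put("producer.type", "async");      // "sync" blocks on send, "async" batches in background
> >> > props.put("request.required.acks", "1");  // 0 = no ack, 1 = leader only, -1 = all in-sync replicas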
> >> >
> >> > The only reason I would like to see a pluggable architecture is that in
> >> > high-volume applications the messaging stack requires a lot of tuning.
> >> > As I said at the meet-up, in our company we deal with about 1.5M QPS, so
> >> > having serialization/deserialization overhead per message, or even
> >> > separate listening/executor threads, can be resource-intensive. Whenever
> >> > we can, we are tempted to use a staged event-driven architecture, with a
> >> > few threads that handle large message pools. This also raises the
> >> > question of how to implement the upper-layer callbacks (that would
> >> > perform the synchronization-related actions) without spawning new
> >> > threads.
> >> >
> >> > Regards,
> >> > Vlad
> >> >
> >> >
> >> > On Fri, Jul 11, 2014 at 11:40 PM, kishore g <g.kish...@gmail.com>
> >> wrote:
> >> >
> >> >> Good point, Vlad. I was thinking of defining the right abstraction and
> >> >> possibly providing one implementation. I think synchronous and
> >> >> asynchronous messaging should both be covered as part of this.
> >> >>
> >> >> Also, I think Finagle and the likes of it cater more towards
> >> >> client-server communication, but what we lack today is a good solution
> >> >> for peer-to-peer transport. For example, if someone has to build a
> >> >> consensus layer, they have to build everything from the ground up.
> >> >> Providing a base layer that exchanges messages between peers on a
> >> >> per-partition basis can be a great building block for different
> >> >> replication schemes.
> >> >>
> >> >> Overall, messaging can be used for two cases -- data and control.
> >> >>
> >> >>
> >> >> CONTROL
> >> >>
> >> >> This is needed for control messages, which occur rarely. For these
> >> >> types of messages latency/throughput is not important, but it is
> >> >> important that they be reliable.
> >> >>
> >> >>
> >> >> DATA
> >> >>
> >> >> This is mainly exchanging data between different roles:
> >> >> controller-participant, participant-participant, spectator-participant.
> >> >> These types of message exchanges occur quite frequently, and high
> >> >> throughput and low latency are a requirement.
> >> >>
> >> >> For example, having the following API and guarantees would allow one to
> >> >> build synchronous replication, asynchronous replication, quorum, etc.:
> >> >>
> >> >> send(partition, message, ACK_MODE, callback), where ACK_MODE can be
> >> >> something like ACK from ALL, QUORUM, 1, NONE, etc., and the callback is
> >> >> fired when one gets the response back from the receiver.
> >> >>
> >> >> This simple API can allow both synchronous and asynchronous modes of
> >> >> communication, and we don't have to do the
> >> >> serialization/deserialization. The hard part here would be to guarantee
> >> >> ordering between messages. One should be able to specify the message
> >> >> ordering requirement: FIFO at the per-partition level, or no ordering
> >> >> at all. Having this makes it easy to implement replication schemes (see
> >> >> the sketch below).
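> >> >>
> >> >> A rough sketch of such an API (all names are illustrative only):
> >> >>
> >> >> public enum AckMode { ALL, QUORUM, ONE, NONE }
> >> >> public enum Ordering { FIFO_PER_PARTITION, NONE }
> >> >>
> >> >> public interface PartitionMessenger {
> >> >>     // Bytes are passed through untouched; Helix does no serialization.
> >> >>     // The callback fires once the requested acks arrive (or on timeout).
> >> >>     void send(Partition partition, byte[] message, AckMode ackMode,
> >> >>               Ordering ordering, AckCallback callback);
> >> >> }
> >> >>
> >> >> public interface AckCallback {
> >> >>     void onResponse(Partition partition, boolean success, byte[] response);
> >> >> }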
> >> >>
> >> >>  thanks,
> >> >> Kishore G
> >> >>
> >> >>
> >> >> On Fri, Jul 11, 2014 at 12:49 PM, Greg Brandt <brandt.g...@gmail.com>
> >> >> wrote:
> >> >>
> >> >>> Vlad,
> >> >>>
> >> >>> I'm not sure that the goal here is to cover all possible use cases.
> >> >>> This is intended as basically a replacement for the current Helix
> >> >>> cluster messaging service, one that doesn't rely on ZooKeeper. I would
> >> >>> argue that Helix is already that general framework for discovering the
> >> >>> service endpoint, and that this is just one implementation of an
> >> >>> underlying messaging stack (for which there is currently only the
> >> >>> ZooKeeper-based implementation).
> >> >>>
> >> >>> Moreover, keeping it too general (i.e. providing the APIs outlined in
> >> >>> the JIRA, but no implementation) puts a much greater onus on the user.
> >> >>> It would be nice for someone just starting out with Helix to have a
> >> >>> pretty good messaging service readily available, as well as the APIs
> >> >>> to implement more specific solutions if he or she so chooses.
> >> >>>
> >> >>> -Greg
> >> >>>
> >> >>>
> >> >>> On Fri, Jul 11, 2014 at 11:07 AM, vlad...@gmail.com <vlad...@gmail.com>
> >> >>> wrote:
> >> >>>
> >> >>> > This sounds like service discovery for a messaging solution. At this
> >> >>> > point there would be some overlap with message buses such as
> >> >>> > Finagle. Perhaps this should be a general framework for discovering
> >> >>> > the service endpoint that could leave the actual underlying
> >> >>> > messaging stack open (be it Finagle, Netty-based, or ZeroMQ, for
> >> >>> > example). My only fear is that if the messaging framework is
> >> >>> > encapsulated completely into Helix, it would be hard to compete with
> >> >>> > tuned messaging bus solutions and cover all possible use cases (for
> >> >>> > example, in my company, we use a large number of sub-cases on the
> >> >>> > synchronous-call to asynchronous-call spectrum).
> >> >>> >
> >> >>> > Regards,
> >> >>> > Vlad
> >> >>> >
> >> >>> >
> >> >>> > On Fri, Jul 11, 2014 at 10:56 AM, Greg Brandt <brandt.g...@gmail.com>
> >> >>> > wrote:
> >> >>> >
> >> >>> >> (copied from HELIX-470)
> >> >>> >>
> >> >>> >> Helix is missing a high-performance way to exchange messages
> among
> >> >>> >> resource partitions, with a user-friendly API.
> >> >>> >>
> >> >>> >> Currently, the Helix messaging service relies on creating many
> >> nodes
> >> >>> in
> >> >>> >> ZooKeeper, which can lead to ZooKeeper outages if messages are
> sent
> >> >>> too
> >> >>> >> frequently.
> >> >>> >>
> >> >>> >> In order to avoid this, high-performance NIO-based HelixActors
> >> should
> >> >>> be
> >> >>> >> implemented (in rough accordance with the actor model).
> HelixActors
> >> >>> exchange
> >> >>> >> messages asynchronously without waiting for a response, and are
> >> >>> >> partition/state-addressable.
> >> >>> >>
> >> >>> >> The API would look something like this:
> >> >>> >>
> >> >>> >> public interface HelixActor<T> {
> >> >>> >>     void send(Partition partition, String state, T message);
> >> >>> >>     void register(String resource, HelixActorCallback<T>
> callback);
> >> >>> >> }
> >> >>> >> public interface HelixActorCallback<T> {
> >> >>> >>     void onMessage(Partition partition, State state, T message);
> >> >>> >> }
> >> >>> >>
> >> >>> >> #send should likely support wildcards for partition number and
> >> state,
> >> >>> or
> >> >>> >> its method signature might need to be massaged a little bit for
> >> more
> >> >>> >> flexibility. But that's the basic idea.
> >> >>> >>
> >> >>> >> Nothing is inferred about the format of the messages - the only
> >> >>> >> metadata we need to be able to interpret is (1) partition name and
> >> >>> >> (2) state. The user provides a codec to encode/decode messages, so
> >> >>> >> it's nicer to implement HelixActor#send and
> >> >>> >> HelixActorCallback#onMessage.
> >> >>> >>
> >> >>> >> public interface HelixActorMessageCodec<T> {
> >> >>> >>     byte[] encode(T message);
> >> >>> >>     T decode(byte[] message);
> >> >>> >> }
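> >> >>> >>
> >> >>> >> Hypothetical usage might look something like this (the
> >> >>> >> NettyHelixActor constructor shown is made up for illustration; only
> >> >>> >> the interfaces above are proposed):
> >> >>> >>
> >> >>> >> HelixActorMessageCodec<String> codec = new HelixActorMessageCodec<String>() {
> >> >>> >>     public byte[] encode(String message) { return message.getBytes(); }
> >> >>> >>     public String decode(byte[] message) { return new String(message); }
> >> >>> >> };
> >> >>> >>
> >> >>> >> // hypothetical constructor; actual wiring of the codec may differ
> >> >>> >> HelixActor<String> actor = new NettyHelixActor<String>(codec);
> >> >>> >> actor.register("MyResource", new HelixActorCallback<String>() {
> >> >>> >>     public void onMessage(Partition partition, State state, String message) {
> >> >>> >>         System.out.println("got " + message + " for " + partition + " in " + state);
> >> >>> >>     }
> >> >>> >> });
> >> >>> >> actor.send(new Partition("MyResource_0"), "ONLINE", "hello");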
> >> >>> >>
> >> >>> >> Actors should support somewhere around 100k to 1M messages per
> >> second.
> >> >>> >> The Netty framework is a potential implementation candidate, but
> >> >>> should be
> >> >>> >> thoroughly evaluated w.r.t. performance.
> >> >>> >>
> >> >>> >
> >> >>> >
> >> >>>
> >> >>
> >> >>
> >> >
> >>
> >
> >
>
