Sorry it has taken me so long to respond, my responses are inline below.
Overall lots of really interesting ideas.

On 9/5/14, 8:55 PM, "Sean Zhong" <[email protected]> wrote:

>I'd like to share some thoughts on streaming:
>
> 1. Flow control.
>     Currently, storm is using max.spout.pending for flow control. It has
>two limitations, 1) unacked topology will easily goes out of memory. 2)
>For
>large max.spout.pending, there will be large latency, but the throughput
>will not increase.
>
>    I experimented a new flow control method with idea of sliding window.
>For each edge(upstream task with downstream task) there will be a sliding
>window control the message rate. The sliding is is controlled by ask and
>ack, ask and ack will be sent once for every 100 user messages. The
>experiment result can get a 2 million msg/s throughput (on 4 machine) with
>avg latency of 30ms without OOM, while the previous benchmark for storm
>0.92 can only get 0.7 million msg/s for acked topology, for unacked
>topology, storm 0.92 will OOM. With this flow control method,
>max.spout.pending will be used only for fault-torellance, it no longer
>controls the flow speed.

Flow control is a bit tricky simply because of the possibility of cyclical
topologies, but I do like the concept you have because it can provide the
flexibility to still send even in the event of messages loss.  I would
love to see a JIRA filed for this and some benchmarks.  I would also like
to see what happens in the case of a lossy network and how it handles
those situations.  

>2. Worker output transfer queue.
>   Currently there is a single worker transfer queue. Maybe it will be
>better to have a dedicated queue per target worker, instead of sharing a
>single queue? Because for single queue, there will be more thread
>contentions, besides, there will be larger latency. Because the message
>need to process the messge in queue one by one.

I¹m not so sure on this one.  My experimentation really does not show the
disruptor queue to be much of a bottleneck, especially in the
multi-threaded cases.  On my laptop I can do about 7 million messages per
second anywhere from 2 threads inputting all they way up to 32 threads
inputting.  I was able to get up to 25 million messages per second if I
reserved them in batches of 10, instead of 1 at a time.  If you have
evidence to the contrary I am happy to see it, my work has all been micro
benchmarks.

>3. Acking tunning
>  Currently in storm, each spout message will generate a root id. And
>every
>processing downstream will ack to that message id. The good point is that
>if one message is lost, we only need to replay that message. However, we
>have observed that the ack traffic is significant! I was thinking what is
>the real pattern of message loss. For example, when there is network
>issue,
>there usually will be multiple message get lost. If we are going to
>recover
>a batch of messages, why not leting the spout message sharing a same root
>id. For example, spout can generate a new root id every 10ms, all messages
>in 10ms will anchor to same root id. On each worker, we can do local merge
>of same root id before sending the message to acker. That should save a
>lot
>of traffic.

Isn¹t this very similar to how trident works?  Yes if messages are lost
from a network issue they are probably lost in batches, It sounds like a
very interesting possibility.

>4. message loss during load balancing.
>  when a task is migrated to a new worker. Will the messages targeted for
>this task get lost during the transition? For example, task t locates on
>worker A, then t migrate to worker B. In the topology, there will a time
>window that some task will still send the message to worker A, and there
>will be message loss. I am thinking, whether it is possible to do
>something
>on A, that A will no longer drop the message(with task not resloved
>locally), but relay that messages to B. So it will be smoother transition.

There is enough other areas for message loss when a worker moves that I
would not be too concerned about it here.  First any messages that are in
flight for the worker itself when it goes down are lost.  Any messages
that have been buffered in the IConnection object to go to the working
being moved, or for that matter netty, will be lost too.  I am not as
concerned about messages that are about to be sent.  But if you have a
clean solution I am happy to review it.

>5. At least once delivery.
>Without kafka, currently storm cannot do strict at least once delivery.
>Because once the spout worker dies, all message is lost. If we allow to
>use
>kafka, is it still necessary to cache a lot of message in spout memory?
>Why
>not just replaying from kafka?

The spout code in storm proper does not cache the messages in memory for
replay.  Some spout implementations might but storm itself does not.  It
will cache a mapping of tuple ID to the user defined ID that was handed to
storm when the tuple was emitted.  But that is it.  It uses that for the
callback to ack and fail in the spout.

>6. Trident and exactly once.
>I think current trident checkpointing implementation is not fast enough.
>1)
>It will checkpoint on every batch id. 2) The checkpointing is coordianated
>globally which may add latency. Whether we can make the checkpointing
>interval tunnable? for example, we may checkpoint once every 10 batches?
> About the global coordination of checkpointing, there was some idea to
>use
>kafka seq id as vector clock(Hailstorm: Distributed Stream Processing with
>Exactly Once Semantics). to avoid batching and coordianated checkpointing.

Sounds interesting.  I¹m not sure we want to tie the implementation of
trident too tightly to kafka, but if we can make it generic enough that
other systems could also tie into it that would be great.  I assume you
are referring to https://github.com/hailstorm-hs/hailstorm for this.  I
would need to read through exactly what they are doing, but it is very
intriguing.   

>7. thread model of tasks?
>Will make task runnable instead of thread help scaling up? Actor model use
>the Runnable model, it can host millions of actor in single JVM.

This is what S4 does.  Yes we could logically support millions of actors,
but what exactly are you trying to active that needs millions of bolt or
spout instances, that hardly ever run?  Can¹t you active the same thing by
wrapping these actors in bolt instances and having the proper groupings?

>
>8. Connect with spark.
>It will be very interesting to see how to connect to spark, co-operate
>with
>spark. That spark can directly manipulate the storm DAG, and read the data
>generated by storm.

I am curious what the use case is here that you would want?  Do you want
to replace the spark streaming back end with storm and let the two
interact with each other?  That seems like a very ambitions project that I
have trouble seeing how well it would catch on.

>Sean

Reply via email to