Responses inline. Sorry for the delay. Hope this helps!

TD


On Fri, Sep 20, 2013 at 12:41 PM, Gerard Maas <gerard.m...@gmail.com> wrote:

> Hi all,
>
> Today I was contributing my (limited) Scala knowledge to some local folks
> that are working on setting up a Spark Streaming cluster with Kafka. Bear
> with me, I'm fairly new to Spark.
>
> One of the issues they are facing is that they observe all kafka traffic
> going to only one node of the cluster. The reason may lie behind this note
> [0].
>

The way our input streams work is that there is only one receiver per
input stream. So if you have set up only one Kafka input stream, then all
the data will go to the one node running the receiver. For parallel
ingestion of data, you need to create multiple Kafka input streams and
partition the Kafka topics across them. Then there will be multiple
receivers.
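
For example, a rough sketch (zkQuorum, groupId and the topic map are
placeholders, and the exact kafkaStream signature may differ across
versions):

    // Sketch: several Kafka input streams => several receivers, so data is
    // ingested in parallel across the cluster.
    val numStreams = 3
    val kafkaStreams = (1 to numStreams).map { _ =>
      ssc.kafkaStream(zkQuorum, groupId, Map("myTopic" -> 1))
    }
    // Union them into one DStream so downstream processing is unchanged.
    val unifiedStream = ssc.union(kafkaStreams)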

The line [0] points to is not exactly the reason. What that line does is
run a dummy job to make sure that all the worker nodes have connected to
the master before starting the receivers.
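
Illustratively (this is not the exact code at [0], just the idea), it is a
throwaway job along these lines, where sc is the underlying SparkContext:

    // Dummy job: forces all workers to connect to the master before the
    // receivers are scheduled, so they don't all end up on one node.
    sc.makeRDD(1 to 100, 100).map(x => (x, 1)).reduceByKey(_ + _).collect()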


> Further on, I was looking into KafkaInputDStream and I stumbled on the way
> the dataBlocks are being populated: There are n threads running n message
> handlers, each listening to a specific kafka topic stream. They all seem to
> share the same reference of the blockGenerator and contribute data using
> the BlockGenerator.+= method [1], which does not offer any
> synchronization.
>
> Unless I missed some part, it looks like we might have a concurrency issue
> when multiple kafka message handlers add data to the underlying ArrayBuffer
> in the BlockGenerator.
>
This may be a very valid point. We will have to look into this. The
current block generator was not exactly designed for parallel insertion,
so this is something we will have to fix. For the time being, a simple
solution would be to add a "synchronized" in the += function.
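
A minimal sketch of that stop-gap (heavily simplified; the real
BlockGenerator has more state than this):

    import scala.collection.mutable.ArrayBuffer

    // Simplified sketch of BlockGenerator: synchronize += so that multiple
    // Kafka message handler threads can append to the shared buffer safely.
    class BlockGeneratorSketch {
      private var currentBuffer = new ArrayBuffer[Any]

      def +=(obj: Any): Unit = synchronized {
        currentBuffer += obj
      }
    }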

I actually have plans to make major changes to the block generation code,
and I will keep this issue in mind.


> I continued my journey through the system and ended at NetworkInputTracker
> [2]. It mixes two concurrency models: The Actor-based
> NetworkInputStreamActor and the 'raw' thread-based ReceiverExecutor.  Where
> the NetworkInputTracker safely shares a queue with the actor, at the
> expense of some locking.
>
> From a newbie perspective, it looks like using a single concurrency model
> (actors?) would provide a uniform programming paradigm and simplify some of
> the code. But it's evident that the author, with knowledge of both models,
> has made specific choices for a reason.
>
>
The justification behind the design of the NetworkInputTracker is as
follows. The ReceiverExecutor thread is meant to start the receivers and
stay blocked until the receivers are shut down. This is because the way
receivers are started is by making an RDD out of the receiver objects and
starting a Spark job to distribute and execute them. Hence the receivers
are run as long-running tasks on the workers, and in case a receiver dies
(its node dies), the Spark scheduler's built-in fault-tolerance model will
rerun the task (i.e., start the receiver) on another node. Hence the Spark
job executing the receivers continues to be alive, and the thread that
started the Spark job stays blocked. Also, this thread does not need to
communicate with any other thread. Tasks run through the actor model must
be short tasks that do not block the actor's thread, hence this activity
is fundamentally unsuited to being implemented with actors. Hence we used a
raw thread to start the Spark job that runs the receivers. All other actual
communication between the receiver and the tracker is handled by the
NetworkInputTrackerActor and the NetworkReceiverActor (running with the
receiver).
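
In code, the idea is roughly the following (simplified; names and
signatures are approximate, not the actual NetworkInputTracker code):

    // Sketch: make an RDD with one partition per receiver object, then run
    // a job whose tasks simply start the receivers. runJob blocks for as
    // long as the receivers keep running, and the scheduler reruns a task
    // (i.e., restarts the receiver) if its node dies.
    val receivers: Seq[NetworkReceiver[_]] = networkInputStreams.map(_.getReceiver())
    val tempRDD = sc.makeRDD(receivers, receivers.size)
    val startReceiver = (iterator: Iterator[NetworkReceiver[_]]) => {
      iterator.next().start()
    }
    sc.runJob(tempRDD, startReceiver)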

Also, note that there is a mix of three concurrency models in the
NetworkReceiver:
(i) the NetworkReceiverActor communicates with the master about block names,
(ii) the receiving thread receives data from some source and pushes it to
the block generator,
(iii) the block generator has its own timer that periodically forms blocks,
plus another thread that pushes the blocks to Spark's block manager
(blockPushingThread).

Again, each of these threads has different requirements.
In case (iii), the "blockPushingThread" in the block generator needs to do
tasks (pushing blocks) that may take a bit of time to complete. That makes
it slightly unsuitable for being implemented as an actor, for the reasons
stated earlier. Also, the timer and the block pushing thread are separate
because the timer, running in its own thread, should execute strictly
periodically and not be delayed by variable delays in the block pushing
thread.
Case (ii) could have been implemented as an actor, as it just inserts a
record into an arraybuffer (i.e., a very small task). However, with rates of
more than 100K records received per second, I was unsure what the overhead
of sending each record as a message through the actor library would be
like. For this reason, we use actors only in the control plane and not the
data plane.
In case (i), since all control plane messaging is done by actors, this was
implemented as an actor.
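
To make (iii) concrete, here is a rough sketch of the two threads inside
the block generator (simplified; names are approximate):

    import java.util.{Timer, TimerTask}
    import java.util.concurrent.ArrayBlockingQueue
    import scala.collection.mutable.ArrayBuffer

    class BlockGeneratorThreadsSketch(blockIntervalMs: Long) {
      private val blocksForPushing = new ArrayBlockingQueue[ArrayBuffer[Any]](10)
      private var currentBuffer = new ArrayBuffer[Any]

      // Timer thread: strictly periodic, only swaps out the current buffer.
      private val timer = new Timer("BlockGeneratorTimer", true)
      timer.scheduleAtFixedRate(new TimerTask {
        def run(): Unit = { updateCurrentBuffer() }
      }, blockIntervalMs, blockIntervalMs)

      // Block pushing thread: pushing a block out can take a while, so it
      // runs separately and must not delay the timer.
      private val blockPushingThread = new Thread("BlockPushingThread") {
        override def run(): Unit = {
          while (true) {
            pushBlock(blocksForPushing.take())
          }
        }
      }
      blockPushingThread.start()

      private def updateCurrentBuffer(): Unit = synchronized {
        if (currentBuffer.nonEmpty) {
          blocksForPushing.put(currentBuffer)
          currentBuffer = new ArrayBuffer[Any]
        }
      }

      private def pushBlock(block: ArrayBuffer[Any]): Unit = {
        // In the real code this hands the block off to Spark's BlockManager.
      }
    }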

I probably went into more detail than you wanted to know. :) Apologies if
that is the case.

That said, there may definitely be a better way of implementing this
without reducing performance significantly. I am totally open to
discussion on this matter.



> Care to comment about the rationale behind mixing these concurrency models?
> Any suggestions on what should stay and what might be candidate for
> refactoring?
>
> Thanks!
>
> -kr, Gerard.
>
> [0]
>
> https://github.com/maasg/spark/blob/master/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala#L161
> [1]
>
> https://github.com/maasg/spark/blob/master/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala#L119
> [2]
>
> https://github.com/maasg/spark/blob/master/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
>
