Responses inline. Sorry for the delay. Hope this helps!
TD

On Fri, Sep 20, 2013 at 12:41 PM, Gerard Maas <gerard.m...@gmail.com> wrote:
> Hi all,
>
> Today I was contributing my (limited) Scala knowledge to some local folks
> who are working on setting up a Spark Streaming cluster with Kafka. Bear
> with me, I'm fairly new to Spark.
>
> One of the issues they are facing is that they observe all Kafka traffic
> going to only one node of the cluster. The reason may lie behind this note
> [0].

The way our input streams work is that there is only one receiver per input
stream. So if you have set up only one Kafka input stream, then all the data
will go to the one node running the receiver. For parallel ingestion of data,
you need to create multiple Kafka input streams and partition the Kafka
topics across them. Then there will be multiple receivers.

The line [0] points to is not exactly the reason. What that line does is run
a dummy job to make sure that all the worker nodes have connected to the
master before starting the receivers.

> Further on, I was looking into KafkaInputDStream and I stumbled on the way
> the dataBlocks are being populated: there are n threads running n message
> handlers, each listening to a specific Kafka topic stream. They all seem to
> share the same reference to the blockGenerator and contribute data using
> the BlockGenerator.+= method [1], which does not offer any synchronization.
>
> Unless I missed some part, it looks like we might have a concurrency issue
> when multiple Kafka message handlers add data to the underlying ArrayBuffer
> in the BlockGenerator.

This may be a very valid point; we will have to look into it. The current
block generator was not designed for parallel insertion, so this is something
we need to fix. For the time being, a simple solution would be to add a
"synchronized" to the += function. I have plans to make major changes to the
block generation code, and I will keep this issue in mind.
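For what it's worth, the "synchronized" fix can be sketched in plain Scala. SimpleBlockGenerator below is a made-up, minimal stand-in for the real class, not the actual spark.streaming code; the point is only that guarding the append makes concurrent calls from multiple message-handler threads safe:

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified stand-in for the BlockGenerator discussed above (the name and
// shape are made up; the real class lives in spark.streaming). The only
// change from an unsynchronized version is the `synchronized` guard around
// the append.
class SimpleBlockGenerator {
  private val currentBuffer = new ArrayBuffer[Any]()

  // Multiple Kafka message-handler threads may call += concurrently;
  // ArrayBuffer is not thread-safe, so the append is guarded.
  def +=(record: Any): Unit = synchronized {
    currentBuffer += record
  }

  def size: Int = synchronized { currentBuffer.size }
}
```

Without the guard, concurrent appends can corrupt the buffer's internal state or drop records; with it, the buffer sees a consistent total.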
> I continued my journey through the system and ended at NetworkInputTracker
> [2]. It mixes two concurrency models: the Actor-based
> NetworkInputStreamActor and the 'raw' thread-based ReceiverExecutor, where
> the NetworkInputTracker safely shares a queue with the actor, at the
> expense of some locking.
>
> From a newbie perspective, it looks like using a single concurrency model
> (actors?) would provide a uniform programming paradigm and simplify some of
> the code. But it's evident that the author, with knowledge of both models,
> made specific choices for a reason.

The justification behind the design of the NetworkInputTracker is as follows.
The ReceiverExecutor thread is meant to start the receivers and stay blocked
until the receivers are shut down. This is because the way receivers are
started is by making an RDD out of the receiver objects and starting a Spark
job to distribute and execute them. Hence the receivers run as long-running
tasks in the workers, and if a receiver dies (e.g., its node dies), the Spark
scheduler's built-in fault-tolerance model will rerun the task (i.e., start
the receiver) on another node. So the Spark job executing the receivers stays
alive, and the thread that started it stays blocked. Also, this thread does
not need to communicate with any other thread. Tasks run through the actor
model must be short tasks that do not block the actor's thread, so this
activity is fundamentally unsuited to being implemented with actors. Hence we
used a raw thread to start the Spark job that runs the receivers. All other
actual communication between the receiver and the tracker is handled by the
NetworkInputTrackerActor and the NetworkReceiverActor (running with the
receiver).
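To illustrate the split (this is not Spark code; every name below is invented, and a queue-draining thread stands in for the actor): short control-plane messages are handled one at a time by a message loop, while the long-running receiver job gets a dedicated raw thread that simply stays blocked until the job finishes.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Hypothetical control-plane messages, kept deliberately small and cheap.
sealed trait TrackerMessage
case class RegisterReceiver(streamId: Int) extends TrackerMessage
case class AddBlocks(streamId: Int, blockIds: Seq[String]) extends TrackerMessage

class MiniTracker {
  private val mailbox = new LinkedBlockingQueue[TrackerMessage]()
  @volatile var registered: Set[Int] = Set.empty
  @volatile var blockIds: Vector[String] = Vector.empty
  @volatile private var stopped = false

  // Stand-in for the control-plane actor: each message is handled quickly,
  // so one message loop never blocks for long.
  private val controlLoop = new Thread(() => {
    while (!stopped) {
      mailbox.poll(50, TimeUnit.MILLISECONDS) match {
        case RegisterReceiver(id) => registered += id
        case AddBlocks(_, ids)    => blockIds ++= ids
        case null                 => // poll timed out; re-check the stop flag
      }
    }
  })
  controlLoop.setDaemon(true)
  controlLoop.start()

  def send(m: TrackerMessage): Unit = mailbox.put(m)

  // Stand-in for the ReceiverExecutor: a raw thread is appropriate here
  // because it blocks for the whole lifetime of the receiver job, which an
  // actor must never do.
  def runReceiverJob(job: () => Unit): Thread = {
    val t = new Thread(() => job())
    t.start()
    t
  }

  def stop(): Unit = { stopped = true }
}
```

The design choice this sketches: blocking work lives on its own thread, and actors (here, the message loop) only ever see short, non-blocking messages.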
Also, note that there is a mix of three concurrency models in the
NetworkReceiver: (i) the NetworkReceiverActor communicates with the master
about block names, (ii) the receiving thread receives data from some source
and pushes it to the block generator, and (iii) the block generator has its
own timer that periodically forms blocks, plus another thread that pushes the
blocks to Spark's block manager (the blockPushingThread).

Again, each of these threads has different requirements. In case (iii), the
blockPushingThread in the block generator needs to do work (pushing blocks)
that may take a bit of time to complete. That makes it unsuitable for being
implemented as an actor, for the reasons stated earlier. Also, the timer and
the block pushing thread are separate because the timer, running in its own
thread, should execute strictly periodically and not be delayed by variable
delays in the block pushing thread. Case (ii) could have been implemented as
an actor, since it just inserts a record into an ArrayBuffer (i.e., a very
small task). However, with rates of more than 100K records received per
second, I was unsure what the overhead of sending each record as a message
through the actor library would be. For this reason, we use actors only in
the control plane and not the data plane. In case (i), since all control
plane messaging is done by actors, this was implemented as an actor.

I probably went into more detail than you wanted to know. :) Apologies if
that is the case. That said, there may well be a better way of implementing
this without reducing performance significantly. I am totally open to
discussion on this matter.

> Care to comment about the rationale behind mixing these concurrency models?
> Any suggestions on what should stay and what might be a candidate for
> refactoring?
>
> Thanks!
>
> -kr, Gerard.
> [0] https://github.com/maasg/spark/blob/master/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala#L161
> [1] https://github.com/maasg/spark/blob/master/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala#L119
> [2] https://github.com/maasg/spark/blob/master/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
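P.S. To make the case (iii) design concrete, here is a minimal pure-Scala sketch of the two-thread block generator described above. MiniBlockGenerator and every name in it are made up; this is a toy illustration of the pattern, not the real spark.streaming code. The timer thread only swaps buffers, a cheap operation, so it keeps its strict schedule; the separate pushing thread absorbs the variable latency of handing blocks to storage.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Toy sketch: a timer thread seals the current buffer into a block every
// intervalMs, and a separate pushing thread drains sealed blocks, so a slow
// push never delays the timer.
class MiniBlockGenerator(intervalMs: Long, push: Seq[Any] => Unit) {
  private var currentBuffer = new ArrayBuffer[Any]()
  private val blocksForPushing = new LinkedBlockingQueue[Seq[Any]]()
  @volatile private var stopped = false

  // Data-plane insertion (case (ii) above): a tiny critical section.
  def +=(record: Any): Unit = synchronized { currentBuffer += record }

  // Timer thread: only swaps the buffer, so it stays strictly periodic.
  private val timer = new Thread(() => {
    while (!stopped) {
      Thread.sleep(intervalMs)
      val block = synchronized {
        val b = currentBuffer; currentBuffer = new ArrayBuffer[Any](); b
      }
      if (block.nonEmpty) blocksForPushing.put(block.toSeq)
    }
  })

  // Pushing thread: stand-in for the blockPushingThread; the push callback
  // may take a while (storing to a block manager in the real code), which is
  // why it runs on its own thread rather than in the timer or an actor.
  private val pusher = new Thread(() => {
    while (!stopped || !blocksForPushing.isEmpty) {
      val block = blocksForPushing.poll(50, TimeUnit.MILLISECONDS)
      if (block != null) push(block)
    }
  })

  def start(): Unit = { timer.start(); pusher.start() }
  def stop(): Unit = { stopped = true; timer.join(); pusher.join() }
}
```

Note that on stop() the pusher drains any remaining sealed blocks before exiting, while records still sitting in an unsealed buffer are dropped; the real code has to handle that hand-off more carefully.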