[ 
https://issues.apache.org/jira/browse/SPARK-7848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14557756#comment-14557756
 ] 

jay vyas commented on SPARK-7848:
---------------------------------

COPIED from the ASF mailing list thread for convenience:

{noformat}
Blocks are replicated immediately, before the driver launches any jobs using 
them.

On Thu, May 21, 2015 at 2:05 AM, Hemant Bhanawat wrote:
Honestly, given the length of my email, I didn't expect a reply. :-) Thanks for 
reading and replying. However, I have a follow-up question: 

I don't think I understand block replication completely. Are the blocks 
replicated immediately after they are received by the receiver? Or are they 
kept on the receiver node only and moved only on shuffle? Does the 
replication have anything to do with locality.wait? 

Thanks,
Hemant 

On Thu, May 21, 2015 at 2:21 AM, Tathagata Das wrote:
Correcting the ones that are incorrect or incomplete. BUT this is a good list of 
things to remember about Spark Streaming.


On Wed, May 20, 2015 at 3:40 AM, Hemant Bhanawat wrote:
Hi,

I have compiled a list (from online sources) of knobs/design considerations 
that need to be taken care of by applications running on Spark Streaming. Is my 
understanding correct? Are there any other important design considerations that 
I should take care of?

A DStream is associated with a single receiver. To attain read parallelism, 
multiple receivers, i.e. multiple DStreams, need to be created.
A receiver runs within an executor and occupies one core. Ensure that there 
are enough cores left for processing after the receiver slots are booked, i.e. 
spark.cores.max should take the receiver slots into account.
The receivers are allocated to executors in a round-robin fashion.
When data is received from a stream source, the receiver creates blocks of data. A 
new block of data is generated every blockInterval milliseconds. N blocks of 
data are created during the batchInterval, where N = batchInterval/blockInterval.
These blocks are distributed by the BlockManager of the current executor to the 
block managers of other executors. After that, the Network Input Tracker 
running on the driver is informed about the block locations for further 
processing.
An RDD is created on the driver for the blocks created during the batchInterval. 
The blocks generated during the batchInterval are partitions of the RDD. Each 
partition is processed as a task in Spark. blockInterval == batchInterval would 
mean that a single partition is created, and it is probably processed locally.
The map tasks on the blocks are processed in the executors that have the blocks 
(the one that received the block, and the one where the block was replicated), 
irrespective of block interval, unless non-local scheduling kicks in (as you 
observed next).
A bigger blockInterval means bigger blocks. A high value of 
spark.locality.wait increases the chance of processing a block on the local 
node. A balance needs to be found between these two parameters to ensure 
that the bigger blocks are processed locally.
Instead of relying on batchInterval and blockInterval, you can define the 
number of partitions by calling dstream.repartition(n). This reshuffles the 
data in the RDD randomly to create n partitions.
Yes, for greater parallelism, though it comes at the cost of a shuffle.
An RDD's processing is scheduled by the driver's JobScheduler as a job. At any 
given point in time only one job is active. So, if one job is executing, the 
other jobs are queued.
If you have two DStreams, two RDDs will be formed and two jobs will be 
created, which will be scheduled one after another.
To avoid this, you can union the two DStreams. This ensures that a single 
UnionRDD is formed for the two RDDs of the DStreams. This UnionRDD is then 
considered as a single job. However, the partitioning of the RDDs is not 
impacted.
To further clarify, the jobs depend on the number of output operations (print, 
foreachRDD, saveAsXFiles) and the number of RDD actions in those output 
operations.

dstream1.union(dstream2).foreachRDD { rdd => rdd.count() }    // one Spark job per batch

dstream1.union(dstream2).foreachRDD { rdd => { rdd.count() ; rdd.count() } }    // TWO Spark jobs per batch

dstream1.foreachRDD { rdd => rdd.count() } ; dstream2.foreachRDD { rdd => rdd.count() }  // TWO Spark jobs per batch
 
If the batch processing time is longer than batchInterval, the receiver's 
memory will start filling up and the application will eventually throw 
exceptions (most probably BlockNotFoundException). Currently there is no way 
to pause the receiver.
You can limit the rate of the receiver using the SparkConf setting 
spark.streaming.receiver.maxRate.
To be fully fault tolerant, Spark Streaming needs checkpointing enabled. 
Checkpointing increases the batch processing time.
Incomplete. There are two types of checkpointing: data and metadata. Only data 
checkpointing, which is needed by only some operations, increases batch 
processing time. Read - 
http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
Furthermore, with checkpointing you can recover the computation, but you may 
lose some data (data that was received but not processed before the driver 
failed) for some sources. Enabling write-ahead logs, together with a reliable 
source and receiver, allows zero data loss. Read - WAL in 
http://spark.apache.org/docs/latest/streaming-programming-guide.html#fault-tolerance-semantics
The frequency of metadata checkpoint cleaning can be controlled using 
spark.cleaner.ttl. But data checkpoint cleaning happens automatically when the 
RDDs in the checkpoint are no longer required.

Incorrect. Metadata checkpointing (or DStream checkpointing) is self-cleaning. 
What you are probably talking about is the cleaning of shuffle and other data 
in the executors. That can be cleaned using spark.cleaner.ttl, but it is a 
brute-force hammer and can clean more stuff than you intend. It's not 
recommended. Rather, Spark has GC-triggered cleaning of all that: when RDD 
objects are GCed, their shuffle data, cached data, etc. are also cleaned in 
the executors. You can trigger GC-based cleaning by calling System.gc() in the 
driver periodically.
 

Thanks, 
Hemant
{noformat}
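
The Scala sketch below makes the sizing rules from the thread concrete. It is NOT Spark code. The StreamingSizing object and its helpers are hypothetical names.

It encodes three relationships from the thread:
- N = batchInterval/blockInterval blocks (partitions) per receiver per batch.
- One core is reserved per receiver out of spark.cores.max.
- spark.streaming.receiver.maxRate acts as a per-receiver records-per-second cap.

It assumes data arrives in every block interval. An empty interval produces no block, so the partition counts are upper bounds.

```scala
// Hypothetical sizing helpers (not part of Spark) encoding the rules of
// thumb from the thread above. All times are in milliseconds.
object StreamingSizing {

  // N = batchInterval / blockInterval: blocks (and hence RDD partitions)
  // produced by ONE receiver per batch. Assumes data arrives in every block
  // interval, so this is an upper bound.
  def blocksPerBatch(batchIntervalMs: Long, blockIntervalMs: Long): Long = {
    require(blockIntervalMs > 0 && batchIntervalMs >= blockIntervalMs,
      "blockInterval must be positive and no larger than batchInterval")
    batchIntervalMs / blockIntervalMs
  }

  // Unioning the receivers' DStreams does not change partitioning, so the
  // union RDD has the sum of the per-receiver partition counts.
  def unionPartitions(numReceivers: Int, batchIntervalMs: Long, blockIntervalMs: Long): Long =
    numReceivers * blocksPerBatch(batchIntervalMs, blockIntervalMs)

  // Each receiver occupies one core, so only the remainder of
  // spark.cores.max is available for processing tasks.
  def processingCores(coresMax: Int, numReceivers: Int): Int = {
    val free = coresMax - numReceivers
    require(free > 0, s"spark.cores.max=$coresMax leaves no cores after $numReceivers receivers")
    free
  }

  // Upper bound on records per batch when spark.streaming.receiver.maxRate
  // (records per second, per receiver) is set.
  def maxRecordsPerBatch(maxRatePerSec: Long, numReceivers: Int, batchIntervalMs: Long): Long =
    maxRatePerSec * numReceivers * batchIntervalMs / 1000
}
```

For example, take a 2 s batchInterval with the default spark.streaming.blockInterval of 200 ms:
- Each receiver yields at most 10 partitions per batch.
- Three unioned receivers yield at most 30.
- spark.cores.max = 8 leaves 5 cores to process them.

If that partition count does not suit the available cores, dstream.repartition(n) trades a shuffle for a chosen count, as noted in the thread.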

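The thread notes two related points: only one job is active at a time, and trouble starts when batch processing time exceeds batchInterval. A small simulation shows why these combine badly: if every batch takes longer than the interval, each batch waits longer than the last one.

This is a hypothetical model, not Spark code. BacklogModel and schedulingDelays are illustrative names. The model assumes a constant processing time per batch and that each batch becomes ready at the end of its interval.

```scala
// Hypothetical model (not Spark code): each batch becomes ready at the end of
// its interval and batches are processed strictly one at a time, mirroring
// "only one job is active". Times are in milliseconds.
object BacklogModel {

  // For each of the first n batches, how long it waited before its job
  // could start (its scheduling delay).
  def schedulingDelays(batchIntervalMs: Long, processingMs: Long, n: Int): List[Long] = {
    val (_, delaysReversed) =
      (0 until n).foldLeft((0L, List.empty[Long])) { case ((prevFinish, acc), k) =>
        val ready = (k + 1).toLong * batchIntervalMs // batch k closes here
        val start = math.max(ready, prevFinish)       // wait for the previous job
        (start + processingMs, (start - ready) :: acc)
      }
    delaysReversed.reverse
  }
}
```

With a 1 s interval and 1.2 s of processing per batch, the delay grows by 200 ms every batch and never recovers, so received data keeps piling up. With 0.8 s of processing, the delay stays at zero. Capping input with spark.streaming.receiver.maxRate is one way to bring processing time back under the interval.
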
> Update SparkStreaming docs to include "knobs" 
> ----------------------------------------------
>
>                 Key: SPARK-7848
>                 URL: https://issues.apache.org/jira/browse/SPARK-7848
>             Project: Spark
>          Issue Type: Documentation
>          Components: Streaming
>            Reporter: jay vyas
>
> A recent email on the mailing list detailed a bunch of great "knobs" to 
> remember for Spark Streaming. 
> Let's integrate this into the docs where appropriate.
> I'll paste the raw text in a comment field below....



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
