Blocks are replicated immediately, before the driver launches any jobs
using them.

On Thu, May 21, 2015 at 2:05 AM, Hemant Bhanawat <hemant9...@gmail.com>
wrote:

> Honestly, given the length of my email, I didn't expect a reply. :-)
> Thanks for reading and replying. However, I have a follow-up question:
>
> I don't think I understand block replication completely. Are the
> blocks replicated immediately after they are received by the receiver? Or
> are they kept on the receiver node only and moved only on shuffle? Does
> the replication have something to do with spark.locality.wait?
>
> Thanks,
> Hemant
>
> On Thu, May 21, 2015 at 2:21 AM, Tathagata Das <t...@databricks.com>
> wrote:
>
>> Correcting the ones that are incorrect or incomplete. BUT this is a good
>> list of things to remember about Spark Streaming.
>>
>>
>> On Wed, May 20, 2015 at 3:40 AM, Hemant Bhanawat <hemant9...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I have compiled a list (from online sources) of knobs/design
>>> considerations that need to be taken care of by applications running on
>>> spark streaming. Is my understanding correct?  Any other important design
>>> consideration that I should take care of?
>>>
>>>
>>>    - A DStream is associated with a single receiver. To attain read
>>>    parallelism, multiple receivers, i.e. multiple DStreams, need to be
>>>    created.
>>>    - A receiver runs within an executor and occupies one core. Ensure
>>>    that there are enough cores left for processing after the receiver
>>>    slots are booked, i.e. spark.cores.max should take the receiver
>>>    slots into account.
>>>    - The receivers are allocated to executors in a round-robin fashion.
>>>    - When data is received from a stream source, the receiver creates
>>>    blocks of data. A new block of data is generated every blockInterval
>>>    milliseconds. N blocks of data are created during the batchInterval,
>>>    where N = batchInterval/blockInterval.
>>>    - These blocks are distributed by the BlockManager of the current
>>>    executor to the block managers of other executors. After that, the
>>>    Network Input Tracker running on the driver is informed about the
>>>    block locations for further processing.
>>>    - An RDD is created on the driver for the blocks created during the
>>>    batchInterval. The blocks generated during the batchInterval are
>>>    partitions of the RDD. Each partition is a task in Spark.
>>>    blockInterval == batchInterval would mean that a single partition is
>>>    created, and it is probably processed locally.
>>>
>> The map tasks on the blocks are processed in the executors (the one that
>> received the block, and the one where the block was replicated) that have
>> the blocks, irrespective of the block interval, unless non-local
>> scheduling kicks in (as you observed next).
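To make the N = batchInterval/blockInterval arithmetic above concrete, here is a minimal plain-Scala sketch; the interval values are illustrative assumptions, not taken from the thread:

```scala
// Number of blocks (= RDD partitions = map tasks) generated per batch:
// N = batchInterval / blockInterval.
val batchIntervalMs = 2000L  // illustrative: 2-second batches
val blockIntervalMs = 200L   // Spark's default spark.streaming.blockInterval is 200 ms
val partitionsPerBatch = batchIntervalMs / blockIntervalMs

println(partitionsPerBatch)  // 10 blocks per batch, so 10 map tasks per batch
```

With a larger blockInterval, fewer but bigger blocks (and hence fewer, coarser tasks) are produced per batch.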
>>
>>>
>>>    - Having a bigger blockInterval means bigger blocks. A high value of
>>>    spark.locality.wait increases the chance of processing a block on
>>>    the local node. A balance needs to be found between these two
>>>    parameters to ensure that the bigger blocks are processed locally.
>>>    - Instead of relying on batchInterval and blockInterval, you can
>>>    define the number of partitions by calling dstream.repartition(n).
>>>    This reshuffles the data in the RDD randomly to create n partitions.
>>>
>> Yes, for greater parallelism. Though it comes at the cost of a shuffle.
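A sketch of that repartition call using Spark's Scala API; the local master, source host/port, and partition count are illustrative assumptions:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[4]").setAppName("repartition-sketch")
val ssc = new StreamingContext(conf, Seconds(2))

// Placeholder source; host and port are illustrative.
val lines = ssc.socketTextStream("localhost", 9999)

// Override the batchInterval/blockInterval-derived partitioning:
// shuffle each batch RDD into exactly 10 partitions.
val repartitioned = lines.repartition(10)
repartitioned.foreachRDD { rdd => println(rdd.partitions.length) }
```

The repartition happens per batch RDD, so every batch pays for the shuffle.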
>>
>>>
>>>    - An RDD's processing is scheduled by the driver's JobScheduler as a
>>>    job. At a given point of time only one job is active. So, if one job
>>>    is executing, the other jobs are queued.
>>>
>>>    - If you have two DStreams, there will be two RDDs formed, and two
>>>    jobs will be created which are scheduled one after the other.
>>>
>>>
>>>    - To avoid this, you can union the two DStreams. This ensures that a
>>>    single unionRDD is formed from the two RDDs of the DStreams. This
>>>    unionRDD is then considered as a single job. However, the
>>>    partitioning of the RDDs is not impacted.
>>>
>> To further clarify, the number of jobs depends on the number of output
>> operations (print, foreachRDD, saveAsXFiles) and the number of RDD
>> actions in those output operations.
>>
>> dstream1.union(dstream2).foreachRDD { rdd => rdd.count() }  // ONE Spark job per batch
>>
>> dstream1.union(dstream2).foreachRDD { rdd => rdd.count(); rdd.count() }  // TWO Spark jobs per batch
>>
>> dstream1.foreachRDD { rdd => rdd.count() }; dstream2.foreachRDD { rdd => rdd.count() }  // TWO Spark jobs per batch
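Putting the read-parallelism and union points together, a sketch using Spark's Scala API; the local master, source type, and receiver count are illustrative assumptions:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Three receivers book three cores; local[6] leaves cores for processing.
val conf = new SparkConf().setMaster("local[6]").setAppName("union-sketch")
val ssc = new StreamingContext(conf, Seconds(2))

// Multiple DStreams (one receiver each) for read parallelism.
val streams = (1 to 3).map(i => ssc.socketTextStream("localhost", 9000 + i))

// Union the DStreams so each batch produces ONE job instead of three.
val unioned = ssc.union(streams)
unioned.foreachRDD { rdd => println(rdd.count()) }  // one Spark job per batch
```

The union does not repartition anything; each receiver's blocks remain partitions of the unioned batch RDD.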
>>
>>>
>>>    - If the batch processing time is more than the batchInterval, then
>>>    obviously the receiver's memory will start filling up and will end
>>>    up throwing exceptions (most probably BlockNotFoundException).
>>>    Currently there is no way to pause the receiver.
>>>
>> You can limit the rate of the receiver using the SparkConf config
>> spark.streaming.receiver.maxRate.
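For example, as a config fragment; the rate value is an illustrative assumption:

```scala
import org.apache.spark.SparkConf

// Cap each receiver at (illustratively) 1000 records per second so a slow
// batch cannot overflow receiver memory; excess records stay at the source.
val conf = new SparkConf()
  .setAppName("rate-limited-streaming")
  .set("spark.streaming.receiver.maxRate", "1000")
```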
>>
>>>
>>>    - To be fully fault tolerant, Spark Streaming needs checkpointing
>>>    enabled. Checkpointing increases the batch processing time.
>>>
>> Incomplete. There are two types of checkpointing - data and metadata.
>> Only data checkpointing, needed by only some operations, increases batch
>> processing time. Read -
>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
>> Furthermore, with checkpointing you can recover the computation, but you
>> may lose some data (that was received but not processed before the
>> driver failed) for some sources. Enabling write-ahead logs plus a
>> reliable source and receiver allows zero data loss. Read - WAL in
>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#fault-tolerance-semantics
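A sketch of enabling both checkpointing and the write-ahead log with Spark's Scala API; the checkpoint directory is a placeholder and the batch interval is an assumption:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("fault-tolerant-streaming")
  // Write received blocks to a write-ahead log on fault-tolerant storage
  // before acknowledging them, so a receiver failure loses no data.
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

def createContext(): StreamingContext = {
  val ssc = new StreamingContext(conf, Seconds(2))
  ssc.checkpoint("hdfs:///checkpoints/my-app")  // placeholder path
  // ... define DStreams and output operations here ...
  ssc
}

// Recover from the checkpoint on driver restart, or create a fresh context.
val ssc = StreamingContext.getOrCreate("hdfs:///checkpoints/my-app", createContext _)
```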
>>
>>>
>>>    - The frequency of metadata checkpoint cleaning can be controlled
>>>    using spark.cleaner.ttl. But data checkpoint cleaning happens
>>>    automatically when the RDDs in the checkpoint are no longer required.
>>>
>>>
>> Incorrect. Metadata checkpointing (or DStream checkpointing) is
>> self-cleaning. What you are probably talking about is the cleaning of
>> shuffle and other data in the executors. That can be cleaned using
>> spark.cleaner.ttl, but it is a brute-force hammer and can clean more
>> stuff than you intend, so it is not recommended. Rather, Spark has
>> GC-triggered cleaning of all that: when RDD objects are GC'ed, their
>> shuffle data, cached data, etc. are also cleaned in the executors. You
>> can trigger GC-based cleaning by calling System.gc() in the driver
>> periodically.
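One way to sketch that periodic GC trigger in the driver (plain Scala/JVM; the interval is an illustrative assumption):

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Periodic System.gc() in the driver: once RDD objects become unreachable
// and are GC'ed, Spark also cleans their shuffle data and cached data on
// the executors.
val gcScheduler = Executors.newSingleThreadScheduledExecutor()
gcScheduler.scheduleAtFixedRate(
  new Runnable { def run(): Unit = System.gc() },
  10, 10, TimeUnit.MINUTES)  // illustrative 10-minute period; tune per job

// Call gcScheduler.shutdown() when the StreamingContext stops.
```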
>>
>>
>>>
>>> Thanks,
>>> Hemant
>>>
>>
>>
>
