[ 
https://issues.apache.org/jira/browse/SPARK-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15239122#comment-15239122
 ] 

Sachin Aggarwal commented on SPARK-14597:
-----------------------------------------

I can see two approaches to providing the additional metrics described above. I 
have attached a PR for each approach so that both can be reviewed and we can 
pick the better one.
   
1) Add new events to the listener bus.
   Add more event case classes:
         case class StreamingListenerBatchGenerateStarted(time: Time)
           extends StreamingListenerEvent
         case class StreamingListenerBatchGenerateCompleted(time: Time)
           extends StreamingListenerEvent
         case class StreamingListenerCheckpointingStarted(time: Time)
           extends StreamingListenerEvent
         case class StreamingListenerCheckpointingCompleted(time: Time)
           extends StreamingListenerEvent

   and add new functions to the listener interface for receiving these events:
         def onBatchGenerateStarted(
             batchGenerateStarted: StreamingListenerBatchGenerateStarted) { }
         def onBatchGenerateCompleted(
             batchGenerateCompleted: StreamingListenerBatchGenerateCompleted) { }
         def onCheckpointingStarted(
             checkpointingStarted: StreamingListenerCheckpointingStarted) { }
         def onCheckpointingCompleted(
             checkpointingCompleted: StreamingListenerCheckpointingCompleted) { }
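
To illustrate how a listener could use the events in approach 1, here is a 
minimal, self-contained sketch. It is only an assumption of how this might 
look: Time and the event classes below are simplified stand-ins, not Spark's 
real classes, and GenerateDelayListener and its injected clock are hypothetical 
names introduced for the example.

```scala
import scala.collection.mutable

// Stand-in types for illustration only; these are NOT Spark's real classes.
final case class Time(milliseconds: Long)

sealed trait StreamingListenerEvent
final case class StreamingListenerBatchGenerateStarted(time: Time)
  extends StreamingListenerEvent
final case class StreamingListenerBatchGenerateCompleted(time: Time)
  extends StreamingListenerEvent

// Hypothetical listener: pairs the started/completed events of each batch
// and records how long job generation took, in milliseconds.
// Not thread-safe; it assumes events arrive on a single thread.
class GenerateDelayListener(clock: () => Long) {
  private val startedAt = mutable.Map.empty[Time, Long]
  private val delays = mutable.Map.empty[Time, Long]

  def onBatchGenerateStarted(e: StreamingListenerBatchGenerateStarted): Unit =
    startedAt(e.time) = clock()

  def onBatchGenerateCompleted(e: StreamingListenerBatchGenerateCompleted): Unit =
    // Ignore a completion for which no start was seen.
    startedAt.remove(e.time).foreach { start =>
      delays(e.time) = clock() - start
    }

  // Generation delay for a batch, if both of its events were observed.
  def generateDelay(batch: Time): Option[Long] = delays.get(batch)
}
```

The clock is passed in so the listener can be exercised without real waiting. 
The same pairing would apply to the checkpointing started/completed events.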

2) Add one parameter to JobSet and pass it to the BatchInfo class to track the 
JobSet creation delay for each batch. Since the JobSet creation delay is tied 
to a batch, it can be part of BatchInfo. For checkpointing we can use the 
listener approach, because checkpointing is triggered at the checkpoint 
interval, not necessarily at every batch interval.

case class JobSet(
    time: Time,
    jobSetCreationDelay: Option[Long],  // new parameter
    jobs: Seq[Job],
    streamIdToInputInfo: Map[Int, StreamInputInfo] = Map.empty) {
  // body omitted
}

case class BatchInfo(
    batchTime: Time,
    creationDelay: Option[Long],  // new field, filled from the JobSet
    streamIdToInputInfo: Map[Int, StreamInputInfo],
    submissionTime: Long,
    processingStartTime: Option[Long],
    processingEndTime: Option[Long],
    outputOperationInfos: Map[Int, OutputOperationInfo]
  ) {
  // body omitted
}
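
As a rough sketch of how the extra field in approach 2 could be combined with 
the existing delays, consider the simplified class below. BatchTimings is a 
hypothetical stand-in (not the real BatchInfo), and totalWithCreation is a name 
made up for this example; the scheduling and processing delays are computed 
from the timestamps the same way the existing metrics are described in the 
issue.

```scala
// Simplified, hypothetical stand-in for BatchInfo; all values in milliseconds.
final case class BatchTimings(
    batchTime: Long,
    creationDelay: Option[Long],        // time spent in graph.generateJobs
    submissionTime: Long,               // when the JobSet was queued
    processingStartTime: Option[Long],
    processingEndTime: Option[Long]) {

  // Time the JobSet waited in the scheduler queue.
  def schedulingDelay: Option[Long] =
    processingStartTime.map(_ - submissionTime)

  // Time spent running the JobSet's jobs.
  def processingDelay: Option[Long] =
    for (start <- processingStartTime; end <- processingEndTime)
      yield end - start

  // Creation + scheduling + processing; None until all three are known.
  def totalWithCreation: Option[Long] =
    for (c <- creationDelay; s <- schedulingDelay; p <- processingDelay)
      yield c + s + p
}
```

For example, a batch with a creation delay of 400 ms, submitted at 1000, 
started at 1100 and finished at 1600 would report 400 + 100 + 500 = 1000 ms.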

> Streaming Listener timing metrics should include time spent in JobGenerator's 
> graph.generateJobs
> ------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-14597
>                 URL: https://issues.apache.org/jira/browse/SPARK-14597
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core, Streaming
>    Affects Versions: 1.6.1, 2.0.0
>            Reporter: Sachin Aggarwal
>            Priority: Minor
>
> While looking to tune our streaming application, the piece of info we were 
> looking for was actual processing time per batch. The 
> StreamingListener.onBatchCompleted event provides a BatchInfo object that 
> provides this information. It provides the following data:
>  - processingDelay
>  - schedulingDelay
>  - totalDelay
>  - Submission Time
>  The above are essentially calculated from the streaming JobScheduler 
> clocking the processingStartTime and processingEndTime for each JobSet. 
> Another metric available is submissionTime which is when a Jobset was put on 
> the Streaming Scheduler's Queue. 
>  
> So we took processing delay as our actual processing time per batch. However, 
> to maintain a stable streaming application, we found that our batch 
> interval had to be a little less than DOUBLE the processingDelay metric 
> reported. (We are using a DirectKafkaInputStream.) On digging further, we 
> found that processingDelay only clocks time spent in the foreachRDD 
> closure of the streaming application and that JobGenerator's 
> graph.generateJobs 
> (https://github.com/apache/spark/blob/branch-1.6/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala#L248)
> method takes significantly more time.
>  Thus a true reflection of processing time is
>  a - Time spent in JobGenerator's Job Queue (JobGeneratorQueueDelay)
>  b - Time spent in JobGenerator's graph.generateJobs (JobSetCreationDelay)
>  c - Time spent in JobScheduler Queue for a Jobset (existing schedulingDelay 
> metric)
>  d - Time spent in Jobset's job run (existing processingDelay metric)
>  
>  Additionally, a JobGeneratorQueue delay (#a) could be due to either 
> graph.generateJobs taking longer than batchInterval or other JobGenerator 
> events like checkpointing adding up time. Thus it would be beneficial to 
> report the time taken by the checkpointing job as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
