[
https://issues.apache.org/jira/browse/SPARK-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15239122#comment-15239122
]
Sachin Aggarwal commented on SPARK-14597:
-----------------------------------------
I can see two approaches to providing the additional metrics described above.
I have attached a PR for each approach so that both can be reviewed and we can
pick the better one.
1) Add new events to the listener bus.
Add more event case classes:
case class StreamingListenerBatchGenerateStarted(time: Time)
  extends StreamingListenerEvent
case class StreamingListenerBatchGenerateCompleted(time: Time)
  extends StreamingListenerEvent
case class StreamingListenerCheckpointingStarted(time: Time)
  extends StreamingListenerEvent
case class StreamingListenerCheckpointingCompleted(time: Time)
  extends StreamingListenerEvent
and add new functions to the listener interface to receive these events:
def onBatchGenerateStarted(
    batchGenerateStarted: StreamingListenerBatchGenerateStarted) { }
def onBatchGenerateCompleted(
    batchGenerateCompleted: StreamingListenerBatchGenerateCompleted) { }
def onCheckpointingStarted(
    checkpointingStarted: StreamingListenerCheckpointingStarted) { }
def onCheckpointingCompleted(
    checkpointingCompleted: StreamingListenerCheckpointingCompleted) { }
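As a rough illustration of how a listener might consume the events from
approach 1, the sketch below pairs the started and completed events for a batch
to measure the time spent generating its jobs. Everything in it is a
self-contained stand-in: Time, the event classes, and GenerateTimingListener are
simplified placeholders, not Spark's real classes. Because the proposed events
carry only the batch time, the sketch assumes the listener reads its own clock
when each event arrives.

```scala
import scala.collection.mutable

object GenerateTimingSketch {
  // Simplified stand-ins for Spark's Time and the proposed events (assumptions).
  final case class Time(millis: Long)
  sealed trait StreamingListenerEvent
  final case class StreamingListenerBatchGenerateStarted(time: Time)
    extends StreamingListenerEvent
  final case class StreamingListenerBatchGenerateCompleted(time: Time)
    extends StreamingListenerEvent

  // Hypothetical listener: remembers when generation started for a batch and
  // records the elapsed time once the matching completed event arrives.
  class GenerateTimingListener(clock: () => Long) {
    private val startedAt = mutable.Map.empty[Time, Long]
    private val durations = mutable.Map.empty[Time, Long]

    def onBatchGenerateStarted(e: StreamingListenerBatchGenerateStarted): Unit =
      startedAt(e.time) = clock()

    def onBatchGenerateCompleted(e: StreamingListenerBatchGenerateCompleted): Unit =
      startedAt.remove(e.time).foreach { start =>
        durations(e.time) = clock() - start
      }

    // Generation time in clock units for a batch, if both events were seen.
    def generateDuration(batch: Time): Option[Long] = durations.get(batch)
  }
}
```

Passing the clock in as a function keeps the sketch testable; a real listener
would more likely call System.currentTimeMillis() directly.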
2) Add one parameter to JobSet and pass it to the BatchInfo class to track the
JobSet creation delay for each batch. Since the JobSet creation delay is tied to
a batch, it can be part of BatchInfo. For checkpointing we can still use the
listener approach, because checkpointing is triggered at the checkpoint
interval, not necessarily at every batch interval.
case class JobSet(
    time: Time,
    jobSetCreationDelay: Option[Long],  // new parameter
    jobs: Seq[Job],
    streamIdToInputInfo: Map[Int, StreamInputInfo] = Map.empty) {
  // ... existing body ...
}

case class BatchInfo(
    batchTime: Time,
    creationDelay: Option[Long],  // new parameter, passed on from JobSet
    streamIdToInputInfo: Map[Int, StreamInputInfo],
    submissionTime: Long,
    processingStartTime: Option[Long],
    processingEndTime: Option[Long],
    outputOperationInfos: Map[Int, OutputOperationInfo]
  ) {
  // ... existing body ...
}
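To illustrate approach 2, here is a simplified, self-contained stand-in for
BatchInfo (not Spark's actual class; input and output-operation info are
omitted) showing how a creationDelay field could sit alongside the existing
derived delays. The schedulingDelay and processingDelay definitions follow my
understanding of what BatchInfo computes today; BatchInfoLite and endToEndDelay
are hypothetical names introduced only for this sketch.

```scala
object BatchInfoSketch {
  // Hypothetical, trimmed-down BatchInfo with the proposed creationDelay field.
  final case class BatchInfoLite(
      batchTimeMs: Long,
      creationDelay: Option[Long],
      submissionTime: Long,
      processingStartTime: Option[Long],
      processingEndTime: Option[Long]) {

    // Time the JobSet waited in the scheduler queue before processing began.
    def schedulingDelay: Option[Long] =
      processingStartTime.map(_ - submissionTime)

    // Time spent running the batch's jobs.
    def processingDelay: Option[Long] =
      for (end <- processingEndTime; start <- processingStartTime)
        yield end - start

    // Hypothetical helper: creation + scheduling + processing, defined only
    // when all three components are known.
    def endToEndDelay: Option[Long] =
      for (c <- creationDelay; s <- schedulingDelay; p <- processingDelay)
        yield c + s + p
  }
}
```

Keeping creationDelay as an Option lets older code paths that do not measure it
pass None without affecting the existing delays.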
> Streaming Listener timing metrics should include time spent in JobGenerator's
> graph.generateJobs
> ------------------------------------------------------------------------------------------------
>
> Key: SPARK-14597
> URL: https://issues.apache.org/jira/browse/SPARK-14597
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core, Streaming
> Affects Versions: 1.6.1, 2.0.0
> Reporter: Sachin Aggarwal
> Priority: Minor
>
> While looking to tune our streaming application, the piece of info we were
> looking for was the actual processing time per batch. The
> StreamingListener.onBatchCompleted event supplies a BatchInfo object that
> carries this information. It exposes the following data:
> - processingDelay
> - schedulingDelay
> - totalDelay
> - Submission Time
> The above are essentially calculated from the streaming JobScheduler
> clocking the processingStartTime and processingEndTime for each JobSet.
> Another metric available is submissionTime, which is when a JobSet was put on
> the streaming scheduler's queue.
>
> So we took processingDelay as our actual processing time per batch. However,
> to maintain a stable streaming application, we found that our batch
> interval had to be a little less than DOUBLE the processingDelay metric
> reported. (We are using a DirectKafkaInputDStream.) On digging further, we
> found that processingDelay only clocks the time spent in the foreachRDD
> closure of the streaming application, and that JobGenerator's
> graph.generateJobs
> (https://github.com/apache/spark/blob/branch-1.6/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala#L248)
> method takes significantly more time.
> Thus a true reflection of processing time comprises:
> a - Time spent in JobGenerator's job queue (JobGeneratorQueueDelay)
> b - Time spent in JobGenerator's graph.generateJobs (JobSetCreationDelay)
> c - Time spent in the JobScheduler queue for a JobSet (existing
>     schedulingDelay metric)
> d - Time spent in the JobSet's job run (existing processingDelay metric)
>
> Additionally, a JobGeneratorQueueDelay (#a) could be due to either
> graph.generateJobs taking longer than the batch interval or other
> JobGenerator events, such as checkpointing, adding time. Thus it would be
> beneficial to report the time taken by the checkpointing job as well.