[
https://issues.apache.org/jira/browse/SPARK-7122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14570974#comment-14570974
]
Platon Potapov commented on SPARK-7122:
---------------------------------------
Nicolas, in your scenario it appears that "createDirectStream" lags behind
because batches take longer than 2 seconds to process from the very start. This
is probably caused by the same problem that this very ticket is about: trivial
jobs take 3 seconds to process. In my case, however, batch processing duration
did not grow over time.
But if you use the legacy "createStream", please keep an eye out for a much
more devilish problem that I've also experienced:
https://issues.apache.org/jira/browse/SPARK-7053
In short, with the legacy Kafka receiver, my output also *eventually* lagged
behind my input data, but only after the system had been under load for some
time (many hours in my case; the details are in that ticket).
> KafkaUtils.createDirectStream - unreasonable processing time in absence of
> load
> -------------------------------------------------------------------------------
>
> Key: SPARK-7122
> URL: https://issues.apache.org/jira/browse/SPARK-7122
> Project: Spark
> Issue Type: Question
> Components: Streaming
> Affects Versions: 1.3.1
> Environment: Spark Streaming 1.3.1, standalone mode running on just 1
> box: Ubuntu 14.04.2 LTS, 4 cores, 8GB RAM, java version "1.8.0_40"
> Reporter: Platon Potapov
> Priority: Minor
> Attachments: 10.second.window.fast.job.txt,
> 5.second.window.slow.job.txt, SparkStreamingJob.scala
>
>
> Attached is the complete source code of a test Spark job. No external data
> generators are run; the presence of a Kafka topic named "raw" suffices.
> The Spark job is run with no load whatsoever, and
> http://localhost:4040/streaming is checked to obtain the job processing
> duration.
> * When the test contains the following transformation:
> {code}
> // dummy transformation
> val temperature = bytes.filter(_._1 == "abc")
> val abc = temperature.window(Seconds(40), Seconds(5))
> abc.print()
> {code}
> the median processing time is 3 seconds 80 ms.
> * When the test contains the following transformation:
> {code}
> // dummy transformation
> val temperature = bytes.filter(_._1 == "abc")
> val abc = temperature.map(x => (1, x))
> abc.print()
> {code}
> the median processing time is just 50 ms.
> Why does the "window" transformation introduce such an increase in job
> duration?
> Note: the result is the same regardless of the number of Kafka topic
> partitions (I've tried 1 and 8).
> Note 2: the result is the same regardless of the window parameters (I've
> tried (20, 2) and (40, 5)).
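
For readers without access to the attached SparkStreamingJob.scala, the
following is a rough sketch of how a job around the quoted snippets might be
assembled against the Spark Streaming 1.3.x Kafka API. It is not the attached
file. The broker address, the 1-second batch interval, the String key/value
types and the object name are all assumptions made for illustration; only the
topic name "raw" and the window transformation come from the ticket. The
listener logs the per-batch processing delay, roughly the figure one would
otherwise read off http://localhost:4040/streaming.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Hypothetical reconstruction; NOT the SparkStreamingJob.scala attached to the ticket.
object WindowTimingSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WindowTimingSketch")

    // Assumption: 1-second batch interval (the ticket does not state it).
    // Window and slide durations must be multiples of this interval.
    val ssc = new StreamingContext(conf, Seconds(1))

    // Assumption: a single local Kafka broker on the default port.
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val topics = Set("raw") // topic name taken from the ticket

    // Assumption: String keys and values; the original types are not shown.
    val bytes = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // Log how long each batch took to process.
    ssc.addStreamingListener(new StreamingListener {
      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
        val info = batchCompleted.batchInfo
        val delay = info.processingDelay.map(ms => s"$ms ms").getOrElse("n/a")
        println(s"batch ${info.batchTime}: processing delay = $delay")
      }
    })

    // dummy transformation, as quoted in the ticket
    val temperature = bytes.filter(_._1 == "abc")
    val abc = temperature.window(Seconds(40), Seconds(5))
    abc.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Swapping the `window` line for `temperature.map(x => (1, x))` gives the second
variant from the ticket, so the two logged delays can be compared directly.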