[ 
https://issues.apache.org/jira/browse/SPARK-7122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14571675#comment-14571675
 ] 

Cody Koeninger commented on SPARK-7122:
---------------------------------------

Try doing something very straightforward inside of foreachRDD (println, 
increment a counter, whatever), instead of the current code.

You're calling isEmpty, which implies a take(1).

Then saveJsonToEs is internally calling take(1) again at least once.

take schedules jobs successively, quadrupling the number of partitions it 
tries each time the previous job came back empty.  It's possible that starts 
showing some pathological behavior on RDDs that are empty or nearly empty yet 
still have 400 partitions.
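
To make the cost concrete, here is a rough sketch in plain Scala (no Spark 
dependency) of how many jobs a single take(1) could launch when no partition 
yields a row. It assumes the growth rule as I understand it for 1.3: the first 
job scans 1 partition, and each later job tries 4x the number of partitions 
already scanned. TakeSchedule and jobsForEmptyRdd are made-up names for 
illustration, not Spark APIs, so check RDD.take in the source before relying 
on the exact numbers.

```scala
// Simulates the partition-scan schedule of take() on an RDD where no
// partition yields a row. Assumption (not verified against the source here):
// the first job scans 1 partition, and each later job tries 4x the number of
// partitions already scanned, capped at the partitions that remain.
object TakeSchedule {
  // Returns the number of partitions scanned by each successive job.
  def jobsForEmptyRdd(totalParts: Int): List[Int] = {
    @annotation.tailrec
    def loop(scanned: Int, acc: List[Int]): List[Int] =
      if (scanned >= totalParts) acc.reverse
      else {
        val toTry = if (scanned == 0) 1 else scanned * 4
        val actual = math.min(toTry, totalParts - scanned)
        loop(scanned + actual, actual :: acc)
      }
    loop(0, Nil)
  }
}
```

Under that assumption, TakeSchedule.jobsForEmptyRdd(400) gives 
List(1, 4, 20, 100, 275), i.e. five jobs for one take(1) on an empty 
400-partition RDD, and that happens once for isEmpty and at least once more 
inside saveJsonToEs.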

I could pretty straightforwardly override the definition of isEmpty in KafkaRDD 
to just look at the offset ranges instead of doing any work. Take would be a 
little trickier but still doable.  But let's figure out if that's the issue 
first.
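
For reference, the offset-range check I have in mind would look roughly like 
the sketch below. SimpleOffsetRange is a stand-in defined here so the snippet 
compiles on its own; it only mirrors the topic / partition / fromOffset / 
untilOffset fields of an offset range, and OffsetCheck.isEmpty is a 
hypothetical helper, not the actual KafkaRDD code.

```scala
// Stand-in for an offset range: messages from fromOffset (inclusive) up to
// untilOffset (exclusive) of one topic partition. Hypothetical type, defined
// here only to keep the sketch self-contained.
final case class SimpleOffsetRange(
    topic: String,
    partition: Int,
    fromOffset: Long,
    untilOffset: Long) {
  // Number of messages covered by this range.
  def count: Long = untilOffset - fromOffset
}

object OffsetCheck {
  // The batch is empty when no range covers any message, which can be decided
  // from the offsets alone, without scheduling a job.
  def isEmpty(ranges: Seq[SimpleOffsetRange]): Boolean =
    ranges.forall(_.count <= 0)
}
```

With no load on topic "raw", every range would have fromOffset equal to 
untilOffset, so the check returns true without touching the executors.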



> KafkaUtils.createDirectStream - unreasonable processing time in absence of 
> load
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-7122
>                 URL: https://issues.apache.org/jira/browse/SPARK-7122
>             Project: Spark
>          Issue Type: Question
>          Components: Streaming
>    Affects Versions: 1.3.1
>         Environment: Spark Streaming 1.3.1, standalone mode running on just 1 
> box: Ubuntu 14.04.2 LTS, 4 cores, 8GB RAM, java version "1.8.0_40"
>            Reporter: Platon Potapov
>            Priority: Minor
>         Attachments: 10.second.window.fast.job.txt, 
> 5.second.window.slow.job.txt, SparkStreamingJob.scala
>
>
> attached is the complete source code of a test spark job. no external data 
> generators are run - just the presence of a kafka topic named "raw" suffices.
> the spark job is run with no load whatsoever. http://localhost:4040/streaming 
> is checked to obtain job processing duration.
> * in case the test contains the following transformation:
> {code}
>     // dummy transformation
>     val temperature = bytes.filter(_._1 == "abc")
>     val abc = temperature.window(Seconds(40), Seconds(5))
>     abc.print()
> {code}
> the median processing time is 3 seconds 80 ms
> * in case the test contains the following transformation:
> {code}
>     // dummy transformation
>     val temperature = bytes.filter(_._1 == "abc")
>     val abc = temperature.map(x => (1, x))
>     abc.print()
> {code}
> the median processing time is just 50 ms
> please explain why the "window" transformation introduces such a growth 
> in job duration.
> note: the result is the same regardless of the number of kafka topic 
> partitions (I've tried 1 and 8)
> note2: the result is the same regardless of the window parameters (I've tried 
> (20, 2) and (40, 5))


