[ 
https://issues.apache.org/jira/browse/SPARK-7122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14567857#comment-14567857
 ] 

Nicolas PHUNG commented on SPARK-7122:
--------------------------------------

I have a similar issue regarding the performance between the Kafka Spark 
Streaming integration :

{code}
val messages = KafkaUtils.createStream[Object, Object, KafkaAvroDecoder, 
KafkaAvroDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK)
{code}
vs
{code}
val messages = KafkaUtils.createDirectStream[Object, Object, KafkaAvroDecoder, 
KafkaAvroDecoder](ssc, kafkaParams, topicsSet)
{code}

KafkaUtils.createStream has a slower throughput than 
KafkaUtils.createDirectStream when I'm reprocessing all my events in my Kafka 
topic (about 300k message for about 3-4kb per message). After a while, it keeps 
up with the almost real time event, KafkaUtils.createDirectStream get slower 
and slower and get almost 30 minutes processing behind. Whereas 
KafkaUtils.createStream manage to be almost "real time" (it means recent event 
get stored almost instantly without delay). For both, I have a simple context 
without windows defined like this : 

{code}
val ssc = new StreamingContext(sparkConf, Seconds(2))
{code}

I don't understand why KafkaUtils.createDirectStream got so far behind on doing 
the same processing. 

> KafkaUtils.createDirectStream - unreasonable processing time in absence of 
> load
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-7122
>                 URL: https://issues.apache.org/jira/browse/SPARK-7122
>             Project: Spark
>          Issue Type: Question
>          Components: Streaming
>    Affects Versions: 1.3.1
>         Environment: Spark Streaming 1.3.1, standalone mode running on just 1 
> box: Ubuntu 14.04.2 LTS, 4 cores, 8GB RAM, java version "1.8.0_40"
>            Reporter: Platon Potapov
>            Priority: Minor
>         Attachments: 10.second.window.fast.job.txt, 
> 5.second.window.slow.job.txt, SparkStreamingJob.scala
>
>
> attached is the complete source code of a test spark job. no external data 
> generators are run - just the presence of a kafka topic named "raw" suffices.
> the spark job is run with no load whatsoever. http://localhost:4040/streaming 
> is checked to obtain job processing duration.
> * in case the test contains the following transformation:
> {code}
>     // dummy transformation
>     val temperature = bytes.filter(_._1 == "abc")
>     val abc = temperature.window(Seconds(40), Seconds(5))
>     abc.print()
> {code}
> the median processing time is 3 seconds 80 ms
> * in case the test contains the following transformation:
> {code}
>     // dummy transformation
>     val temperature = bytes.filter(_._1 == "abc")
>     val abc = temperature.map(x => (1, x))
>     abc.print()
> {code}
> the median processing time is just 50 ms
> please explain why does the "window" transformation introduce such a growth 
> of job duration?
> note: the result is the same regardless of the number of kafka topic 
> partitions (I've tried 1 and 8)
> note2: the result is the same regardless of the window parameters (I've tried 
> (20, 2) and (40, 5))



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to