[ 
https://issues.apache.org/jira/browse/SPARK-7122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14571609#comment-14571609
 ] 

Nicolas PHUNG commented on SPARK-7122:
--------------------------------------

Platon, I have tried with a 10-second batch too, but 
_KafkaUtils.createDirectStream_ still lags 30 minutes behind after almost 
catching up with the real-time events ingested by Kafka when I'm doing a full 
reprocessing. I'll keep an eye on 
https://issues.apache.org/jira/browse/SPARK-7053, thanks. For now, I don't 
really have bursts of data. I have managed to run _KafkaUtils.createStream_ 
without any delay for about 2 weeks without interruption.
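
For reference, here is roughly how the two streams are wired up on my side 
(the broker list, ZooKeeper quorum, group id and topic name below are 
placeholders, not the exact values from my job):
{code}
import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("analytics-reprocessing")
val ssc = new StreamingContext(conf, Seconds(10)) // 10-second batch

// Receiver-based stream: a single receiver pulling from Kafka via ZooKeeper
val receiverStream = KafkaUtils.createStream(
  ssc, "zookeeper:2181", "analytics-group", Map("events" -> 1))

// Direct stream: no receiver, one Spark task per Kafka partition per batch
val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, Map("metadata.broker.list" -> "broker:9092"), Set("events"))
{code}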

_EsSpark.saveJsonToEs_ is an operation that is part of the elasticsearch-hadoop 
project (under the elasticsearch-spark artifactId). You can find the source here: 
https://github.com/elastic/elasticsearch-hadoop/blob/v2.1.0.Beta4/spark/core/main/scala/org/elasticsearch/spark/rdd/EsSpark.scala
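
In short, it takes an RDD of JSON strings and indexes them into the given 
index/type resource. A minimal usage sketch (the "analytics/event" resource 
name is just an example, not my real index):
{code}
import org.apache.spark.rdd.RDD
import org.elasticsearch.spark.rdd.EsSpark

// Each element of the RDD is one JSON document, already serialized as a String
def indexBatch(jsonDocs: RDD[String]): Unit = {
  EsSpark.saveJsonToEs(jsonDocs, "analytics/event")
}
{code}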

Ok for the MEMORY_AND_DISK storage level, but the weird thing is that 
_KafkaUtils.createDirectStream_ has a much better throughput than 
_KafkaUtils.createStream_. From my tests, it ingests data into Elasticsearch 
about 24 times faster (when I'm reprocessing all the data, I can see the 
throughput in the Elasticsearch indexing). The issue appears once it gets near 
the recent events: there's no big burst of data coming into Kafka while I'm 
doing the reprocessing for now (maybe 5/6 events per second max). The delay 
builds up once it's about to catch up with the real-time events, when it's 
still running the 400 tasks to fetch data from the Kafka partitions even though 
they produce mostly empty RDDs.

I have tried this, but it doesn't seem to have much impact:
{code}
    analyticEventStream.foreachRDD(rdd => {
      // Skip the Elasticsearch write entirely when the batch is empty
      if (!rdd.isEmpty()) {
        EsSpark.saveJsonToEs(rdd, esIndex)
      }
    })
{code}
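
A variant I haven't tried yet would check emptiness from the driver-side offset 
ranges instead of calling _rdd.isEmpty()_, since _isEmpty()_ itself launches a 
job against the partitions. Sketch only, with _esIndex_ as above, and assuming 
_analyticEventStream_ is the un-transformed direct stream (the cast only works 
in that case):
{code}
import org.apache.spark.streaming.kafka.HasOffsetRanges
import org.elasticsearch.spark.rdd.EsSpark

analyticEventStream.foreachRDD { rdd =>
  // The direct stream exposes per-partition offset ranges on the driver,
  // so an empty batch can be detected without running any tasks.
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  if (ranges.exists(r => r.untilOffset > r.fromOffset)) {
    EsSpark.saveJsonToEs(rdd, esIndex)
  }
}
{code}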

> KafkaUtils.createDirectStream - unreasonable processing time in absence of 
> load
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-7122
>                 URL: https://issues.apache.org/jira/browse/SPARK-7122
>             Project: Spark
>          Issue Type: Question
>          Components: Streaming
>    Affects Versions: 1.3.1
>         Environment: Spark Streaming 1.3.1, standalone mode running on just 1 
> box: Ubuntu 14.04.2 LTS, 4 cores, 8GB RAM, java version "1.8.0_40"
>            Reporter: Platon Potapov
>            Priority: Minor
>         Attachments: 10.second.window.fast.job.txt, 
> 5.second.window.slow.job.txt, SparkStreamingJob.scala
>
>
> attached is the complete source code of a test spark job. no external data 
> generators are run - just the presence of a kafka topic named "raw" suffices.
> the spark job is run with no load whatsoever. http://localhost:4040/streaming 
> is checked to obtain job processing duration.
> * in case the test contains the following transformation:
> {code}
>     // dummy transformation
>     val temperature = bytes.filter(_._1 == "abc")
>     val abc = temperature.window(Seconds(40), Seconds(5))
>     abc.print()
> {code}
> the median processing time is 3 seconds 80 ms
> * in case the test contains the following transformation:
> {code}
>     // dummy transformation
>     val temperature = bytes.filter(_._1 == "abc")
>     val abc = temperature.map(x => (1, x))
>     abc.print()
> {code}
> the median processing time is just 50 ms
> please explain why the "window" transformation introduces such a growth in 
> job duration?
> note: the result is the same regardless of the number of kafka topic 
> partitions (I've tried 1 and 8)
> note2: the result is the same regardless of the window parameters (I've tried 
> (20, 2) and (40, 5))



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
