[
https://issues.apache.org/jira/browse/SPARK-7122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14571609#comment-14571609
]
Nicolas PHUNG commented on SPARK-7122:
--------------------------------------
Platon, I have tried a 10-second batch too, but
_KafkaUtils.createDirectStream_ still falls 30 minutes behind after almost
catching up with the real-time events ingested by Kafka when I'm doing a full
reprocessing. I'll keep an eye on
https://issues.apache.org/jira/browse/SPARK-7053, thanks. For now, I don't
really have bursts of data. I managed to run _KafkaUtils.createStream_ without
any delay for about 2 weeks without interruption.
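For reference, here is roughly how the two streams are set up on my side (the
ZooKeeper quorum, broker list, group id and topic name below are placeholders,
not my real configuration):
{code}
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("kafka-compare"), Seconds(10))

// receiver-based stream: offsets tracked in ZooKeeper, records go through a receiver
val receiverStream = KafkaUtils.createStream(
  ssc, "zkhost:2181", "analytic-group", Map("analytic-events" -> 1),
  StorageLevel.MEMORY_AND_DISK)

// direct stream: no receiver, one task per Kafka partition per batch
val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, Map("metadata.broker.list" -> "broker:9092"), Set("analytic-events"))
{code}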
_EsSpark.saveJsonToEs_ is an operation from the elasticsearch-hadoop project
(under the elasticsearch-spark artifactId). You can find the source here:
https://github.com/elastic/elasticsearch-hadoop/blob/v2.1.0.Beta4/spark/core/main/scala/org/elasticsearch/spark/rdd/EsSpark.scala
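A minimal usage sketch in case it helps (the host, index name and documents
below are placeholders):
{code}
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.rdd.EsSpark

// es.nodes tells elasticsearch-hadoop where the cluster lives (placeholder host)
val sc = new SparkContext(
  new SparkConf().setAppName("es-save-json").set("es.nodes", "eshost:9200"))

// saveJsonToEs expects each element to already be a serialized JSON document
val docs = sc.parallelize(Seq("""{"event":"a"}""", """{"event":"b"}"""))
EsSpark.saveJsonToEs(docs, "analytics/events")  // "index/type" resource string
{code}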
OK for the MEMORY_AND_DISK storage level, but the weird thing is that
_KafkaUtils.createDirectStream_ has much better throughput than
_KafkaUtils.createStream_. From my tests, it ingests data into Elasticsearch
about 24 times faster (when I'm reprocessing all the data, I can see the
throughput in the Elasticsearch indexing rate). The issue only appears once it
gets near the recent events; there's no big burst of data coming into Kafka
while I'm doing the reprocessing for now (maybe 5 or 6 events per second max).
The delay builds up once it's about to catch up to the real-time events, when
each batch runs the 400 tasks to fetch from the Kafka partitions even though
most of the resulting RDDs are empty.
I have tried the following, but it doesn't seem to make much difference:
{code}
analyticEventStream.foreachRDD(rdd => {
  // skip the Elasticsearch write entirely when the batch produced no records
  if (!rdd.isEmpty()) {
    EsSpark.saveJsonToEs(rdd, esIndex)
  }
})
{code}
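One more thing I might try (just a sketch on my side, not verified yet):
coalescing the direct stream RDD so a batch doesn't schedule ~400 near-empty
tasks before the Elasticsearch write; the target partition count below is
arbitrary.
{code}
analyticEventStream.foreachRDD(rdd => {
  if (!rdd.isEmpty()) {
    // collapse the ~400 Kafka-partition splits into fewer tasks (no shuffle)
    EsSpark.saveJsonToEs(rdd.coalesce(8), esIndex)
  }
})
{code}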
> KafkaUtils.createDirectStream - unreasonable processing time in absence of
> load
> -------------------------------------------------------------------------------
>
> Key: SPARK-7122
> URL: https://issues.apache.org/jira/browse/SPARK-7122
> Project: Spark
> Issue Type: Question
> Components: Streaming
> Affects Versions: 1.3.1
> Environment: Spark Streaming 1.3.1, standalone mode running on just 1
> box: Ubuntu 14.04.2 LTS, 4 cores, 8GB RAM, java version "1.8.0_40"
> Reporter: Platon Potapov
> Priority: Minor
> Attachments: 10.second.window.fast.job.txt,
> 5.second.window.slow.job.txt, SparkStreamingJob.scala
>
>
> attached is the complete source code of a test spark job. no external data
> generators are run - just the presence of a kafka topic named "raw" suffices.
> the spark job is run with no load whatsoever. http://localhost:4040/streaming
> is checked to obtain job processing duration.
> * in case the test contains the following transformation:
> {code}
> // dummy transformation
> val temperature = bytes.filter(_._1 == "abc")
> val abc = temperature.window(Seconds(40), Seconds(5))
> abc.print()
> {code}
> the median processing time is 3 seconds 80 ms
> * in case the test contains the following transformation:
> {code}
> // dummy transformation
> val temperature = bytes.filter(_._1 == "abc")
> val abc = temperature.map(x => (1, x))
> abc.print()
> {code}
> the median processing time is just 50 ms
> please explain why the "window" transformation introduces such an increase
> in job duration?
> note: the result is the same regardless of the number of kafka topic
> partitions (I've tried 1 and 8)
> note2: the result is the same regardless of the window parameters (I've tried
> (20, 2) and (40, 5))