[ https://issues.apache.org/jira/browse/SPARK-7122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14569147#comment-14569147 ]
Nicolas PHUNG commented on SPARK-7122:
--------------------------------------
Mode 1 (old Kafka Streaming API)
{code}
object HotFCANextGen {

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("MyApp")
    sparkConf.setIfMissing("spark.master", "local[*]")
    sparkConf.set("spark.eventLog.overwrite", "true")
    sparkConf.set("spark.shuffle.consolidateFiles", "true")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> kafkaBrokers,
      "group.id" -> groupId,
      "zookeeper.connect" -> zookeeperConnect,
      "auto.offset.reset" -> "smallest",
      "schema.registry.url" -> kafkaSchemaRegistryUrl)
    val topicMap = kafkaTopics.split(",").map((_, 2)).toMap

    // Receiver-based stream (old Kafka API)
    val messages = KafkaUtils.createStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder](
      ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK)

    val analyticsEventsRaw = messages.map(_._2)
    val analyticEventStream: DStream[AnalyticEventEnriched] = analyticsEventsRaw.map(
      avroMessage => {
        val record: GenericRecord = avroMessage.asInstanceOf[GenericRecord]
        // Main Avro record analytic event
        val analyticEvent: AnalyticEventEnriched =
          records.AnalyticEventEnrichedRecordReader.read(record)
        analyticEvent
      }
    )

    analyticEventStream.foreachRDD(rdd => {
      rdd.take(11).foreach(element => println(s"Saving to ES $element"))
      EsSpark.saveJsonToEs(rdd, esIndex)
    })

    sys.ShutdownHookThread {
      ssc.stop(true, true)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
{code}
Mode 2 (new Kafka Streaming API with checkpoint)
{code}
object HotFCANextGen {

  def main(args: Array[String]) {
    // Recover the StreamingContext from the checkpoint if one exists, otherwise build a new one
    val ssc = StreamingContext.getOrCreate(checkpointDirectory, createContext)
    sys.ShutdownHookThread {
      ssc.stop(true, true)
    }
    ssc.start()
    ssc.awaitTermination()
  }

  def createContext(): StreamingContext = {
    val sparkConf = new SparkConf().setAppName("MyApp")
    sparkConf.setIfMissing("spark.master", "local[*]")
    sparkConf.set("spark.eventLog.overwrite", "true")
    sparkConf.set("spark.shuffle.consolidateFiles", "true")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> kafkaBrokers,
      "group.id" -> groupId,
      "zookeeper.connect" -> zookeeperConnect,
      "auto.offset.reset" -> "smallest",
      "schema.registry.url" -> kafkaSchemaRegistryUrl)
    val topicsSet = kafkaTopics.split(",").toSet

    // Direct (receiver-less) stream (new Kafka API)
    val messages = KafkaUtils.createDirectStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder](
      ssc, kafkaParams, topicsSet)

    val analyticsEventsRaw = messages.map(_._2)
    val analyticEventStream: DStream[AnalyticEventEnriched] = analyticsEventsRaw.map(
      avroMessage => {
        val record: GenericRecord = avroMessage.asInstanceOf[GenericRecord]
        // Main Avro record analytic event
        val analyticEvent: AnalyticEventEnriched =
          records.AnalyticEventEnrichedRecordReader.read(record)
        analyticEvent
      }
    )

    analyticEventStream.foreachRDD(rdd => {
      rdd.take(11).foreach(element => println(s"Saving to ES $element"))
      EsSpark.saveJsonToEs(rdd, esIndex)
    })

    ssc.checkpoint(checkpointDirectory)
    ssc
  }
}
{code}
Stripped to the bare minimum. Basically, I'm reading Avro-format messages from Kafka,
enriching them after the read, and persisting them to Elasticsearch (the enrichment
process is the same in both cases). There's also a checkpoint in mode 2 but not in
mode 1.
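To make the difference explicit, here is a minimal side-by-side sketch of the only lines that change between the two modes, extracted from the code above (the kafkaParams map, the Avro enrichment and the Elasticsearch write are identical in both; the receiverStream/directStream names are just for illustration):
{code}
// Mode 1: receiver-based stream (old API), no checkpoint
val topicMap = kafkaTopics.split(",").map((_, 2)).toMap
val receiverStream = KafkaUtils.createStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder](
  ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK)

// Mode 2: direct stream (new API), plus a driver checkpoint so getOrCreate can recover the context
val topicsSet = kafkaTopics.split(",").toSet
val directStream = KafkaUtils.createDirectStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder](
  ssc, kafkaParams, topicsSet)
ssc.checkpoint(checkpointDirectory)
{code}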
> KafkaUtils.createDirectStream - unreasonable processing time in absence of load
> -------------------------------------------------------------------------------
>
> Key: SPARK-7122
> URL: https://issues.apache.org/jira/browse/SPARK-7122
> Project: Spark
> Issue Type: Question
> Components: Streaming
> Affects Versions: 1.3.1
> Environment: Spark Streaming 1.3.1, standalone mode running on just 1
> box: Ubuntu 14.04.2 LTS, 4 cores, 8GB RAM, java version "1.8.0_40"
> Reporter: Platon Potapov
> Priority: Minor
> Attachments: 10.second.window.fast.job.txt,
> 5.second.window.slow.job.txt, SparkStreamingJob.scala
>
>
> Attached is the complete source code of a test Spark job. No external data
> generators are run - just the presence of a Kafka topic named "raw" suffices.
> The Spark job is run with no load whatsoever. http://localhost:4040/streaming
> is checked to obtain the job processing duration.
> * in case the test contains the following transformation:
> {code}
> // dummy transformation
> val temperature = bytes.filter(_._1 == "abc")
> val abc = temperature.window(Seconds(40), Seconds(5))
> abc.print()
> {code}
> the median processing time is 3 seconds 80 ms
> * in case the test contains the following transformation:
> {code}
> // dummy transformation
> val temperature = bytes.filter(_._1 == "abc")
> val abc = temperature.map(x => (1, x))
> abc.print()
> {code}
> the median processing time is just 50 ms
> Please explain why the "window" transformation introduces such a growth in
> job duration.
> Note: the result is the same regardless of the number of Kafka topic
> partitions (I've tried 1 and 8).
> Note 2: the result is the same regardless of the window parameters (I've tried
> (20, 2) and (40, 5)).