Thanks, that is what I was missing. I added cache() before the action, and the 2nd processing is now avoided.
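For anyone hitting the same duplicate-fetch symptom: an uncached RDD/DStream lineage is lazily recomputed by every downstream action, so the union branch re-reads the Kafka partition. The toy sketch below is plain Java, not Spark; the `Lazy` wrapper and all names are made up for illustration only, to model why caching before multiple consumers removes the second fetch:

```java
import java.util.function.Supplier;

// Toy lazy value: recomputed on every get(), like an uncached RDD lineage.
class Lazy<T> {
    private final Supplier<T> compute;
    private T memo;            // filled only after cache() + first get()
    private boolean caching;

    Lazy(Supplier<T> compute) { this.compute = compute; }

    Lazy<T> cache() { caching = true; return this; }

    T get() {
        if (caching) {
            if (memo == null) memo = compute.get();  // compute once, reuse
            return memo;
        }
        return compute.get();  // no cache: recompute on every action
    }
}

public class CacheDemo {
    static int fetches = 0;    // counts simulated "Kafka fetches"

    public static void main(String[] args) {
        // Uncached: each consumer (filter branch, union) re-runs the fetch.
        Lazy<String> uncached = new Lazy<>(() -> { fetches++; return "event"; });
        uncached.get();        // first branch
        uncached.get();        // union branch -> fetches the same event again
        System.out.println("uncached fetches = " + fetches);  // prints 2

        fetches = 0;
        // Cached (analogous to calling cache() on the stream before branching):
        Lazy<String> cached = new Lazy<>(() -> { fetches++; return "event"; }).cache();
        cached.get();
        cached.get();          // served from the cache, no second fetch
        System.out.println("cached fetches = " + fetches);    // prints 1
    }
}
```

In Spark terms the analogous change is to cache/persist the stream produced by process(1) before the two filters branch off it, so the union does not trigger a second read of the Kafka offsets.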
2016-09-10 5:10 GMT-07:00 Cody Koeninger <c...@koeninger.org>:

> Hard to say without seeing the code, but if you do multiple actions on an
> RDD without caching, the RDD will be computed multiple times.
>
> On Sep 10, 2016 2:43 AM, "Cheng Yi" <phillipchen...@gmail.com> wrote:
>
> After some investigation, the problem I see is likely caused by a filter and
> union of the DStream.
>
> If I just do kafka-stream -- process -- output, there is no problem: one
> event is fetched once. If I do
>
>   kafka-stream -- process(1) --+-- filter stream A for later union ------+
>                                |                                         |
>                                +-- filter stream B -- process(2) --------+-- A union B -- output process(3)
>
> the event is fetched 2 times; the duplicate processing starts at the end of
> process(1). See the following traces:
>
> 16/09/10 00:11:00 INFO CachedKafkaConsumer: Initial fetch for spark-executor-testgid log-analysis-topic 2 1  *(fetch EVENT 1st time)*
> 16/09/10 00:11:00 INFO AbstractCoordinator: Discovered coordinator 192.168.2.6:9092 (id: 2147483647 rack: null) for group spark-executor-testgid.
>
> [log of processing (1) for event 1]
>
> 16/09/10 00:11:03 INFO Executor: Finished task 0.0 in stage 9.0 (TID 36). 1401 bytes result sent to driver
> 16/09/10 00:11:03 INFO TaskSetManager: Finished task 0.0 in stage 9.0 (TID 36) in 3494 ms on localhost (3/3)
> 16/09/10 00:11:03 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks have all completed, from pool
> 16/09/10 00:11:03 INFO DAGScheduler: ShuffleMapStage 9 (flatMapToPair (*processing (1)*) at SparkAppDriver.java:136) finished in 3.506 s
> 16/09/10 00:11:03 INFO DAGScheduler: looking for newly runnable stages
> 16/09/10 00:11:03 INFO DAGScheduler: running: Set()
> 16/09/10 00:11:03 INFO DAGScheduler: waiting: Set(ShuffleMapStage 10, ResultStage 11)
> 16/09/10 00:11:03 INFO DAGScheduler: failed: Set()
> 16/09/10 00:11:03 INFO DAGScheduler: Submitting ShuffleMapStage 10 (UnionRDD[41] at union (*process (3)*) at SparkAppDriver.java:155), which has no missing parents
> 16/09/10 00:11:03 INFO DAGScheduler: Submitting 6 missing tasks from ShuffleMapStage 10 (UnionRDD[41] at union at SparkAppDriver.java:155)
> 16/09/10 00:11:03 INFO KafkaRDD: Computing topic log-analysis-topic, partition 2 offsets 1 -> 2
> 16/09/10 00:11:03 INFO CachedKafkaConsumer: Initial fetch for spark-executor-testgid log-analysis-topic 2 1  *(fetch the same EVENT 2nd time)*
> 16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job 1473491460000 ms.0 from job set of time 1473491460000 ms
> 16/09/10 00:11:10 INFO JobScheduler: Total delay: 10.920 s for time 1473491460000 ms (execution: 10.874 s)  *(EVENT 1st time process cost 10.874 s)*
> 16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job 1473491465000 ms.0 from job set of time 1473491465000 ms
> 16/09/10 00:11:10 INFO JobScheduler: Total delay: 5.986 s for time 1473491465000 ms (execution: 0.066 s)  *(EVENT 2nd time process cost 0.066 s)*
>
> The 2nd processing of the event finished without really doing the work.
>
> Help is hugely appreciated.
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-kafka-connector-questions-tp27681p27687.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.