Hard to say without seeing the code, but if you perform multiple actions on an
RDD without caching it, the RDD will be computed multiple times.
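To illustrate the point in a Spark-independent way (this is an analogy, not the poster's code): an uncached RDD behaves like a lazy value whose whole lineage re-runs on every action, while `cache()` behaves like memoization. The `cached` helper and counters below are hypothetical names invented for this sketch.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class LazyRecompute {
    // Hypothetical stand-in for rdd.cache(): memoize the first evaluation
    // so later "actions" reuse the result instead of recomputing.
    static <T> Supplier<T> cached(Supplier<T> source) {
        return new Supplier<T>() {
            private T value;
            private boolean computed = false;
            @Override public synchronized T get() {
                if (!computed) { value = source.get(); computed = true; }
                return value;
            }
        };
    }

    public static void main(String[] args) {
        AtomicInteger fetches = new AtomicInteger();
        // Each evaluation of the "uncached RDD" re-runs the whole lineage,
        // i.e. fetches the same event again.
        Supplier<String> uncached = () -> "event-" + fetches.incrementAndGet();
        uncached.get();  // action on branch A
        uncached.get();  // action on branch B
        System.out.println("uncached fetches: " + fetches.get());

        fetches.set(0);
        Supplier<String> memoized = cached(() -> "event-" + fetches.incrementAndGet());
        memoized.get();  // branch A reuses the cached result
        memoized.get();  // branch B reuses the cached result
        System.out.println("cached fetches: " + fetches.get());
    }
}
```

Running this prints two fetches for the uncached case and one for the cached case, which mirrors the duplicate Kafka fetch described below.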

On Sep 10, 2016 2:43 AM, "Cheng Yi" <phillipchen...@gmail.com> wrote:

After some investigation, the problem I see looks to be caused by a filter and
union of the DStream.
If I just do kafka-stream -- process -- output, there is no problem: each
event is fetched once.
If I do

kafka-stream -- process(1) --+-- filter stream A --------------------+
                             |                                       |
                             +-- filter stream B -- process(2) ------+-- A union B -- output process(3)

the event is fetched 2 times; the duplicate fetch starts at the end of
process(1). See the following traces:

16/09/10 00:11:00 INFO CachedKafkaConsumer: Initial fetch for
spark-executor-testgid log-analysis-topic 2 1 *(fetch EVENT 1st time)*

16/09/10 00:11:00 INFO AbstractCoordinator: Discovered coordinator
192.168.2.6:9092 (id: 2147483647 rack: null) for group
spark-executor-testgid.

log of processing (1) for event 1

16/09/10 00:11:03 INFO Executor: Finished task 0.0 in stage 9.0 (TID 36).
1401 bytes result sent to driver

16/09/10 00:11:03 INFO TaskSetManager: Finished task 0.0 in stage 9.0 (TID
36) in 3494 ms on localhost (3/3)

16/09/10 00:11:03 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks
have all completed, from pool

16/09/10 00:11:03 INFO DAGScheduler: ShuffleMapStage 9 (flatMapToPair
(*processing (1)*) at SparkAppDriver.java:136) finished in 3.506 s

16/09/10 00:11:03 INFO DAGScheduler: looking for newly runnable stages

16/09/10 00:11:03 INFO DAGScheduler: running: Set()

16/09/10 00:11:03 INFO DAGScheduler: waiting: Set(ShuffleMapStage 10,
ResultStage 11)

16/09/10 00:11:03 INFO DAGScheduler: failed: Set()

16/09/10 00:11:03 INFO DAGScheduler: Submitting ShuffleMapStage 10
(UnionRDD[41] at union (*process (3)*) at SparkAppDriver.java:155), which
has no missing parents

16/09/10 00:11:03 INFO DAGScheduler: Submitting 6 missing tasks from
ShuffleMapStage 10 (UnionRDD[41] at union at SparkAppDriver.java:155)

16/09/10 00:11:03 INFO KafkaRDD: Computing topic log-analysis-topic,
partition 2 offsets 1 -> 2

16/09/10 00:11:03 INFO CachedKafkaConsumer: Initial fetch for
spark-executor-testgid log-analysis-topic 2 1 *(fetch the same EVENT 2nd
time)*

16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job
1473491460000 ms.0 from job set of time 1473491460000 ms

16/09/10 00:11:10 INFO JobScheduler: Total delay: 10.920 s for time
1473491460000 ms (execution: 10.874 s) *(EVENT 1st time processing cost
10.874 s)*

16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job
1473491465000 ms.0 from job set of time 1473491465000 ms

16/09/10 00:11:10 INFO JobScheduler: Total delay: 5.986 s for time
1473491465000 ms (execution: 0.066 s) *(EVENT 2nd time processing cost 0.066 s)*

and the 2nd processing of the event finishes without actually doing the
work.
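If the topology is branched as in the diagram above, one fix consistent with the reply at the top of this thread is to cache the shared stream once, right after process(1) and before the two filters, so the union does not recompute the Kafka read. A sketch against the Spark Streaming Java API; the class, variable names, and filter/map bodies are illustrative, not taken from the original SparkAppDriver.java:

```java
import org.apache.spark.streaming.api.java.JavaDStream;

public class UnionCacheSketch {
    static JavaDStream<String> branchAndUnion(JavaDStream<String> processed) {
        // Cache the output of process(1) so both filter branches reuse the
        // same materialized batch instead of re-fetching from Kafka.
        processed.cache();
        JavaDStream<String> a = processed.filter(e -> e.startsWith("A"));
        JavaDStream<String> b = processed.filter(e -> e.startsWith("B"))
                                         .map(String::toLowerCase);  // process(2)
        return a.union(b);  // feeds output process(3)
    }
}
```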

Help is hugely appreciated.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-kafka-connector-questions-tp27681p27687.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org