Why does groupByKey still shuffle if SQL does "Distribute By" on the same columns?
Hi, I am trying something like this:

val sesDS: Dataset[XXX] = hiveContext.sql(select).as[XXX]

The select statement is something like: "select * from sometable DISTRIBUTE BY col1, col2, col3"

Then comes groupByKey:

val gpbyDS = sesDS.groupByKey(x => (x.col1, x.col2, x.col3))

As my select already distributes the data on the same columns that I use in groupByKey, why does groupByKey still shuffle? Is this an issue, or am I missing something? Regards, Dibyendu
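One hedged way to picture what is going on (an illustrative Python toy model, not Catalyst source code): DISTRIBUTE BY records the partitioning as a list of named column expressions, but `Dataset.groupByKey` receives an opaque function, so the planner cannot prove the new grouping key equals the existing partitioning and conservatively plans another shuffle. A relational `groupBy("col1", "col2", "col3")` on the same columns can be matched against the existing distribution.

```python
# Toy model (not Spark internals) of how a planner decides whether a
# group-by needs a shuffle. DISTRIBUTE BY sets the current partitioning
# as column names; a lambda key from groupByKey is opaque to the planner.

def needs_shuffle(current_partitioning, grouping_key):
    # grouping_key is either a tuple of column names (relational groupBy)
    # or a callable (typed groupByKey with a lambda).
    if callable(grouping_key):
        return True  # opaque key: planner can't compare expressions
    return list(grouping_key) != list(current_partitioning)

partitioning = ["col1", "col2", "col3"]  # set by DISTRIBUTE BY

assert needs_shuffle(partitioning, ("col1", "col2", "col3")) is False
assert needs_shuffle(partitioning, lambda x: (x.col1, x.col2, x.col3)) is True
```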
Re: question on Write Ahead Log (Spark Streaming)
Hi, You could also use this receiver: https://github.com/dibbhatt/kafka-spark-consumer It is also part of spark-packages: https://spark-packages.org/package/dibbhatt/kafka-spark-consumer You do not need to enable the WAL with it and can still recover from driver failure with no data loss. You can refer to https://github.com/dibbhatt/kafka-spark-consumer/blob/master/README.md for more details, or reach out to me. Regards, Dibyendu

On Wed, Mar 8, 2017 at 8:58 AM, kant kodali wrote:
> Hi All,
>
> I am using a receiver-based approach. I understand that the Spark
> Streaming APIs will convert the data received from the receiver into blocks,
> and that these in-memory blocks are also stored in the WAL if one enables
> it. My upstream source, which is not Kafka, can also replay: if I don't
> send an ack to my upstream source, it will resend the data, so I don't
> have to write the received data to the WAL. However, I still need to enable
> the WAL, correct? Because there are blocks in memory that need to be written
> to the WAL so they can be recovered later.
>
> Thanks,
> kant
>
Latest Release of Receiver based Kafka Consumer for Spark Streaming.
Hi, Released the latest version of the Receiver-based Kafka Consumer for Spark Streaming.

Available at Spark Packages: https://spark-packages.org/package/dibbhatt/kafka-spark-consumer
Also at github: https://github.com/dibbhatt/kafka-spark-consumer

Some key features:
- Tuned for better performance
- Support for Spark 2.x, Kafka 0.10
- Support for consumer lag check (ConsumerOffsetChecker, Burrow, etc.)
- WAL-less recovery
- Better-tuned PID controller with auto rate adjustment to incoming traffic
- Support for custom message interceptors

Please refer to https://github.com/dibbhatt/kafka-spark-consumer/blob/master/README.md for more details. Regards, Dibyendu
Re: Latest Release of Receiver based Kafka Consumer for Spark Streaming.
Hi, This package is not dependent on any specific Spark release and can be used with 1.5. Please refer to the "How To" section here: https://spark-packages.org/package/dibbhatt/kafka-spark-consumer Also, you will find more information in the readme file on how to use this package. Regards, Dibyendu

On Thu, Aug 25, 2016 at 7:01 PM, <mdkhajaasm...@gmail.com> wrote:
> Hi Dibyendu,
>
> Looks like it is available in 2.0; we are using an older version, Spark 1.5.
> Could you please let me know how to use this with older versions.
>
> Thanks,
> Asmath
>
> Sent from my iPhone
>
> On Aug 25, 2016, at 6:33 AM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
> Hi,
>
> Released the latest version of the Receiver-based Kafka Consumer for Spark
> Streaming.
>
> The receiver is compatible with Kafka versions 0.8.x, 0.9.x and 0.10.x and all
> Spark versions.
>
> Available at Spark Packages: https://spark-packages.org/package/dibbhatt/kafka-spark-consumer
>
> Also at github: https://github.com/dibbhatt/kafka-spark-consumer
>
> Salient features:
>
> - End-to-end no data loss without a Write Ahead Log
> - ZK-based offset management for both consumed and processed offsets
> - No dependency on WAL and checkpoints
> - In-built PID controller for rate limiting and backpressure management
> - Custom message interceptor
>
> Please refer to https://github.com/dibbhatt/kafka-spark-consumer/blob/master/README.md for more details
>
> Regards,
>
> Dibyendu
>
Latest Release of Receiver based Kafka Consumer for Spark Streaming.
Hi, Released the latest version of the Receiver-based Kafka Consumer for Spark Streaming. The receiver is compatible with Kafka versions 0.8.x, 0.9.x and 0.10.x and all Spark versions.

Available at Spark Packages: https://spark-packages.org/package/dibbhatt/kafka-spark-consumer
Also at github: https://github.com/dibbhatt/kafka-spark-consumer

Salient features:
- End-to-end no data loss without a Write Ahead Log
- ZK-based offset management for both consumed and processed offsets
- No dependency on WAL and checkpoints
- In-built PID controller for rate limiting and backpressure management
- Custom message interceptor

Please refer to https://github.com/dibbhatt/kafka-spark-consumer/blob/master/README.md for more details. Regards, Dibyendu
Re: Severe Spark Streaming performance degradation after upgrading to 1.6.1
You can get some good pointers in this JIRA: https://issues.apache.org/jira/browse/SPARK-15796 Dibyendu

On Thu, Jul 14, 2016 at 12:53 AM, Sunita wrote:
> I am facing the same issue. Upgrading to Spark 1.6 is causing a huge
> performance loss. Could you solve this issue? I am also attempting the memory
> settings mentioned at
> http://spark.apache.org/docs/latest/configuration.html#memory-management
>
> But it's not making a lot of difference. Appreciate your inputs on this.
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Severe-Spark-Streaming-performance-degradation-after-upgrading-to-1-6-1-tp27056p27330.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
Any idea about this error: IllegalArgumentException: File segment length cannot be negative?
In a Spark Streaming job, I saw a batch fail with the following error. I haven't seen anything like this earlier. It happened during the shuffle for one batch (and hasn't reoccurred since). Just curious to know what can cause this error. I am running Spark 1.5.1. Regards, Dibyendu

Job aborted due to stage failure: Task 2801 in stage 9421.0 failed 4 times, most recent failure: Lost task 2801.3 in stage 9421.0: java.lang.IllegalArgumentException: requirement failed: File segment length cannot be negative (got -68321)
at scala.Predef$.require(Predef.scala:233)
at org.apache.spark.storage.FileSegment.<init>(FileSegment.scala:28)
at org.apache.spark.storage.DiskBlockObjectWriter.fileSegment(DiskBlockObjectWriter.scala:216)
at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:684)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:80)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Re: [Spark 1.6] Spark Streaming - java.lang.AbstractMethodError
Right... if you are using the github version, just modify the ReceiverLauncher and add that. I will fix it for Spark 1.6 and release a new version in spark-packages for Spark 1.6. Dibyendu

On Thu, Jan 7, 2016 at 4:14 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> I cloned g...@github.com:dibbhatt/kafka-spark-consumer.git a moment ago.
>
> In ./src/main/java/consumer/kafka/ReceiverLauncher.java , I see:
> jsc.addStreamingListener(new StreamingListener() {
>
> There is no onOutputOperationStarted method implementation.
>
> Looks like it should be added for Spark 1.6.0
>
> Cheers
>
> On Thu, Jan 7, 2016 at 2:39 AM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> You are using the low-level spark kafka consumer. I am the author of the
>> same.
>>
>> Are you using the spark-packages version? If yes, which one?
>>
>> Regards,
>> Dibyendu
>>
>> On Thu, Jan 7, 2016 at 4:07 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>
>>> Hi,
>>>
>>> Do you perhaps use a custom StreamingListener?
>>> `StreamingListenerBus.scala:47` calls
>>> `StreamingListener.onOutputOperationStarted`, which was added in
>>> [SPARK-10900] [STREAMING] Add output operation events to
>>> StreamingListener [1]
>>>
>>> The other guess could be that at runtime you still use Spark < 1.6.
>>>
>>> [1] https://issues.apache.org/jira/browse/SPARK-10900
>>>
>>> Regards,
>>> Jacek
>>>
>>> Jacek Laskowski | https://medium.com/@jaceklaskowski/
>>> Mastering Apache Spark
>>> ==> https://jaceklaskowski.gitbooks.io/mastering-apache-spark/
>>> Follow me at https://twitter.com/jaceklaskowski
>>>
>>> On Thu, Jan 7, 2016 at 10:59 AM, Walid LEZZAR <walez...@gmail.com>
>>> wrote:
>>> > Hi,
>>> >
>>> > We have been using spark streaming for a little while now.
>>> >
>>> > Until now, we were running our spark streaming jobs in spark 1.5.1 and it
>>> > was working well. Yesterday, we upgraded to spark 1.6.0 without any changes
>>> > in the code. But our streaming jobs are not working any more. We are getting
>>> > an "AbstractMethodError". Please find the stack trace at the end of the
>>> > mail. Can we have some hints on what this error means? (We are using spark
>>> > to connect to kafka.)
>>> >
>>> > The stack trace:
>>> > 16/01/07 10:44:39 INFO ZkState: Starting curator service
>>> > 16/01/07 10:44:39 INFO CuratorFrameworkImpl: Starting
>>> > 16/01/07 10:44:39 INFO ZooKeeper: Initiating client connection,
>>> > connectString=localhost:2181 sessionTimeout=12
>>> > watcher=org.apache.curator.ConnectionState@2e9fa23a
>>> > 16/01/07 10:44:39 INFO ClientCnxn: Opening socket connection to server
>>> > localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL
>>> > (unknown error)
>>> > 16/01/07 10:44:39 INFO ClientCnxn: Socket connection established to
>>> > localhost/127.0.0.1:2181, initiating session
>>> > 16/01/07 10:44:39 INFO ClientCnxn: Session establishment complete on server
>>> > localhost/127.0.0.1:2181, sessionid = 0x1521b6d262e0035, negotiated timeout
>>> > = 6
>>> > 16/01/07 10:44:39 INFO ConnectionStateManager: State change: CONNECTED
>>> > 16/01/07 10:44:40 INFO PartitionManager: Read partition information from:
>>> > /spark-kafka-consumer/StreamingArchiver/lbc.job.multiposting.input/partition_0
>>> > --> null
>>> > 16/01/07 10:44:40 INFO JobScheduler: Added jobs for time 145215988 ms
>>> > 16/01/07 10:44:40 INFO JobScheduler: Starting job streaming job
>>> > 145215988 ms.0 from job set of time 145215988 ms
>>> > 16/01/07 10:44:40 ERROR Utils: uncaught error in thread
>>> > StreamingListenerBus, stopping SparkContext
>>> >
>>> > ERROR Utils: uncaught error in thread StreamingListenerBus, stopping
>>> > SparkContext
>>> > java.lang.AbstractMethodError
>>> > at
>>> > org.apache.spark.streaming.scheduler.StreamingListenerBus.onPostEvent(StreamingListenerBus.scala:47)
>>> > at
>>> > org.apache.spark.streaming.scheduler.StreamingListenerBus.onPostEvent(StreamingListenerBus.scala:26)
Re: [Spark 1.6] Spark Streaming - java.lang.AbstractMethodError
Some discussion is in https://github.com/dibbhatt/kafka-spark-consumer and some is mentioned in https://issues.apache.org/jira/browse/SPARK-11045 Let me know if those answer your question.

In short, Direct Stream is a good choice if you need exactly-once semantics and message ordering, but many use cases do not need exactly-once and message ordering. If you use Direct Stream, RDD processing parallelism is limited to the number of Kafka partitions, and you need to store offset details in an external store, as the checkpoint location is not reliable if you modify the driver code. Whereas in Receiver-based mode, you need to enable the WAL for no data loss. But the Spark Receiver-based consumer from KafkaUtils, which uses the Kafka high-level API, has serious issues, so if you do need to switch to Receiver-based mode, this low-level consumer is a better choice.

Performance-wise, I have not published any numbers yet, but from the internal testing and benchmarking I did (validated by folks who use this consumer), it performs much better than any existing consumer in Spark.

Regards, Dibyendu

On Thu, Jan 7, 2016 at 4:28 PM, Jacek Laskowski <ja...@japila.pl> wrote:
> On Thu, Jan 7, 2016 at 11:39 AM, Dibyendu Bhattacharya
> <dibyendu.bhattach...@gmail.com> wrote:
> > You are using the low-level spark kafka consumer. I am the author of the
> > same.
>
> If I may ask, what are the differences between this and the direct
> version shipped with Spark? I've just started toying with it, and
> would appreciate some guidance. Thanks.
>
> Jacek
>
Re: Need to maintain the consumer offset by myself when using spark streaming kafka direct approach?
In direct stream, the checkpoint location is not recoverable if you modify your driver code. So if you rely only on the checkpoint to commit offsets, you can possibly lose messages if you modify the driver code and then start from the "largest" offset. If you do not want to lose messages, you need to commit offsets to an external store in the case of direct stream.

On Tue, Dec 8, 2015 at 7:47 PM, PhuDuc Nguyen wrote:
> Kafka Receiver-based approach:
> This will maintain the consumer offsets in ZK for you.
>
> Kafka Direct approach:
> You can use checkpointing and that will maintain consumer offsets for you.
> You'll want to checkpoint to a highly available file system like HDFS or S3.
>
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
>
> You don't have to maintain your own offsets if you don't want to. If the 2
> solutions above don't satisfy your requirements, then consider writing your
> own; otherwise I would recommend using the supported features in Spark.
>
> HTH,
> Duc
>
> On Tue, Dec 8, 2015 at 5:05 AM, Tao Li wrote:
>
>> I am using the spark streaming kafka direct approach these days. I found
>> that when I start the application, it always starts consuming from the latest
>> offset. I would like the application, on startup, to consume from the offset
>> where the last run left off, with the same kafka consumer group. It means I
>> have to maintain the consumer offset by myself, for example record it in
>> zookeeper, and reload the last offset from zookeeper when restarting the
>> application?
>>
>> I see the following discussion:
>> https://github.com/apache/spark/pull/4805
>> https://issues.apache.org/jira/browse/SPARK-6249
>>
>> Is there any conclusion? Do we need to maintain the offset ourselves? Or
>> will spark streaming support a feature to simplify the offset maintenance
>> work?
>>
>> https://forums.databricks.com/questions/2936/need-to-maintain-the-consumer-offset-by-myself-whe.html
>>
>
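The commit-to-external-store pattern described above can be sketched as follows (plain Python, not the Spark or Kafka API; the `OffsetStore` and its methods are invented stand-ins for ZooKeeper/HBase/etc.): on startup the job reads the last committed offsets from the store, processes a batch, and only then commits the new position, so a driver-code change that invalidates the checkpoint does not lose the consumer's place.

```python
# Hedged sketch of external offset management for a direct-stream-style
# consumer. "OffsetStore" stands in for ZooKeeper/HBase; Spark/Kafka
# calls are elided and replaced by plain data structures.

class OffsetStore:
    def __init__(self):
        self._committed = {}  # (topic, partition) -> next offset to read

    def load(self, topic_partition, default=0):
        return self._committed.get(topic_partition, default)

    def commit(self, topic_partition, next_offset):
        self._committed[topic_partition] = next_offset

def run_batch(store, topic_partition, available):
    # Resume from the last committed offset, not from "largest".
    start = store.load(topic_partition)
    batch = available[start:]
    # ... process batch; commit only after processing succeeds ...
    store.commit(topic_partition, start + len(batch))
    return batch

store = OffsetStore()
msgs = ["m0", "m1", "m2", "m3"]
assert run_batch(store, ("t", 0), msgs) == ["m0", "m1", "m2", "m3"]
msgs += ["m4", "m5"]
# A "restart" that reuses the store resumes exactly where it left off.
assert run_batch(store, ("t", 0), msgs) == ["m4", "m5"]
```

Committing after processing gives at-least-once delivery: a crash between processing and commit replays the batch rather than dropping it.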
Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka
This consumer which I mentioned does not silently throw away data. If the offset is out of range, it starts from the earliest offset, and that is the correct way to recover from this error. Dibyendu

On Dec 2, 2015 9:56 PM, "Cody Koeninger" <c...@koeninger.org> wrote:
> Again, just to be clear, silently throwing away data because your system
> isn't working right is not the same as "recover from any Kafka leader
> changes and offset out of ranges issue".
>
> On Tue, Dec 1, 2015 at 11:27 PM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> Hi, if you use the Receiver-based consumer which is available in
>> spark-packages (
>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer ), this
>> has all built-in failure recovery and it can recover from any Kafka leader
>> changes and offset out of range issues.
>>
>> Here is the package from github:
>> https://github.com/dibbhatt/kafka-spark-consumer
>>
>> Dibyendu
>>
>> On Wed, Dec 2, 2015 at 5:28 AM, swetha kasireddy <
>> swethakasire...@gmail.com> wrote:
>>
>>> How to avoid those errors with the receiver-based approach? Suppose we are
>>> OK with at-least-once processing and use the receiver-based approach, which
>>> uses ZooKeeper but does not query Kafka directly, would these errors (Couldn't
>>> find leader offsets for
>>> Set([test_stream,5]))) be avoided?
>>>
>>> On Tue, Dec 1, 2015 at 3:40 PM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>>
>>>> KafkaRDD.scala , handleFetchErr
>>>>
>>>> On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy <
>>>> swethakasire...@gmail.com> wrote:
>>>>
>>>>> Hi Cody,
>>>>>
>>>>> How to look at Option 2 (see the following)? Which portion of the code
>>>>> in Spark Kafka Direct to look at to handle this issue specific to our
>>>>> requirements.
>>>>>
>>>>> 2. Catch that exception and somehow force things to "reset" for that
>>>>> partition. And how would it handle the offsets already calculated in the
>>>>> backlog (if there is one)?
>>>>> >>>>> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger <c...@koeninger.org> >>>>> wrote: >>>>> >>>>>> If you're consistently getting offset out of range exceptions, it's >>>>>> probably because messages are getting deleted before you've processed >>>>>> them. >>>>>> >>>>>> The only real way to deal with this is give kafka more retention, >>>>>> consume faster, or both. >>>>>> >>>>>> If you're just looking for a quick "fix" for an infrequent issue, >>>>>> option 4 is probably easiest. I wouldn't do that automatically / >>>>>> silently, >>>>>> because you're losing data. >>>>>> >>>>>> On Mon, Nov 30, 2015 at 6:22 PM, SRK <swethakasire...@gmail.com> >>>>>> wrote: >>>>>> >>>>>>> Hi, >>>>>>> >>>>>>> So, our Streaming Job fails with the following errors. If you see >>>>>>> the errors >>>>>>> below, they are all related to Kafka losing offsets and >>>>>>> OffsetOutOfRangeException. >>>>>>> >>>>>>> What are the options we have other than fixing Kafka? We would like >>>>>>> to do >>>>>>> something like the following. How can we achieve 1 and 2 with Spark >>>>>>> Kafka >>>>>>> Direct? >>>>>>> >>>>>>> 1.Need to see a way to skip some offsets if they are not available >>>>>>> after the >>>>>>> max retries are reached..in that case there might be data loss. >>>>>>> >>>>>>> 2.Catch that exception and somehow force things to "reset" for that >>>>>>> partition And how would it handle the offsets already calculated in >>>>>>> the >>>>>>> backlog (if there is one)? >>>>>>> >>>>>>> 3.Track the offsets separately, restart the job by providing the >>>>>>> offsets. >>>>>>> >>>>>>> 4.Or a straightforward approach would be to monit
Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka
Well, even if you set the correct retention and increase speed, OffsetOutOfRange can still happen, depending on how your downstream processing goes. And if that happens, there is no other way to recover the old messages. So the best bet here, from the streaming job's point of view, is to start from the earliest offset rather than bring down the streaming job. In many cases the goal for a streaming job is not to shut down and exit in case of a failure. I believe that is what differentiates an always-running streaming job. Dibyendu

On Thu, Dec 3, 2015 at 8:26 AM, Cody Koeninger <c...@koeninger.org> wrote:
> No, silently restarting from the earliest offset in the case of offset out
> of range exceptions during a streaming job is not the "correct way of
> recovery".
>
> If you do that, your users will be losing data without knowing why. It's
> more like a "way of ignoring the problem without actually addressing it".
>
> The only really correct way to deal with that situation is to recognize
> why it's happening, and either increase your Kafka retention or increase
> the speed at which you are consuming.
>
> On Wed, Dec 2, 2015 at 7:13 PM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> This consumer which I mentioned does not silently throw away data. If
>> the offset is out of range, it starts from the earliest offset, and that is
>> the correct way to recover from this error.
>>
>> Dibyendu
>> On Dec 2, 2015 9:56 PM, "Cody Koeninger" <c...@koeninger.org> wrote:
>>
>>> Again, just to be clear, silently throwing away data because your system
>>> isn't working right is not the same as "recover from any Kafka leader
>>> changes and offset out of ranges issue".
>>> >>> >>> >>> On Tue, Dec 1, 2015 at 11:27 PM, Dibyendu Bhattacharya < >>> dibyendu.bhattach...@gmail.com> wrote: >>> >>>> Hi, if you use Receiver based consumer which is available in >>>> spark-packages ( >>>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) , >>>> this has all built in failure recovery and it can recover from any Kafka >>>> leader changes and offset out of ranges issue. >>>> >>>> Here is the package form github : >>>> https://github.com/dibbhatt/kafka-spark-consumer >>>> >>>> >>>> Dibyendu >>>> >>>> On Wed, Dec 2, 2015 at 5:28 AM, swetha kasireddy < >>>> swethakasire...@gmail.com> wrote: >>>> >>>>> How to avoid those Errors with receiver based approach? Suppose we are >>>>> OK with at least once processing and use receiver based approach which >>>>> uses >>>>> ZooKeeper but not query Kafka directly, would these errors(Couldn't >>>>> find leader offsets for >>>>> Set([test_stream,5])))be avoided? >>>>> >>>>> On Tue, Dec 1, 2015 at 3:40 PM, Cody Koeninger <c...@koeninger.org> >>>>> wrote: >>>>> >>>>>> KafkaRDD.scala , handleFetchErr >>>>>> >>>>>> On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy < >>>>>> swethakasire...@gmail.com> wrote: >>>>>> >>>>>>> Hi Cody, >>>>>>> >>>>>>> How to look at Option 2(see the following)? Which portion of the >>>>>>> code in Spark Kafka Direct to look at to handle this issue specific to >>>>>>> our >>>>>>> requirements. >>>>>>> >>>>>>> >>>>>>> 2.Catch that exception and somehow force things to "reset" for that >>>>>>> partition And how would it handle the offsets already calculated in >>>>>>> the >>>>>>> backlog (if there is one)? >>>>>>> >>>>>>> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger <c...@koeninger.org> >>>>>>> wrote: >>>>>>> >>>>>>>> If you're consistently getting offset out of range exceptions, it's >>>>>>>> probably because messages are getting deleted before you've processed >>>>>>>> them. 
>>>>>>>> >>>>>>>> The only real way to deal with this is give kafka more retention, >>>>>>>> consume faster, or both. >>>>>>>> >>>>>>>> If you're just looking for a quick "fix" for an infrequent issue, >>>
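The two recovery policies debated in this thread can be illustrated with a small sketch (plain Python, not the actual KafkaRDD/handleFetchErr code; all names here are invented): "earliest" resumes from the oldest retained offset, keeping the job alive at the cost of the deleted messages, while "fail" surfaces the error so the operator notices the data loss.

```python
# Toy model of handling an out-of-range fetch offset after Kafka
# retention has deleted messages the consumer had not yet read.

def resolve_fetch_offset(requested, earliest, latest, policy="earliest"):
    if earliest <= requested <= latest:
        return requested  # normal case: offset still retained
    if policy == "earliest":
        # Data between `requested` and `earliest` is gone; resume anyway.
        return earliest
    raise ValueError(f"offset {requested} out of range [{earliest}, {latest}]")

# Retention deleted offsets 0-99; a consumer that was at 40 is out of range.
assert resolve_fetch_offset(40, earliest=100, latest=500) == 100
assert resolve_fetch_offset(250, earliest=100, latest=500) == 250
```

Which policy is "correct" is exactly the disagreement above: "earliest" favors an always-running pipeline, "fail" favors making data loss visible.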
Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka
Hi, if you use the Receiver-based consumer which is available in spark-packages ( http://spark-packages.org/package/dibbhatt/kafka-spark-consumer ), this has all built-in failure recovery and it can recover from any Kafka leader changes and offset out of range issues.

Here is the package from github: https://github.com/dibbhatt/kafka-spark-consumer

Dibyendu

On Wed, Dec 2, 2015 at 5:28 AM, swetha kasireddy wrote:
> How to avoid those errors with the receiver-based approach? Suppose we are OK
> with at-least-once processing and use the receiver-based approach, which uses
> ZooKeeper but does not query Kafka directly, would these errors (Couldn't find
> leader offsets for
> Set([test_stream,5]))) be avoided?
>
> On Tue, Dec 1, 2015 at 3:40 PM, Cody Koeninger wrote:
>
>> KafkaRDD.scala , handleFetchErr
>>
>> On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy <
>> swethakasire...@gmail.com> wrote:
>>
>>> Hi Cody,
>>>
>>> How to look at Option 2 (see the following)? Which portion of the code in
>>> Spark Kafka Direct to look at to handle this issue specific to our
>>> requirements.
>>>
>>> 2. Catch that exception and somehow force things to "reset" for that
>>> partition. And how would it handle the offsets already calculated in the
>>> backlog (if there is one)?
>>>
>>> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger
>>> wrote:
>>> If you're consistently getting offset out of range exceptions, it's
>>> probably because messages are getting deleted before you've processed them.
>>> The only real way to deal with this is to give kafka more retention,
>>> consume faster, or both.
>>> If you're just looking for a quick "fix" for an infrequent issue,
>>> option 4 is probably easiest. I wouldn't do that automatically / silently,
>>> because you're losing data.
>>> On Mon, Nov 30, 2015 at 6:22 PM, SRK wrote:
> Hi,
>
> So, our Streaming Job fails with the following errors. If you see the
> errors below, they are all related to Kafka losing offsets and
> OffsetOutOfRangeException.
> > What are the options we have other than fixing Kafka? We would like to > do > something like the following. How can we achieve 1 and 2 with Spark > Kafka > Direct? > > 1.Need to see a way to skip some offsets if they are not available > after the > max retries are reached..in that case there might be data loss. > > 2.Catch that exception and somehow force things to "reset" for that > partition And how would it handle the offsets already calculated in the > backlog (if there is one)? > > 3.Track the offsets separately, restart the job by providing the > offsets. > > 4.Or a straightforward approach would be to monitor the log for this > error, > and if it occurs more than X times, kill the job, remove the checkpoint > directory, and restart. > > ERROR DirectKafkaInputDStream: > ArrayBuffer(kafka.common.UnknownException, > org.apache.spark.SparkException: Couldn't find leader offsets for > Set([test_stream,5])) > > > > java.lang.ClassNotFoundException: > kafka.common.NotLeaderForPartitionException > > at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699) > > > > java.util.concurrent.RejectedExecutionException: Task > > org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8 > rejected from java.util.concurrent.ThreadPoolExecutor@543258e0 > [Terminated, > pool size = 0, active threads = 0, queued tasks = 0, completed tasks = > 12112] > > > > org.apache.spark.SparkException: Job aborted due to stage failure: > Task 10 > in stage 52.0 failed 4 times, most recent failure: Lost task 10.3 in > stage > 52.0 (TID 255, 172.16.97.97): UnknownReason > > Exception in thread "streaming-job-executor-0" java.lang.Error: > java.lang.InterruptedException > > Caused by: java.lang.InterruptedException > > java.lang.ClassNotFoundException: > kafka.common.OffsetOutOfRangeException > > at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699) > > > > org.apache.spark.SparkException: Job aborted due to stage failure: > Task 7 in > stage 
33.0 failed 4 times, most recent failure: Lost task 7.3 in stage > 33.0 > (TID 283, 172.16.97.103): UnknownReason > > java.lang.ClassNotFoundException: > kafka.common.OffsetOutOfRangeException > > at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699) > > java.lang.ClassNotFoundException: > kafka.common.OffsetOutOfRangeException > > at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699) > > > > -- > View this message in context: >
Re: Need more tasks in KafkaDirectStream
If you do not need one-to-one semantics and do not want a strict ordering guarantee, you can very well use the Receiver-based approach, and this consumer from Spark-Packages ( https://github.com/dibbhatt/kafka-spark-consumer ) can give a much better alternative in terms of performance and reliability for the Receiver-based approach. Regards, Dibyendu

On Thu, Oct 29, 2015 at 11:57 AM, varun sharma wrote:
> Right now, there is a one-to-one correspondence between kafka partitions and
> spark partitions.
> I don't have a requirement of one-to-one semantics.
> I need more tasks to be generated in the job so that it can be
> parallelised and the batch can be completed fast. In the previous
> Receiver-based approach, the number of tasks created was independent of
> kafka partitions; I need something like that.
> Any config available if I don't need one-to-one semantics?
> Is there any way I can repartition without incurring any additional cost?
>
> Thanks
> *VARUN SHARMA*
>
Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project
Hi, I have raised a JIRA ( https://issues.apache.org/jira/browse/SPARK-11045 ) to track the discussion, but am also mailing the user group. This Kafka consumer has been around for a while in spark-packages ( http://spark-packages.org/package/dibbhatt/kafka-spark-consumer ) and I see many have started using it, so I am now thinking of contributing it back to the Apache Spark core project so that it can get better support, visibility and adoption.

A few points about this consumer:

*Why this is needed:*

This consumer is NOT a replacement for the existing DirectStream API. DirectStream solves the problems around "exactly once" semantics and "global ordering" of messages. But to achieve this, DirectStream comes with an overhead: the overhead of maintaining offsets externally, limited parallelism while processing the RDD (as the RDD partitioning is the same as the Kafka partitioning), and higher latency while processing the RDD (as messages are fetched when the RDD is processed). There are many who do not want "exactly once" and "global ordering" of messages, or whose ordering is managed in an external store (say HBase), and who want more parallelism and lower latency in their streaming channel. At this point Spark does not have a good fallback option available in terms of a Receiver-based API. The present Receiver-based API uses the Kafka high-level API, which has low performance and serious issues. [For this reason Kafka is coming up with a new high-level consumer API in 0.9.] The consumer I implemented uses the Kafka low-level API, which gives more performance. This consumer has built-in fault-tolerance features for recovery from all failures. It extends the code from the Storm Kafka Spout, which has been around for some time, has matured over the years, and has all the built-in Kafka fault-tolerance capabilities. This same Kafka consumer for Spark is running in various production scenarios at present and has already been adopted by many in the Spark community.
*Why can't we fix the existing Receiver-based API in Spark:*

This is not possible unless you move to the Kafka low-level API, or wait for Kafka 0.9, where they are re-writing the high-level consumer API, and build another kafka spark consumer for Kafka 0.9 customers. That approach does not seem good in my opinion. The Kafka low-level API which I used in my consumer (and which DirectStream also uses) is not going to be deprecated in the near future. So if the Kafka consumer for Spark uses the low-level API for Receiver-based mode, that will make sure all Kafka customers who are presently on 0.8.x, or who will use 0.9, benefit from this same API.

*Concerns around low-level API complexity:*

Yes, implementing a reliable consumer using the Kafka low-level consumer API is complex. But the same has been done for the Storm Kafka Spout, and it has been stable for quite some time. This consumer for Spark is battle-tested under various production loads, gives much better performance than the existing Kafka consumers for Spark, and has a better fault-tolerance approach than the existing Receiver-based mode.

*Why can't this consumer continue to be in Spark-Packages?*

That is possible. But from what I see, many customers who want to fall back to Receiver-based mode, because they may not need "exactly once" semantics or "global ordering", seem a little tentative about using a spark-package library for their critical streaming pipeline. And so they are forced to use the faulty and buggy mode based on the Kafka high-level API. This consumer being part of the Spark project will bring much higher adoption and support from the community.

*Some major features of this consumer:*

This consumer controls the rate limit by maintaining a constant block size, whereas default rate limiting in other Spark consumers is done by number of messages. That is an issue when Kafka has messages of different sizes, since there is then no deterministic way to know the actual block sizes and memory utilization if rate control is done by number of messages.
This consumer has an in-built PID controller which controls the rate of consumption, again by modifying the block size, and consumes only the amount of messages needed from Kafka. The default Spark consumer fetches a chunk of messages and then applies throttling to control the rate, which can lead to excess I/O while consuming from Kafka.

You can also refer to the Readme file for more details: https://github.com/dibbhatt/kafka-spark-consumer/blob/master/README.md

If you are using this consumer or are going to use it, you can vote for this JIRA. Regards, Dibyendu
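The PID idea above can be sketched as follows (illustrative plain Python, not the consumer's actual implementation; the class name, gains and byte sizes are invented examples): instead of fetching a big chunk and throttling afterwards, the controller computes the next block's fetch size from the error between the target processing rate and the observed one.

```python
# Hedged sketch of PID-style rate control on block size: the error
# between target and observed rate shrinks or grows the next block,
# so the consumer only fetches what downstream can keep up with.

class BlockSizePid:
    def __init__(self, kp=0.5, ki=0.1, kd=0.0, max_block_bytes=1 << 20):
        self.kp, self.ki, self.kd = kp, ki, kd
        self.max_block_bytes = max_block_bytes
        self.integral = 0.0
        self.prev_error = 0.0

    def next_block_bytes(self, current_bytes, target_rate, observed_rate):
        error = target_rate - observed_rate  # > 0 means we can consume more
        self.integral += error
        derivative = error - self.prev_error
        self.prev_error = error
        adjustment = self.kp * error + self.ki * self.integral + self.kd * derivative
        # Clamp so the block never exceeds the configured cap or goes negative.
        return int(max(0, min(self.max_block_bytes, current_bytes + adjustment)))

pid = BlockSizePid()
# Downstream keeps up (observed == target): block size is unchanged.
assert pid.next_block_bytes(65536, 1000.0, 1000.0) == 65536
# Downstream falls behind (observed > target consumption budget): block shrinks.
assert pid.next_block_bytes(65536, 1000.0, 1400.0) < 65536
```

Controlling by bytes rather than message count is what makes the memory footprint deterministic when message sizes vary.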
Re: Spark Streaming over YARN
How many partitions are there in your Kafka topic ? Regards, Dibyendu On Sun, Oct 4, 2015 at 8:19 PM, <nib...@free.fr> wrote: > Hello, > I am using https://github.com/dibbhatt/kafka-spark-consumer > I specify 4 receivers in the ReceiverLauncher , but in YARN console I can > see one node receiving the kafka flow. > (I use spark 1.3.1) > > Tks > Nicolas > > > ----- Mail original - > De: "Dibyendu Bhattacharya" <dibyendu.bhattach...@gmail.com> > À: nib...@free.fr > Cc: "Cody Koeninger" <c...@koeninger.org>, "user" <user@spark.apache.org> > Envoyé: Vendredi 2 Octobre 2015 18:21:35 > Objet: Re: Spark Streaming over YARN > > > If your Kafka topic has 4 partitions , and if you specify 4 Receivers, > messages from each partitions are received by a dedicated receiver. so your > receiving parallelism is defined by your number of partitions of your topic > . Every receiver task will be scheduled evenly among nodes in your cluster. > There was a JIRA fixed in spark 1.5 which does even distribution of > receivers. > > > > > > Now for RDD parallelism ( i.e parallelism while processing your RDD ) is > controlled by your Block Interval and Batch Interval. > > > If your block Interval is 200 Ms, there will be 5 blocks per second. If > your Batch Interval is 3 seconds, there will 15 blocks per batch. And every > Batch is one RDD , thus your RDD will be 15 partition , which will be > honored during processing the RDD .. > > > > > Regards, > Dibyendu > > > > > On Fri, Oct 2, 2015 at 9:40 PM, < nib...@free.fr > wrote: > > > Ok so if I set for example 4 receivers (number of nodes), how RDD will be > distributed over the nodes/core. 
> For example in my example I have 4 nodes (with 2 cores) > > Tks > Nicolas > > > - Mail original - > De: "Dibyendu Bhattacharya" < dibyendu.bhattach...@gmail.com > > À: nib...@free.fr > Cc: "Cody Koeninger" < c...@koeninger.org >, "user" < > user@spark.apache.org > > Envoyé: Vendredi 2 Octobre 2015 18:01:59 > > > Objet: Re: Spark Streaming over YARN > > > Hi, > > > If you need to use Receiver based approach , you can try this one : > https://github.com/dibbhatt/kafka-spark-consumer > > > This is also part of Spark packages : > http://spark-packages.org/package/dibbhatt/kafka-spark-consumer > > > You just need to specify the number of Receivers you want for desired > parallelism while receiving , and rest of the thing will be taken care by > ReceiverLauncher. > > > This Low level Receiver will give better parallelism both on receiving , > and on processing the RDD. > > > Default Receiver based API ( KafkaUtils.createStream) using Kafka High > level API and Kafka high Level API has serious issue to be used in > production . > > > > > Regards, > > Dibyendu > > > > > > > > > > > On Fri, Oct 2, 2015 at 9:22 PM, < nib...@free.fr > wrote: > > > From my understanding as soon as I use YARN I don't need to use > parrallelisme (at least for RDD treatment) > I don't want to use direct stream as I have to manage the offset > positionning (in order to be able to start from the last offset treated > after a spark job failure) > > > - Mail original - > De: "Cody Koeninger" < c...@koeninger.org > > À: "Nicolas Biau" < nib...@free.fr > > Cc: "user" < user@spark.apache.org > > Envoyé: Vendredi 2 Octobre 2015 17:43:41 > Objet: Re: Spark Streaming over YARN > > > > > If you're using the receiver based implementation, and want more > parallelism, you have to create multiple streams and union them together. > > > Or use the direct stream. 
> > > On Fri, Oct 2, 2015 at 10:40 AM, < nib...@free.fr > wrote: > > > Hello, > I have a job receiving data from kafka (4 partitions) and persisting data > inside MongoDB. > It works fine, but when I deploy it inside YARN cluster (4 nodes with 2 > cores) only on node is receiving all the kafka partitions and only one node > is processing my RDD treatment (foreach function) > How can I force YARN to use all the resources nodes and cores to process > the data (receiver & RDD treatment) > > Tks a lot > Nicolas > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > > > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > > >
Re: Spark Streaming over YARN
Hi, If you need to use the Receiver based approach, you can try this one: https://github.com/dibbhatt/kafka-spark-consumer This is also part of Spark Packages: http://spark-packages.org/package/dibbhatt/kafka-spark-consumer You just need to specify the number of Receivers you want for the desired receiving parallelism, and the rest is taken care of by ReceiverLauncher. This low level Receiver gives better parallelism both on receiving and on processing the RDD. The default Receiver based API (KafkaUtils.createStream) uses the Kafka High Level API, which has serious issues that make it risky for production use. Regards, Dibyendu On Fri, Oct 2, 2015 at 9:22 PM, <nib...@free.fr> wrote: > From my understanding as soon as I use YARN I don't need to use > parrallelisme (at least for RDD treatment) > I don't want to use direct stream as I have to manage the offset > positionning (in order to be able to start from the last offset treated > after a spark job failure) > > > - Mail original - > De: "Cody Koeninger" > À: "Nicolas Biau" > Cc: "user" > Envoyé: Vendredi 2 Octobre 2015 17:43:41 > Objet: Re: Spark Streaming over YARN > > > If you're using the receiver based implementation, and want more > parallelism, you have to create multiple streams and union them together. > > > Or use the direct stream. > > > On Fri, Oct 2, 2015 at 10:40 AM, < nib...@free.fr > wrote: > > > Hello, > I have a job receiving data from kafka (4 partitions) and persisting data > inside MongoDB. 
> It works fine, but when I deploy it inside YARN cluster (4 nodes with 2 > cores) only on node is receiving all the kafka partitions and only one node > is processing my RDD treatment (foreach function) > How can I force YARN to use all the resources nodes and cores to process > the data (receiver & RDD treatment) > > Tks a lot > Nicolas > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > > > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >
Re: Spark Streaming over YARN
If your Kafka topic has 4 partitions and you specify 4 Receivers, messages from each partition are received by a dedicated receiver, so your receiving parallelism is defined by the number of partitions of your topic. Every receiver task will be scheduled evenly among the nodes in your cluster; there was a JIRA fixed in Spark 1.5 which does even distribution of receivers. RDD parallelism (i.e. parallelism while processing your RDD) is controlled by your block interval and batch interval. If your block interval is 200 ms, there will be 5 blocks per second. If your batch interval is 3 seconds, there will be 15 blocks per batch. Every batch is one RDD, so your RDD will have 15 partitions, which is honored while processing the RDD. Regards, Dibyendu On Fri, Oct 2, 2015 at 9:40 PM, <nib...@free.fr> wrote: > Ok so if I set for example 4 receivers (number of nodes), how RDD will be > distributed over the nodes/core. > For example in my example I have 4 nodes (with 2 cores) > > Tks > Nicolas > > > - Mail original - > De: "Dibyendu Bhattacharya" <dibyendu.bhattach...@gmail.com> > À: nib...@free.fr > Cc: "Cody Koeninger" <c...@koeninger.org>, "user" <user@spark.apache.org> > Envoyé: Vendredi 2 Octobre 2015 18:01:59 > Objet: Re: Spark Streaming over YARN > > > Hi, > > > If you need to use Receiver based approach , you can try this one : > https://github.com/dibbhatt/kafka-spark-consumer > > > This is also part of Spark packages : > http://spark-packages.org/package/dibbhatt/kafka-spark-consumer > > > You just need to specify the number of Receivers you want for desired > parallelism while receiving , and rest of the thing will be taken care by > ReceiverLauncher. > > > This Low level Receiver will give better parallelism both on receiving , > and on processing the RDD. > > > Default Receiver based API ( KafkaUtils.createStream) using Kafka High > level API and Kafka high Level API has serious issue to be used in > production . 
> > > > > Regards, > > Dibyendu > > > > > > > > > > > On Fri, Oct 2, 2015 at 9:22 PM, < nib...@free.fr > wrote: > > > From my understanding as soon as I use YARN I don't need to use > parrallelisme (at least for RDD treatment) > I don't want to use direct stream as I have to manage the offset > positionning (in order to be able to start from the last offset treated > after a spark job failure) > > > - Mail original - > De: "Cody Koeninger" < c...@koeninger.org > > À: "Nicolas Biau" < nib...@free.fr > > Cc: "user" < user@spark.apache.org > > Envoyé: Vendredi 2 Octobre 2015 17:43:41 > Objet: Re: Spark Streaming over YARN > > > > > If you're using the receiver based implementation, and want more > parallelism, you have to create multiple streams and union them together. > > > Or use the direct stream. > > > On Fri, Oct 2, 2015 at 10:40 AM, < nib...@free.fr > wrote: > > > Hello, > I have a job receiving data from kafka (4 partitions) and persisting data > inside MongoDB. > It works fine, but when I deploy it inside YARN cluster (4 nodes with 2 > cores) only on node is receiving all the kafka partitions and only one node > is processing my RDD treatment (foreach function) > How can I force YARN to use all the resources nodes and cores to process > the data (receiver & RDD treatment) > > Tks a lot > Nicolas > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > > > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > > >
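The block/batch arithmetic in the reply above (200 ms block interval, 3 second batch interval, hence 15 blocks and 15 RDD partitions per batch per receiver) can be sketched as a small calculation. The configuration behind the block interval is Spark's `spark.streaming.blockInterval` setting.

```java
// Number of blocks (and hence RDD partitions) produced per batch by one
// receiver, given the batch duration and spark.streaming.blockInterval.
public class BlockMath {
    static long partitionsPerBatch(long batchIntervalMs, long blockIntervalMs) {
        return batchIntervalMs / blockIntervalMs;
    }

    public static void main(String[] args) {
        // 3 s batch, 200 ms blocks -> 15 partitions, as described in the mail.
        System.out.println(partitionsPerBatch(3000, 200));  // 15
    }
}
```

With 4 receivers each producing its own blocks, total processing parallelism per batch scales with the number of receivers as well.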
Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error
Hi, Recently I was working on a PR to use Tachyon as the OFF_HEAP store for Spark Streaming and make sure Spark Streaming can recover from Driver failure and recover the blocks from Tachyon. The motivation for this PR is: if a Streaming application stores the blocks OFF_HEAP, it may not need any WAL-like feature to recover from Driver failure. As long as the writing of blocks to Tachyon from the Streaming receiver is durable, the blocks should be recoverable from Tachyon directly on Driver failure. This can solve the issue of expensive WAL writes and of duplicating the blocks both in MEMORY and in the WAL, while still guaranteeing an end-to-end no-data-loss channel using the OFF_HEAP store. https://github.com/apache/spark/pull/8817 This PR is still under review. But having done various failover tests in my environment, I saw this PR work perfectly fine without any data loss. Let's see what TD and others have to say on this PR. Below is the configuration I used to test this PR. Spark : 1.6 from Master Tachyon : 0.7.1 SparkConfiguration details: SparkConf conf = new SparkConf().setAppName("TestTachyon") .set("spark.streaming.unpersist", "true") .set("spark.local.dir", "/mnt1/spark/tincan") .set("tachyon.zookeeper.address","10.252.5.113:2182") .set("tachyon.usezookeeper","true") .set("spark.externalBlockStore.url", "tachyon-ft://ip-10-252-5-113.asskickery.us:19998") .set("spark.externalBlockStore.baseDir", "/sparkstreaming") .set("spark.externalBlockStore.folderName","pearson") .set("spark.externalBlockStore.dirId", "subpub") .set("spark.externalBlockStore.keepExternalBlockStoreDirOnShutdown","true"); JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration( 1)); String checkpointDirectory = "hdfs://10.252.5.113:9000/user/hadoop/spark/wal"; jsc.checkpoint(checkpointDirectory); // I am using my Receiver based consumer ( https://github.com/dibbhatt/kafka-spark-consumer ) . 
But KafkaUtil.CreateStream will also work JavaDStream unionStreams = ReceiverLauncher.launch( jsc, props, numberOfReceivers, StorageLevel.OFF_HEAP()); Regards, Dibyendu On Sat, Sep 26, 2015 at 11:59 AM, N B <nb.nos...@gmail.com> wrote: > Hi Dibyendu, > > How does one go about configuring spark streaming to use tachyon as its > place for storing checkpoints? Also, can one do this with tachyon running > on a completely different node than where spark processes are running? > > Thanks > Nikunj > > > On Thu, May 21, 2015 at 8:35 PM, Dibyendu Bhattacharya < > dibyendu.bhattach...@gmail.com> wrote: > >> Hi Tathagata, >> >> Thanks for looking into this. Further investigating I found that the >> issue is with Tachyon does not support File Append. The streaming receiver >> which writes to WAL when failed, and again restarted, not able to append to >> same WAL file after restart. >> >> I raised this with Tachyon user group, and Haoyuan told that within 3 >> months time Tachyon file append will be ready. Will revisit this issue >> again then . >> >> Regards, >> Dibyendu >> >> >> On Fri, May 22, 2015 at 12:24 AM, Tathagata Das <t...@databricks.com> >> wrote: >> >>> Looks like somehow the file size reported by the FSInputDStream of >>> Tachyon's FileSystem interface, is returning zero. >>> >>> On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya < >>> dibyendu.bhattach...@gmail.com> wrote: >>> >>>> Just to follow up this thread further . >>>> >>>> I was doing some fault tolerant testing of Spark Streaming with Tachyon >>>> as OFF_HEAP block store. As I said in earlier email, I could able to solve >>>> the BlockNotFound exception when I used Hierarchical Storage of >>>> Tachyon , which is good. >>>> >>>> I continue doing some testing around storing the Spark Streaming WAL >>>> and CheckPoint files also in Tachyon . Here is few finding .. >>>> >>>> >>>> When I store the Spark Streaming Checkpoint location in Tachyon , the >>>> throughput is much higher . 
I tested the Driver and Receiver failure cases >>>> , and Spark Streaming is able to recover without any Data Loss on Driver >>>> failure. >>>> >>>> *But on Receiver failure , Spark Streaming looses data* as I see >>>> Exception while reading the WAL file from Tachyon "receivedData" location >>>> for the same Receiver id which just failed. >>>> >>>> If I change the Checkpoint locati
Re: Managing scheduling delay in Spark Streaming
Hi Michal, If you use https://github.com/dibbhatt/kafka-spark-consumer , it comes with its own built-in back-pressure mechanism. By default this is disabled; you need to enable it to use this feature with this consumer. It controls the rate based on the Scheduling Delay at runtime. Regards, Dibyendu On Wed, Sep 16, 2015 at 12:32 PM, Akhil Das wrote: > I had a workaround for exactly the same scenario > http://apache-spark-developers-list.1001551.n3.nabble.com/SparkStreaming-Workaround-for-BlockNotFound-Exceptions-td12096.html > > Apart from that, if you are using this consumer > https://github.com/dibbhatt/kafka-spark-consumer it also has a built-in > rate limiting, Also in Spark 1.5.0 they have a rate limiting/back-pressure > (haven't tested it on production though). > > > > Thanks > Best Regards > > On Tue, Sep 15, 2015 at 11:56 PM, Michal Čizmazia > wrote: >> Hi, >> >> I have a Reliable Custom Receiver storing messages into Spark. Is there >> way how to prevent my receiver from storing more messages into Spark when >> the Scheduling Delay reaches a certain threshold? >> >> Possible approaches: >> #1 Does Spark block on the Receiver.store(messages) call to prevent >> storing more messages and overflowing the system? >> #2 How to obtain the Scheduling Delay in the Custom Receiver, so that I >> can implement the feature. >> >> Thanks, >> >> Mike >> >> >
Re: Using KafkaDirectStream, stopGracefully and exceptions
Hi, This is running in production in many organizations that have adopted this consumer as an alternative option. The consumer works with Spark 1.3.1; it has been running at Pearson in production for some time. It is part of Spark Packages, and you can see how to include it in your mvn or sbt build: http://spark-packages.org/package/dibbhatt/kafka-spark-consumer This consumer also comes with an in-built PID controller to control back-pressure, which you can use even on Spark 1.3.1. Regards, Dibyendu On Thu, Sep 10, 2015 at 5:48 PM, Krzysztof Zarzycki wrote: > Thanks Akhil, seems like an interesting option to consider. > Do you know if the package is production-ready? Do you use it in > production? > > And do you know if it works for Spark 1.3.1 as well? README mentions that > package in spark-packages.org is built with Spark 1.4.1. > > > Anyway, it seems that core Spark Streaming does not support my case? Or > anyone can instruct me on how to do it? Let's say, that I'm even fine (but > not content about) with using KafkaCluster private class that is included > in Spark, for manual managing ZK offsets. Has someone done it before? Has > someone public code examples of manually managing ZK offsets? > > Thanks, > Krzysztof > > 2015-09-10 12:22 GMT+02:00 Akhil Das : > >> This consumer pretty much covers all those scenarios you listed >> github.com/dibbhatt/kafka-spark-consumer Give it a try. >> >> Thanks >> Best Regards >> >> On Thu, Sep 10, 2015 at 3:32 PM, Krzysztof Zarzycki > > wrote: >> >>> Hi there, >>> I have a problem with fulfilling all my needs when using Spark Streaming >>> on Kafka. Let me enumerate my requirements: >>> 1. I want to have at-least-once/exactly-once processing. >>> 2. I want to have my application fault & simple stop tolerant. The Kafka >>> offsets need to be tracked between restarts. >>> 3. I want to be able to upgrade code of my application without losing >>> Kafka offsets. 
>>> >>> Now what my requirements imply according to my knowledge: >>> 1. implies using new Kafka DirectStream. >>> 2. implies using checkpointing. kafka DirectStream will write offsets >>> to the checkpoint as well. >>> 3. implies that checkpoints can't be used between controlled restarts. >>> So I need to install shutdownHook with ssc.stop(stopGracefully=true) (here >>> is a description how: >>> https://metabroadcast.com/blog/stop-your-spark-streaming-application-gracefully >>> ) >>> >>> Now my problems are: >>> 1. If I cannot make checkpoints between code upgrades, does it mean that >>> Spark does not help me at all with keeping my Kafka offsets? Does it mean, >>> that I have to implement my own storing to/initalization of offsets from >>> Zookeeper? >>> 2. When I set up shutdownHook and my any executor throws an exception, >>> it seems that application does not fail, but stuck in running state. Is >>> that because stopGracefully deadlocks on exceptions? How to overcome this >>> problem? Maybe I can avoid setting shutdownHook and there is other way to >>> stop gracefully your app? >>> >>> 3. If I somehow overcome 2., is it enough to just stop gracefully my app >>> to be able to upgrade code & not lose Kafka offsets? >>> >>> >>> Thank you a lot for your answers, >>> Krzysztof Zarzycki >>> >>> >>> >>> >> >
Re: Using KafkaDirectStream, stopGracefully and exceptions
Hi, Just to clarify one point which may not be clear to many: if someone decides to use the Receiver based approach, the best option at this point is https://github.com/dibbhatt/kafka-spark-consumer. This will also work with the WAL like any other receiver based consumer. The major issue with KafkaUtils.createStream is that it uses the Kafka High Level API, which has serious issues with consumer re-balance, whereas dibbhatt/kafka-spark-consumer uses the Low Level Kafka Consumer API, which does not have any such issue. I am not sure if there is any publicly available performance benchmark comparing this one with the DirectStream, so I cannot comment on the performance benefits of one over the other, but in whatever performance benchmarks we have done, dibbhatt/kafka-spark-consumer stands out. Regards, Dibyendu On Thu, Sep 10, 2015 at 8:08 PM, Cody Koeninger wrote: > You have to store offsets somewhere. > > If you're going to store them in checkpoints, then you have to deal with > the fact that checkpoints aren't recoverable on code change. Starting up > the new version helps because you don't start it from the same checkpoint > directory as the running one... it has your new code, and is storing to a > new checkpoint directory. If you started the new one from the latest > offsets, you can shut down the old one as soon as it's caught up. > > If you don't like the implications of storing offsets in checkpoints... > then sure, store them yourself. A real database would be better, but if > you really want to store them in zookeeper you can. In any case, just do > your offset saves in the same foreachPartition your other output operations > are occurring in, after they've successfully completed. > > If you don't care about performance benefits of the direct stream and > don't want exactly once semantics, sure use the old stream. > > Finally, hundreds of gigs just really isn't very much data. 
Unless what > you're doing is really resource intensive, it shouldn't take much time to > process it all, especially if you can dynamically size a cluster for the > rare occasion that something is screwed up and you need to reprocess. > > > On Thu, Sep 10, 2015 at 9:17 AM, Krzysztof Zarzycki > wrote: > >> Thanks guys for your answers. I put my answers in text, below. >> >> Cheers, >> Krzysztof Zarzycki >> >> 2015-09-10 15:39 GMT+02:00 Cody Koeninger : >> >>> The kafka direct stream meets those requirements. You don't need >>> checkpointing for exactly-once. Indeed, unless your output operations are >>> idempotent, you can't get exactly-once if you're relying on checkpointing. >>> Instead, you need to store the offsets atomically in the same transaction >>> as your results. >>> >> >> To focus discussion, let's assume my operations are idempotent & I'm >> interested in at-least-once thanks to that (which is idempotent >> exactly-once as named in your pres). Did you say, that I don't need >> checkpointing for that? How then direct stream API would store offsets >> between restarts? >> >> >>> See >>> https://github.com/koeninger/kafka-exactly-once >>> and the video / blog posts linked from it. >>> >>> >> I did that, thank you. What I want is to achieve "idempotent >> exactly-once" as named in your presentation. >> >> >>> The dibhatt consumer that Akhil linked is using zookeeper to store >>> offsets, so to the best of my knowledge, it cannot do exactly-once without >>> idempotent output operations. >>> >> True, and I totally accept it if what I get is at-least-once. >> >> >>> >>> >> Regarding the issues around code changes and checkpointing, the most >>> straightforward way to deal with this is to just start a new version of >>> your job before stopping the old one. 
If you care about delivery semantics >>> and are using checkpointing, your output operation must be idempotent >>> anyway, so having 2 versions of the code running at the same time for a >>> brief period should not be a problem. >>> >> >> How starting new version before stopping old one helps? Could you please >> explain a bit the mechanics of that? >> Anyway, it seems definitely cumbersome. Plus, I can imagine plenty of >> situations when it will be just inapropriate to run old one, when, let's >> say, we discovered a bug and don't want to run it anymore. >> >> >> So... To sum up it correctly, if I want at-least-once, with simple code >> upgrades, I need to: >> - store offsets in external storage (I would choose ZK for that). >> - read them on application restart and pass the >> TopicAndPartition->offset map to createDirectStream. >> - And I don't need to use checkpoints at all then. >> Could you confirm that? >> >> It's a question where should I actually commit the ZK offsets. The >> easiest would be to do it on the end of every batch. Do you think I can use >> org.apache.spark.streaming.scheduler.StreamingListener, method >> onBatchCompleted for that? I don't think so, because probably we don't have >>
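The pattern discussed in this thread, saving offsets in the same foreachPartition as the output operations and only after those operations have successfully completed, can be sketched without Spark as follows. The `offsetStore` map here is a hypothetical in-memory stand-in for ZooKeeper or a database, and `processPartition` is an invented helper, not any real Spark or Kafka API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of at-least-once processing: write the results first, then record
// the offset. If the output throws, the offset is never advanced, so the
// batch is replayed on restart; idempotent output makes the replay harmless.
public class OffsetCommitSketch {
    // Hypothetical stand-in for ZooKeeper/DB offset storage: partition -> offset.
    static final Map<Integer, Long> offsetStore = new HashMap<>();

    static void processPartition(int partition, long untilOffset, List<String> records) {
        for (String r : records) {
            // ... idempotent output operation here (e.g. upsert keyed by offset) ...
        }
        // Only after the output succeeds do we advance the stored offset.
        offsetStore.put(partition, untilOffset);
    }

    public static void main(String[] args) {
        processPartition(0, 42L, List.of("a", "b", "c"));
        System.out.println(offsetStore.get(0));  // 42
    }
}
```

On restart, the application would read `offsetStore` and resume each partition from the last committed offset instead of relying on checkpoints, which is exactly why this survives code upgrades where checkpoints do not.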
Re: BlockNotFoundException when running spark word count on Tachyon
Sometime back I was playing with Spark and Tachyon and I also found this issue. The issue here is that TachyonBlockManager puts the blocks in the WriteType.TRY_CACHE configuration. Because of this, blocks are evicted from the Tachyon cache when memory is full, and when Spark tries to find a block it throws a BlockNotFoundException. To solve this I tried Hierarchical Storage on Tachyon ( http://tachyon-project.org/Hierarchy-Storage-on-Tachyon.html ), and that seems to have worked: I did not see any Spark job fail due to BlockNotFoundException. Below are the Hierarchical Storage settings I used: -Dtachyon.worker.hierarchystore.level.max=2 -Dtachyon.worker.hierarchystore.level0.alias=MEM -Dtachyon.worker.hierarchystore.level0.dirs.path=$TACHYON_RAM_FOLDER -Dtachyon.worker.hierarchystore.level0.dirs.quota=$TACHYON_WORKER_MEMORY_SIZE -Dtachyon.worker.hierarchystore.level1.alias=HDD -Dtachyon.worker.hierarchystore.level1.dirs.path=/mnt/tachyon -Dtachyon.worker.hierarchystore.level1.dirs.quota=50GB -Dtachyon.worker.allocate.strategy=MAX_FREE -Dtachyon.worker.evict.strategy=LRU Regards, Dibyendu On Wed, Aug 26, 2015 at 12:25 PM, Todd bit1...@163.com wrote: I am using tachyon in the spark program below, but I encounter a BlockNotFoundException. Does someone know what's wrong, and also is there a guide on how to configure Spark to work with Tachyon? Thanks! 
conf.set(spark.externalBlockStore.url, tachyon://10.18.19.33:19998 ) conf.set(spark.externalBlockStore.baseDir,/spark) val sc = new SparkContext(conf) import org.apache.spark.storage.StorageLevel val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6)) rdd.persist(StorageLevel.OFF_HEAP) val count = rdd.count() val sum = rdd.reduce(_ + _) println(sThe count: $count, The sum is: $sum) 15/08/26 14:52:03 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 0.0 failed 1 times, most recent failure: Lost task 5.0 in stage 0.0 (TID 5, localhost): java.lang.RuntimeException: org.apache.spark.storage.BlockNotFoundException: Block rdd_0_5 not found at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:308) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114) at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101) at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) at
Re: BlockNotFoundException when running spark word count on Tachyon
The URL seems to have changed; here is the current one: http://tachyon-project.org/documentation/Tiered-Storage-on-Tachyon.html On Wed, Aug 26, 2015 at 12:32 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Sometime back I was playing with Spark and Tachyon and I also found this issue. The issue here is that TachyonBlockManager puts the blocks in the WriteType.TRY_CACHE configuration. Because of this, blocks are evicted from the Tachyon cache when memory is full, and when Spark tries to find a block it throws a BlockNotFoundException. To solve this I tried Hierarchical Storage on Tachyon ( http://tachyon-project.org/Hierarchy-Storage-on-Tachyon.html ), and that seems to have worked: I did not see any Spark job fail due to BlockNotFoundException. Below are the Hierarchical Storage settings I used: -Dtachyon.worker.hierarchystore.level.max=2 -Dtachyon.worker.hierarchystore.level0.alias=MEM -Dtachyon.worker.hierarchystore.level0.dirs.path=$TACHYON_RAM_FOLDER -Dtachyon.worker.hierarchystore.level0.dirs.quota=$TACHYON_WORKER_MEMORY_SIZE -Dtachyon.worker.hierarchystore.level1.alias=HDD -Dtachyon.worker.hierarchystore.level1.dirs.path=/mnt/tachyon -Dtachyon.worker.hierarchystore.level1.dirs.quota=50GB -Dtachyon.worker.allocate.strategy=MAX_FREE -Dtachyon.worker.evict.strategy=LRU Regards, Dibyendu On Wed, Aug 26, 2015 at 12:25 PM, Todd bit1...@163.com wrote: I am using tachyon in the spark program below, but I encounter a BlockNotFoundException. Does someone know what's wrong, and also is there a guide on how to configure Spark to work with Tachyon? Thanks! 
conf.set(spark.externalBlockStore.url, tachyon://10.18.19.33:19998 ) conf.set(spark.externalBlockStore.baseDir,/spark) val sc = new SparkContext(conf) import org.apache.spark.storage.StorageLevel val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6)) rdd.persist(StorageLevel.OFF_HEAP) val count = rdd.count() val sum = rdd.reduce(_ + _) println(sThe count: $count, The sum is: $sum) 15/08/26 14:52:03 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 0.0 failed 1 times, most recent failure: Lost task 5.0 in stage 0.0 (TID 5, localhost): java.lang.RuntimeException: org.apache.spark.storage.BlockNotFoundException: Block rdd_0_5 not found at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:308) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114) at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101) at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333
Just Released V1.0.4 Low Level Receiver Based Kafka-Spark-Consumer in Spark Packages having built-in Back Pressure Controller
Dear All, I have just released version 1.0.4 of the Low Level Receiver based Kafka-Spark-Consumer in spark-packages.org. You can find the latest release here: http://spark-packages.org/package/dibbhatt/kafka-spark-consumer Here is the github location: https://github.com/dibbhatt/kafka-spark-consumer This consumer now has a built-in PID (Proportional, Integral, Derivative) rate controller to control Spark back-pressure. The consumer implements its rate-limiting logic not by capping the number of messages per block (as Spark's out-of-the-box consumers do), but by capping the size of the blocks per batch; i.e. for any given batch, this consumer enforces the rate limit by controlling the size of the batch. As Spark memory usage is driven by block size rather than the number of messages, I think rate limiting by block size is more appropriate. For example, assume a topic whose Kafka messages range from very small (a few hundred bytes) to much larger (a few hundred KB). If we limit the rate by number of messages, block sizes may vary drastically depending on which type of messages gets pulled into each block. Whereas, if I limit the rate by block size, my block size remains constant across batches (even though the number of messages differs across blocks), which helps me tune memory settings more precisely, as I know exactly how much memory each block is going to consume. The PID (Proportional, Integral, Derivative) controller built into this consumer controls Spark back-pressure by modifying, at run time, the size of the block it consumes. The PID controller's rate feedback mechanism is built using ZooKeeper. Again, the back-pressure logic works not by controlling the number of messages (as done in Spark 1.5, SPARK-7398) but by altering the size of the block consumed per batch from Kafka.
As the back-pressure mechanism is built into the consumer itself, this consumer can be used with any version of Spark by anyone who wants a back-pressure control mechanism in their existing Spark / Kafka environment. Regards, Dibyendu
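The rate-limit-by-block-size idea above can be sketched as a tiny PID update rule: shrink the bytes fetched per block when a batch takes longer than its target, grow them when there is spare capacity. This is a toy illustration, not the actual kafka-spark-consumer implementation; all gains, names, and the size floor are illustrative assumptions.

```java
// Toy PID controller that adjusts the *size in bytes* of the block fetched
// per batch, rather than a message count. Gains and floor are arbitrary
// illustrative values, not the real consumer's constants.
public class PidRate {
    private double integral = 0.0;
    private double lastError = 0.0;

    static final double KP = 1.0, KI = 0.05, KD = 0.1;
    static final double MIN_FETCH_BYTES = 64 * 1024;   // never fetch less than this

    /**
     * @param targetMs   desired processing time per batch (e.g. the batch interval)
     * @param actualMs   observed processing time of the last batch
     * @param fetchBytes block size used in the last batch
     * @return the block size to use for the next batch
     */
    public double nextFetchBytes(double targetMs, double actualMs, double fetchBytes) {
        double error = (targetMs - actualMs) / targetMs;  // negative when falling behind
        integral += error;
        double derivative = error - lastError;
        lastError = error;
        double adjustment = KP * error + KI * integral + KD * derivative;
        return Math.max(MIN_FETCH_BYTES, fetchBytes * (1.0 + adjustment));
    }
}
```

Because the controlled quantity is bytes rather than messages, the memory footprint of each block stays predictable regardless of per-message size variance, which is exactly the point the announcement makes.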
Re: spark streaming 1.3 kafka error
I think you can also give this consumer a try in your environment: http://spark-packages.org/package/dibbhatt/kafka-spark-consumer It has been running fine for topics with a large number of Kafka partitions (200) like yours, without any issue. There is no problem with connections, as this consumer re-uses Kafka connections, and it can also recover from failures (network loss, the Kafka leader going down, ZK down, etc.). Regards, Dibyendu On Sat, Aug 22, 2015 at 7:35 PM, Shushant Arora shushantaror...@gmail.com wrote: On trying the consumer without external connections, or with a low number of external connections, it is working fine - so the doubt is how the socket got closed - java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. On Sat, Aug 22, 2015 at 7:24 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Can you try some other consumer and see if the issue still exists? On Aug 22, 2015 12:47 AM, Shushant Arora shushantaror...@gmail.com wrote: The exception comes when the client also has many connections to some other external server. So I think the exception is coming because of a client-side issue only - on the server side there is no issue. I want to understand: is the executor (simple consumer) not making a new connection to the kafka broker at the start of each task? Or is it created once only, and that is getting closed somehow? On Sat, Aug 22, 2015 at 9:41 AM, Shushant Arora shushantaror...@gmail.com wrote: It comes at the start of each task when there is new data inserted in kafka (the data inserted is very small). The kafka topic has 300 partitions - the data inserted is ~10 MB. Tasks get failed and retried, which succeeds, and after a certain number of failed tasks it kills the job. On Sat, Aug 22, 2015 at 2:08 AM, Akhil Das ak...@sigmoidanalytics.com wrote: That looks like you are choking your kafka machine. Do a top on the kafka machines and see the workload; it may happen that you are spending too much time on disk io etc.
On Aug 21, 2015 7:32 AM, Cody Koeninger c...@koeninger.org wrote: Sounds like that's happening consistently, not an occasional network problem? Look at the Kafka broker logs Make sure you've configured the correct kafka broker hosts / ports (note that direct stream does not use zookeeper host / port). Make sure that host / port is reachable from your driver and worker nodes, ie telnet or netcat to it. It looks like your driver can reach it (since there's partition info in the logs), but that doesn't mean the worker can. Use lsof / netstat to see what's going on with those ports while the job is running, or tcpdump if you need to. If you can't figure out what's going on from a networking point of view, post a minimal reproducible code sample that demonstrates the issue, so it can be tested in a different environment. On Fri, Aug 21, 2015 at 4:06 AM, Shushant Arora shushantaror...@gmail.com wrote: Hi Getting below error in spark streaming 1.3 while consuming from kafka using directkafka stream. Few of tasks are getting failed in each run. What is the reason /solution of this error? 15/08/21 08:54:54 ERROR executor.Executor: Exception in task 262.0 in stage 130.0 (TID 16332) java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. 
at kafka.utils.Utils$.read(Utils.scala:376) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81) at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107) at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150) at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162) at
Re: Reliable Streaming Receiver
Hi, You can try this Kafka consumer for Spark, which is also part of Spark Packages: https://github.com/dibbhatt/kafka-spark-consumer Regards, Dibyendu On Thu, Aug 6, 2015 at 6:48 AM, Sourabh Chandak sourabh3...@gmail.com wrote: Thanks Tathagata. I tried that but BlockGenerator internally uses SystemClock, which is again private. We are using DSE so we are stuck with Spark 1.2, hence can't use the receiver-less version. Is it possible to use the same code as a separate API with 1.2? Thanks, Sourabh On Wed, Aug 5, 2015 at 6:13 PM, Tathagata Das t...@databricks.com wrote: You could very easily strip out the BlockGenerator code from the Spark source code and use it directly, in the same way the Reliable Kafka Receiver uses it. BTW, you should know that we will be deprecating the receiver based approach in favor of the Direct Kafka approach. That is quite flexible, can give exactly-once guarantees without the WAL, and is more robust and performant. Consider using it. On Wed, Aug 5, 2015 at 5:48 PM, Sourabh Chandak sourabh3...@gmail.com wrote: Hi, I am trying to replicate the Kafka streaming receiver for a custom version of Kafka and want to create a reliable receiver. The current implementation uses BlockGenerator, which is a private class inside Spark streaming, hence I can't use it in my code. Can someone help me with some resources to tackle this issue? Thanks, Sourabh
Some BlockManager Doubts
Hi, I would just like to clarify a few doubts I have about how the BlockManager behaves, mostly in the context of Spark Streaming. There are two possible cases where blocks may get dropped / not stored in memory. Case 1: While writing a block with the MEMORY_ONLY setting, if the node's BlockManager does not have enough memory to unroll the block, the block won't be stored in memory and the receiver will throw an error while writing it. If the StorageLevel uses disk (as in MEMORY_AND_DISK), blocks are stored to disk ONLY IF the BlockManager is not able to unroll them into memory. This is fine while receiving blocks, but this logic has an issue when old blocks are chosen to be dropped from memory, as in Case 2. Case 2: Now say that, for either the MEMORY_ONLY or MEMORY_AND_DISK setting, blocks were successfully stored in memory in Case 1. If the memory limit then goes beyond a certain threshold, the BlockManager starts dropping LRU blocks from memory that were stored successfully while receiving. The primary issue I see here is that while dropping blocks in Case 2, Spark does not check whether the storage level uses disk (MEMORY_AND_DISK): even with disk-backed storage levels, blocks are dropped from memory without being written to disk. Or perhaps the issue is, in the first place, that blocks are NOT written to disk simultaneously in Case 1. I understand this would impact throughput, but the current design may throw a BlockNotFound error if blocks are chosen to be dropped, even when the StorageLevel uses disk. Any thoughts? Regards, Dibyendu
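The Case 2 concern can be made concrete with a toy model: an LRU memory store that, on memory pressure, either drops the oldest block outright (the behaviour described in the mail) or spills it to a "disk" map first (what a disk-backed storage level would need to avoid BlockNotFound). This is purely illustrative and is not Spark's BlockManager code.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model only: memory is a bounded LRU map, disk is an unbounded map.
public class ToyBlockStore {
    private final int memCapacity;
    private final boolean spillOnEvict;   // true models MEMORY_AND_DISK semantics
    private final LinkedHashMap<String, byte[]> memory = new LinkedHashMap<>();
    private final Map<String, byte[]> disk = new HashMap<>();

    public ToyBlockStore(int memCapacity, boolean spillOnEvict) {
        this.memCapacity = memCapacity;
        this.spillOnEvict = spillOnEvict;
    }

    public void put(String id, byte[] data) {
        if (memory.size() >= memCapacity) {
            String victim = memory.keySet().iterator().next(); // oldest (LRU) entry
            byte[] evicted = memory.remove(victim);
            if (spillOnEvict) {
                disk.put(victim, evicted);  // spill before dropping from memory
            }
            // else: the block is simply gone -> a later read is a BlockNotFound
        }
        memory.put(id, data);
    }

    /** Read from memory first, then fall back to disk; null means "not found". */
    public byte[] get(String id) {
        byte[] b = memory.get(id);
        return b != null ? b : disk.get(id);
    }
}
```

With spillOnEvict=false, an evicted block is unreadable afterwards; with spillOnEvict=true, it survives on disk — which is the distinction the mail argues the real eviction path should honour for MEMORY_AND_DISK.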
Re: spark streaming with kafka reset offset
Hi, There is another option to try: the receiver-based low-level Kafka consumer, which is part of Spark Packages ( http://spark-packages.org/package/dibbhatt/kafka-spark-consumer ). This can be used with the WAL as well for end-to-end zero data loss. It is also a reliable receiver and commits offsets to ZK. Given the number of Kafka partitions you have (100), using the high-level Kafka API for the receiver-based approach may lead to issues related to consumer re-balancing, which is a major issue with the Kafka high-level API. Regards, Dibyendu On Sat, Jun 27, 2015 at 3:04 PM, Tathagata Das t...@databricks.com wrote: In the receiver based approach, if the receiver crashes for any reason (receiver crashed or executor crashed) the receiver should get restarted on another executor and should start reading data from the offset present in the zookeeper. There is some chance of data loss which can be alleviated using Write Ahead Logs (see the streaming programming guide for more details, or see my talk [Slides PDF http://www.slideshare.net/SparkSummit/recipes-for-running-spark-streaming-apploications-in-production-tathagata-daspptx , Video https://www.youtube.com/watch?v=d5UJonrruHklist=PL-x35fyliRwgfhffEpywn4q23ykotgQJ6index=4 ] from last Spark Summit 2015). But that approach can give duplicate records. The direct approach gives exactly-once guarantees, so you should try it out. TD On Fri, Jun 26, 2015 at 5:46 PM, Cody Koeninger c...@koeninger.org wrote: Read the spark streaming guide and the kafka integration guide for a better understanding of how the receiver based stream works. Capacity planning is specific to your environment and what the job is actually doing, you'll need to determine it empirically. On Friday, June 26, 2015, Shushant Arora shushantaror...@gmail.com wrote: In 1.2, how do I handle offset management after the stream application starts, in each job? Should I commit the offset manually after job completion? And what is the recommended number of consumer threads?
Say I have 300 partitions in the kafka cluster. Load is ~1 million events per second; each event is ~500 bytes. Is having 5 receivers with 60 partitions each sufficient for spark streaming to consume? On Fri, Jun 26, 2015 at 8:40 PM, Cody Koeninger c...@koeninger.org wrote: The receiver-based kafka createStream in spark 1.2 uses zookeeper to store offsets. If you want finer-grained control over offsets, you can update the values in zookeeper yourself before starting the job. createDirectStream in spark 1.3 is still marked as experimental, and subject to change. That being said, it works better for me in production than the receiver based api. On Fri, Jun 26, 2015 at 6:43 AM, Shushant Arora shushantaror...@gmail.com wrote: I am using spark streaming 1.2. If processing executors get crashed, will the receiver reset the offset back to the last processed offset? If the receiver itself crashed, is there a way to reset the offset without restarting the streaming application, other than smallest or largest? Is spark streaming 1.3, which uses the low level consumer api, stable? And which is recommended for handling data loss, 1.2 or 1.3?
Re: Kafka Spark Streaming: ERROR EndpointWriter: dropping message
Hi, Can you please share a more detailed stack trace from your receiver logs, and also the consumer settings you used? I have never tested the consumer with Kafka 0.7.3, so I am not sure if the Kafka version is the issue. Have you tried building the consumer against Kafka 0.7.3? Regards, Dibyendu On Wed, Jun 10, 2015 at 11:52 AM, karma243 ashut...@reducedata.com wrote: Thank you for responding @nsalian. 1. I am trying to replicate this https://github.com/dibbhatt/kafka-spark-consumer project on my local system. 2. Yes, kafka and brokers are on the same host. 3. I am working with kafka 0.7.3 and spark 1.3.1. Kafka 0.7.3 does not have the --describe command. Though I've worked on three cases (Kafka and Zookeeper were on my machine all the time): (i) Producer-Consumer on my machine. (ii) Producer on my machine and Consumer on a different machine. (iii) Consumer on my machine and producer on a different machine. All three cases were working properly. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Spark-Streaming-ERROR-EndpointWriter-dropping-message-tp23228p23240.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: [Kafka-Spark-Consumer] Spark-Streaming Job Fails due to Futures timed out
Hi Snehal, Are you running the latest kafka consumer from github/spark-packages? If not, can you pick up the latest changes? This low-level receiver will keep retrying if the underlying BlockManager gives an error. Do you see those retry cycles in the log? If yes, then there is an issue writing blocks to the BlockManager, and Spark is not able to recover from this failure even though the receiver keeps trying. Which version of Spark are you using? Dibyendu On Jun 9, 2015 5:14 AM, Snehal Nagmote nagmote.sne...@gmail.com wrote: All, I am using Kafka Spark Consumer https://github.com/dibbhatt/kafka-spark-consumer in a spark streaming job. After the spark streaming job runs for a few hours, all executors exit, and I still see the status of the application on the SPARK UI as running. Does anyone know the cause of this exception and how to fix it? WARN [sparkDriver-akka.actor.default-dispatcher-17:Logging$class@71] - Error reported by receiver for stream 7: Error While Store for Partition Partition{host=dal-kafka-broker01.bfd.walmart.com:9092, partition=27} - org.apache.spark.SparkException: Error sending message [message = UpdateBlockInfo(BlockManagerId(2, dfw-searcher.com, 33621),input-7-1433793457165,StorageLevel(false, true, false, false, 1),10492,0,0)] at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209) at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221) at org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62) at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:384) at org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:360) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:812) at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637) at org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:71) at 
org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:161) at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushIterator(ReceiverSupervisorImpl.scala:136) at org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:152) at consumer.kafka.PartitionManager.next(PartitionManager.java:215) at consumer.kafka.KafkaConsumer.createStream(KafkaConsumer.java:75) at consumer.kafka.KafkaConsumer.run(KafkaConsumer.java:108) at java.lang.Thread.run(Thread.java:745) Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195) ... 
14 more WARN [sparkDriver-akka.actor.default-dispatcher-30:Logging$class@92] - Error sending message [message = UpdateBlockInfo(BlockManagerId(driver, dfw-searcher.com, 57286),broadcast_10665_piece0,StorageLevel(false, false, false, false, 1),0,0,0)] in 2 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169) at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640) at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195) at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221) at org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62) at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:384) at org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:360) at org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1104) at org.apache.spark.storage.BlockManager$$anonfun$removeBroadcast$2.apply(BlockManager.scala:1081) at
Re: [Kafka-Spark-Consumer] Spark-Streaming Job Fails due to Futures timed out
Seems to be related to this JIRA : https://issues.apache.org/jira/browse/SPARK-3612 ? On Tue, Jun 9, 2015 at 7:39 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Hi Snehal Are you running the latest kafka consumer from github/spark-packages ? If not can you take the latest changes. This low level receiver will make attempt to keep trying if underlying BlockManager gives error. Are you see those retry cycle in log ? If yes then there is issue writing blocks to blockmanager and spark not able to recover from this failure but Receivet keep trying .. Which version of Spark you are using ? Dibyendu On Jun 9, 2015 5:14 AM, Snehal Nagmote nagmote.sne...@gmail.com wrote: All, I am using Kafka Spark Consumer https://github.com/dibbhatt/kafka-spark-consumer in spark streaming job . After spark streaming job runs for few hours , all executors exit and I still see status of application on SPARK UI as running Does anyone know cause of this exception and how to fix this ? WARN [sparkDriver-akka.actor.default-dispatcher-17:Logging$class@71] - Error reported by receiver for stream 7: Error While Store for Partition Partition{host=dal-kafka-broker01.bfd.walmart.com:9092, partition=27} - org.apache.spark.SparkException: Error sending message [message = UpdateBlockInfo(BlockManagerId(2, dfw-searcher.com, 33621),input-7-1433793457165,StorageLevel(false, true, false, false, 1),10492,0,0)] at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209) at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221) at org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62) at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:384) at org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:360) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:812) at 
org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637) at org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:71) at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:161) at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushIterator(ReceiverSupervisorImpl.scala:136) at org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:152) at consumer.kafka.PartitionManager.next(PartitionManager.java:215) at consumer.kafka.KafkaConsumer.createStream(KafkaConsumer.java:75) at consumer.kafka.KafkaConsumer.run(KafkaConsumer.java:108) at java.lang.Thread.run(Thread.java:745) Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195) ... 
14 more WARN [sparkDriver-akka.actor.default-dispatcher-30:Logging$class@92] - Error sending message [message = UpdateBlockInfo(BlockManagerId(driver, dfw-searcher.com, 57286),broadcast_10665_piece0,StorageLevel(false, false, false, false, 1),0,0,0)] in 2 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169) at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640) at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195) at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221) at org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62) at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:384) at org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:360) at org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1104) at org.apache.spark.storage.BlockManager
Re: Spark Streaming and Drools
Hi, Sometime back I played with distributed rule processing by integrating Drools with HBase co-processors, invoking rules on any incoming data: https://github.com/dibbhatt/hbase-rule-engine You can get some idea of how to use Drools rules if you look at this RegionObserver coprocessor: https://github.com/dibbhatt/hbase-rule-engine/blob/master/src/main/java/hbase/rule/HBaseDroolObserver.java The idea is basically to create a stateless rule engine from the drl file and fire the rules on incoming data. Even though the code invokes rules on HBase PUT objects, you can get the idea and modify it for Spark. Dibyendu On Fri, May 22, 2015 at 3:49 PM, Evo Eftimov evo.efti...@isecc.com wrote: I am not aware of existing examples, but you can always "ask" Google. Basically, from a Spark Streaming perspective, Drools is a third-party software library; you would invoke it in the same way as any other third-party software library from the tasks (maps, filters etc) within your DAG job. *From:* Antonio Giambanco [mailto:antogia...@gmail.com] *Sent:* Friday, May 22, 2015 11:07 AM *To:* Evo Eftimov *Cc:* user@spark.apache.org *Subject:* Re: Spark Streaming and Drools Thanks a lot Evo, do you know where I can find some examples? Have a great one A G 2015-05-22 12:00 GMT+02:00 Evo Eftimov evo.efti...@isecc.com: You can deploy and invoke Drools as a singleton on every Spark worker node / executor / worker JVM. You can invoke it from e.g. map, filter etc and use the result from the rule to make a decision on how to transform/filter an event/message. *From:* Antonio Giambanco [mailto:antogia...@gmail.com] *Sent:* Friday, May 22, 2015 9:43 AM *To:* user@spark.apache.org *Subject:* Spark Streaming and Drools Hi All, I'm deploying an architecture that uses flume for sending log information to a sink. Spark streaming reads from this sink (pull strategy) and processes all this information; during this process I would like to do some event processing. . . 
For example: a log appender writes information about all transactions in my trading platforms. If a platform user sells more than they buy during a week, I need to receive an alert on an event dashboard. How can I realize this? Is it possible with Drools? Thanks so much
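The sell-more-than-buy example above can be sketched without any rule engine at all, to show the shape of the flow: a stateless "rule" applied to a batch of trade events, emitting alerts for matching (user, week) pairs. In a real deployment the rule would be compiled from a .drl file into a shared Drools rule base per worker JVM, as the thread suggests; here the rule is a plain Java method, and all names are illustrative.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TradeRules {
    // A trade event, as an alert rule would see it.
    public static class Trade {
        final String user; final int week; final boolean isSell;
        public Trade(String user, int week, boolean isSell) {
            this.user = user; this.week = week; this.isSell = isSell;
        }
    }

    /** The "rule": fires for every (user, week) whose sell count exceeds its buy count. */
    public static List<String> sellHeavyAlerts(List<Trade> trades) {
        Map<String, Integer> net = new HashMap<>();  // "user:week" -> sells minus buys
        for (Trade t : trades) {
            String key = t.user + ":" + t.week;
            net.merge(key, t.isSell ? 1 : -1, Integer::sum);
        }
        List<String> alerts = new ArrayList<>();
        for (Map.Entry<String, Integer> e : net.entrySet()) {
            if (e.getValue() > 0) alerts.add(e.getKey());
        }
        return alerts;
    }
}
```

In a streaming job this method (or the Drools session wrapping it) would be invoked per partition of each micro-batch, exactly the singleton-per-worker pattern Evo describes.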
Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error
Hi Tathagata, Thanks for looking into this. Investigating further, I found that the issue is that Tachyon does not support file append. The streaming receiver that writes to the WAL, once it fails and is restarted, is not able to append to the same WAL file after the restart. I raised this with the Tachyon user group, and Haoyuan said that Tachyon file append should be ready within 3 months. I will revisit this issue then. Regards, Dibyendu On Fri, May 22, 2015 at 12:24 AM, Tathagata Das t...@databricks.com wrote: Looks like somehow the file size reported by the FSInputDStream of Tachyon's FileSystem interface is returning zero. On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Just to follow up this thread further: I was doing some fault-tolerance testing of Spark Streaming with Tachyon as the OFF_HEAP block store. As I said in an earlier email, I was able to solve the BlockNotFound exception when I used Hierarchical Storage in Tachyon, which is good. I continued doing some testing around storing the Spark Streaming WAL and checkpoint files in Tachyon as well. Here are a few findings. When I store the Spark Streaming checkpoint location in Tachyon, the throughput is much higher. I tested the driver and receiver failure cases, and Spark Streaming is able to recover without any data loss on driver failure. *But on receiver failure, Spark Streaming loses data*, as I see an exception while reading the WAL file from the Tachyon receivedData location for the same receiver id which just failed. If I change the checkpoint location back to HDFS, Spark Streaming can recover from both driver and receiver failure. Here are the log details from when the Spark Streaming receiver failed. I raised a JIRA for the same issue: https://issues.apache.org/jira/browse/SPARK-7525 INFO : org.apache.spark.scheduler.DAGScheduler - *Executor lost: 2 (epoch 1)* INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying to remove executor 2 from BlockManagerMaster. 
INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Removing block manager BlockManagerId(2, 10.252.5.54, 45789) INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2 successfully in removeExecutor INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - *Registered receiver for stream 2 from 10.252.5.62*:47255 WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 2.1 in stage 103.0 (TID 421, 10.252.5.62): org.apache.spark.SparkException: *Could not read data from write ahead log record FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919 http://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)* at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144) at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168) at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Caused by: java.lang.IllegalArgumentException:* Seek position is past EOF: 645603894, fileSize = 0* at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239) at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37) at org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37) at org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104) at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141) ... 15 more INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.2 in stage 103.0 (TID 422, 10.252.5.61, ANY, 1909 bytes) INFO
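The failure mode in this thread (a restarted receiver cannot append to its existing WAL file on Tachyon) can be captured in a toy model: a write-ahead log over a file system that may or may not support appending to an existing file. Names and behaviour here are illustrative assumptions, not Spark's actual FileBasedWriteAheadLog.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy WAL: each "file" is an in-memory list of records.
public class ToyWal {
    private final boolean supportsAppend;   // true models HDFS, false models Tachyon at the time
    private final Map<String, List<String>> files = new HashMap<>();

    public ToyWal(boolean supportsAppend) { this.supportsAppend = supportsAppend; }

    /** Write one record, creating the log file or appending to it. */
    public void write(String file, String record) {
        List<String> existing = files.get(file);
        if (existing == null) {               // fresh file: always works
            List<String> records = new ArrayList<>();
            records.add(record);
            files.put(file, records);
        } else if (supportsAppend) {          // HDFS-style append after a restart
            existing.add(record);
        } else {                              // restarted receiver reopening its WAL file
            throw new UnsupportedOperationException("append not supported: " + file);
        }
    }

    public List<String> read(String file) {
        return files.getOrDefault(file, Collections.emptyList());
    }
}
```

With append support the restarted receiver keeps extending the same log segment; without it, the second open fails, which is why moving the checkpoint/WAL location back to HDFS made receiver recovery work.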
Re: Spark Streaming graceful shutdown in Spark 1.4
Thanks Tathagata for making this change. Dibyendu On Thu, May 21, 2015 at 8:24 AM, Tathagata Das t...@databricks.com wrote: If you are talking about handling driver crash failures, then all bets are off anyways! Adding a shutdown hook in the hope of handling driver process failure handles only some cases (Ctrl-C), but does not handle cases like SIGKILL (which does not run JVM shutdown hooks) or a driver machine crash. So it's not a good idea to rely on that. Nonetheless, I have opened a PR to handle the shutdown of the StreamingContext in the same way as SparkContext. https://github.com/apache/spark/pull/6307 On Tue, May 19, 2015 at 12:51 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Thanks Sean, you are right. If the driver program is running, then I can handle shutdown in the main exit path. But if the driver machine crashes (or you just stop the application, for example by killing the driver process), then a shutdown hook is the only option, isn't it? What I am trying to say is that just calling ssc.stop in sys.ShutdownHookThread or Runtime.getRuntime().addShutdownHook (in Java) won't work anymore. I need to use Utils.addShutdownHook with a priority. So I am just checking whether Spark Streaming can make graceful shutdown the default shutdown mechanism. Dibyendu On Tue, May 19, 2015 at 1:03 PM, Sean Owen so...@cloudera.com wrote: I don't think you should rely on a shutdown hook. Ideally you try to stop it in the main exit path of your program, even in case of an exception. On Tue, May 19, 2015 at 7:59 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: You mean to say within Runtime.getRuntime().addShutdownHook I call ssc.stop(stopSparkContext = true, stopGracefully = true)? This won't work anymore in 1.4. The SparkContext gets stopped before the Receiver has processed all received blocks, and I see the below exception in the logs. But if I add Utils.addShutdownHook with the priority I mentioned, then graceful shutdown works.
In that case, shutdown hooks run in priority order.
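As a rough sketch of the workaround described in this thread (assuming Spark 1.4's `Utils.addShutdownHook(priority)(hook)` API and the priority of 150 mentioned above, higher than Spark core's shutdown priority of 50; the exact signature and visibility of `Utils` may differ between builds):

```scala
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.util.Utils

// Assumes `ssc` is the application's StreamingContext.
// Priority 150 > SparkContext's shutdown priority of 50, so this hook
// runs first and can drain received blocks before the context is closed.
def registerGracefulStop(ssc: StreamingContext): Unit = {
  Utils.addShutdownHook(150) { () =>
    // stopSparkContext = false: Spark core's own hook stops it later.
    // stopGracefully  = true : finish processing all received data first.
    ssc.stop(stopSparkContext = false, stopGracefully = true)
  }
}
```

This is only a sketch of the pattern under discussion, not a definitive implementation; as noted later in the thread, Spark eventually handled StreamingContext shutdown itself.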
Re: Spark Streaming graceful shutdown in Spark 1.4
By the way, this happens when I stopped the Driver process ... On Tue, May 19, 2015 at 12:29 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: You mean to say within Runtime.getRuntime().addShutdownHook I call ssc.stop(stopSparkContext = true, stopGracefully = true)? This won't work anymore in 1.4. The SparkContext gets stopped before the Receiver has processed all received blocks, and I see the below exception in the logs. But if I add Utils.addShutdownHook with the priority I mentioned, then graceful shutdown works. In that case, shutdown hooks run in priority order.

INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - Sent stop signal to all 3 receivers
ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered receiver for stream 0: Stopped by driver
ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered receiver for stream 1: Stopped by driver
ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered receiver for stream 2: Stopped by driver
INFO : org.apache.spark.SparkContext - Invoking stop() from shutdown hook
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/streaming/batch/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/streaming/batch,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/streaming/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/streaming,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/static,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/executors/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/executors,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/environment/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/environment,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/storage/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/storage,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/jobs,null}
INFO : org.apache.spark.ui.SparkUI - Stopped Spark web UI at http://10.252.5.113:4040
INFO : org.apache.spark.scheduler.DAGScheduler - Stopping DAGScheduler
INFO : org.apache.spark.scheduler.DAGScheduler - Job 4 failed: start at Consumer.java:122, took 10.398746 s
Exception in thread Thread-28 org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
Re: Spark Streaming graceful shutdown in Spark 1.4
Thanks Sean, you are right. If the driver program is running, then I can handle shutdown in the main exit path. But if the driver machine crashes (or you just stop the application, for example by killing the driver process), then a shutdown hook is the only option, isn't it? What I am trying to say is that just calling ssc.stop in sys.ShutdownHookThread or Runtime.getRuntime().addShutdownHook (in Java) won't work anymore. I need to use Utils.addShutdownHook with a priority. So I am just checking whether Spark Streaming can make graceful shutdown the default shutdown mechanism. Dibyendu On Tue, May 19, 2015 at 1:03 PM, Sean Owen so...@cloudera.com wrote: I don't think you should rely on a shutdown hook. Ideally you try to stop it in the main exit path of your program, even in case of an exception. On Tue, May 19, 2015 at 7:59 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: You mean to say within Runtime.getRuntime().addShutdownHook I call ssc.stop(stopSparkContext = true, stopGracefully = true)? This won't work anymore in 1.4. The SparkContext gets stopped before the Receiver has processed all received blocks, and I see the below exception in the logs. But if I add Utils.addShutdownHook with the priority I mentioned, then graceful shutdown works. In that case, shutdown hooks run in priority order.
Re: Spark Streaming graceful shutdown in Spark 1.4
(ReceiverTracker.scala:105)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:242)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:241)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:241)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:177)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Exception in thread main java.lang.IllegalStateException: Shutdown in progress

On Tue, May 19, 2015 at 11:58 AM, Tathagata Das t...@databricks.com wrote: If you wanted to stop it gracefully, then why are you not calling ssc.stop(stopGracefully = true, stopSparkContext = true)? Then it doesn't matter whether the shutdown hook was called or not. TD On Mon, May 18, 2015 at 9:43 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Hi, I just figured out that if I want to perform a graceful shutdown of Spark Streaming 1.4 (from master), Runtime.getRuntime().addShutdownHook no longer works. In Spark 1.4 there is a Utils.addShutdownHook defined for Spark Core which gets called anyway, so graceful shutdown from Spark Streaming fails with a "SparkContext already closed" error. To solve this, I need to explicitly add Utils.addShutdownHook in my driver with a higher priority (say 150) than Spark's shutdown priority of 50, and there I call the StreamingContext stop method with (false, true) parameters. Just curious to know if this is how we need to handle shutdown hooks going forward. Can't we make graceful shutdown the default for streaming shutdown?
Also the Java API for adding a shutdown hook in Utils looks very dirty, with methods like this:

Utils.addShutdownHook(150, new Function0<BoxedUnit>() {
    @Override public BoxedUnit apply() { return null; }
    @Override public byte apply$mcB$sp() { return 0; }
    @Override public char apply$mcC$sp() { return 0; }
    @Override public double apply$mcD$sp() { return 0; }
    @Override public float apply$mcF$sp() { return 0; }
    @Override public int apply$mcI$sp() { return 0; }
    @Override public long apply$mcJ$sp() { return 0; }
    @Override public short apply$mcS$sp() { return 0; }
    @Override public void apply$mcV$sp() { jsc.stop(false, true); }
    @Override public boolean apply$mcZ$sp() { return false; }
});
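If one must register the hook from Java, extending scala.runtime.AbstractFunction0 (which already supplies the specialized apply$mc*$sp bodies) avoids most of that boilerplate. A sketch, assuming the same Utils.addShutdownHook(int, Function0<BoxedUnit>) signature and `jsc` JavaStreamingContext as in the snippet above:

```java
import scala.runtime.AbstractFunction0;
import scala.runtime.BoxedUnit;

import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.util.Utils;

public final class GracefulStop {
    // Sketch only: AbstractFunction0 inherits concrete apply$mc*$sp
    // implementations, so only apply() needs to be overridden.
    static void register(final JavaStreamingContext jsc) {
        Utils.addShutdownHook(150, new AbstractFunction0<BoxedUnit>() {
            @Override
            public BoxedUnit apply() {
                jsc.stop(false, true); // stopSparkContext=false, gracefully=true
                return BoxedUnit.UNIT;
            }
        });
    }
}
```

Whether `Utils` is reachable from user code depends on the Spark build, as the thread notes; treat this as an illustration of the AbstractFunction0 trick rather than a supported API.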
Re: spark streaming doubt
Just to add, there is a Receiver based Kafka consumer which uses the Kafka Low Level Consumer API. http://spark-packages.org/package/dibbhatt/kafka-spark-consumer Regards, Dibyendu On Tue, May 19, 2015 at 9:00 PM, Akhil Das ak...@sigmoidanalytics.com wrote: On Tue, May 19, 2015 at 8:10 PM, Shushant Arora shushantaror...@gmail.com wrote: So for Kafka + Spark Streaming, receiver based streaming uses the high-level API and non-receiver based streaming uses the low-level API. 1. In high-level receiver based streaming, does it register consumers at each job start (whenever a new job is launched by the streaming application, say every second)? - Receiver based streaming will always have the receiver running in parallel while your job is running, so by default every 200ms (spark.streaming.blockInterval) the receiver will generate a block of data which is read from Kafka. 2. Will the number of executors in high-level receiver based jobs always equal the number of partitions in the topic? - Not sure where you came up with this. For the non-receiver based one, I think the number of partitions in Spark will be equal to the number of Kafka partitions for the given topic. 3. Will data from a single topic be consumed by executors in parallel, or does only one receiver consume in multiple threads and assign to executors in the high-level receiver based approach? - They will consume the data in parallel. For the receiver based approach, you can actually specify the number of receivers that you want to spawn for consuming the messages. On Tue, May 19, 2015 at 2:38 PM, Akhil Das ak...@sigmoidanalytics.com wrote: spark.streaming.concurrentJobs takes an integer value, not a boolean. If you set it to 2 then 2 jobs will run in parallel. The default value is 1, and the next job will start once the current one completes. Actually, in the current implementation of Spark Streaming and under the default configuration, only one job is active (i.e. under execution) at any point of time.
So if one batch's processing takes longer than 10 seconds, then the next batch's jobs will stay queued. This can be changed with an experimental Spark property, spark.streaming.concurrentJobs, which is by default set to 1. It's not currently documented (maybe I should add it). The reason it is set to 1 is that concurrent jobs can potentially lead to weird sharing of resources, which can make it hard to debug whether there are sufficient resources in the system to process the ingested data fast enough. With only 1 job running at a time, it is easy to see that if batch processing time < batch interval, then the system will be stable. Granted that this may not be the most efficient use of resources under certain conditions. We definitely hope to improve this in the future. Copied from TD's answer written on SO: http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming . Non-receiver based streaming would be, for example, the fileStream and directStream ones. You can read a bit of information from here: https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html Thanks Best Regards On Tue, May 19, 2015 at 2:13 PM, Shushant Arora shushantaror...@gmail.com wrote: Thanks Akhil. When I don't set spark.streaming.concurrentJobs, will all pending jobs start one by one after a job completes, or does it not create jobs which could not be started at their desired interval? And what is the difference and usage of receiver vs non-receiver based streaming? Is there any documentation for that? On Tue, May 19, 2015 at 1:35 PM, Akhil Das ak...@sigmoidanalytics.com wrote: It will be a single job running at a time by default (you can also configure spark.streaming.concurrentJobs to run jobs in parallel, which is not recommended for production).
Now, your batch duration being 1 sec and processing time being 2 minutes: if you are using receiver based streaming, then those receivers will keep on receiving data while the job is running (which will accumulate in memory if you set StorageLevel as MEMORY_ONLY, and end up in block-not-found exceptions as Spark drops some blocks which are yet to be processed in order to accommodate new blocks). If you are using a non-receiver based approach, you will not have this problem of dropped blocks. Ideally, if your data is small and you have enough memory to hold it, then it will run smoothly without any issues. Thanks Best Regards On Tue, May 19, 2015 at 1:23 PM, Shushant Arora shushantaror...@gmail.com wrote: What happens if in a streaming application one job is not yet finished and the stream interval is reached? Does it start the next job, or wait for the first to finish while the rest of the jobs keep accumulating in a queue? Say I have a streaming application with a stream interval of 1 sec, but my job takes 2 min to process 1 sec of stream; what will happen? Will there be only one job running at any time, or
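To make the setting discussed in this thread explicit: spark.streaming.concurrentJobs is an integer, not a boolean, so turning it on looks roughly like this (application name and batch interval are illustrative, not from the thread):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Default is 1: the next batch's jobs queue behind the current batch.
// Setting 2 lets two jobs run concurrently (experimental; as TD notes,
// this makes resource sufficiency harder to reason about).
val conf = new SparkConf()
  .setAppName("concurrent-jobs-example") // illustrative name
  .set("spark.streaming.concurrentJobs", "2")
val ssc = new StreamingContext(conf, Seconds(1))
```

This only changes job scheduling; it does not change the ordering guarantees of any single stream.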
Spark Streaming graceful shutdown in Spark 1.4
Hi, Just figured out that if I want to perform graceful shutdown of Spark Streaming 1.4 ( from master ) , the Runtime.getRuntime().addShutdownHook no longer works . As in Spark 1.4 there is Utils.addShutdownHook defined for Spark Core, that gets anyway called , which leads to graceful shutdown from Spark streaming failed with error like Sparkcontext already closed issue. To solve this , I need to explicitly add Utils.addShutdownHook in my driver with higher priority ( say 150 ) than Spark's shutdown priority of 50 , and there I specified streamingcontext stop method with (false , true) parameter. Just curious to know , if this is how we need to handle shutdown hook going forward ? Can't we make the streaming shutdown default to gracefully shutdown ? Also the Java Api for adding shutdownhook in Utils looks very dirty with methods like this .. Utils.addShutdownHook(150, new Function0BoxedUnit() { @Override public BoxedUnit apply() { return null; } @Override public byte apply$mcB$sp() { return 0; } @Override public char apply$mcC$sp() { return 0; } @Override public double apply$mcD$sp() { return 0; } @Override public float apply$mcF$sp() { return 0; } @Override public int apply$mcI$sp() { // TODO Auto-generated method stub return 0; } @Override public long apply$mcJ$sp() { return 0; } @Override public short apply$mcS$sp() { return 0; } @Override public void apply$mcV$sp() { *jsc.stop(false, true);* } @Override public boolean apply$mcZ$sp() { // TODO Auto-generated method stub return false; } });
Re: Reading Real Time Data only from Kafka
Thanks Cody for your email. I think my concern was not about getting the ordering of messages within a partition, which as you said is possible if one knows how Spark works. The issue is that Spark does not schedule jobs on every batch in the same order the batches were generated. If that is not guaranteed, it does not matter whether you manage order within your partition; depending on per-partition ordering to commit offsets may lead to offsets being committed in the wrong order. In this thread you have discussed this as well, along with some workarounds: https://mail.google.com/mail/u/1/?tab=wm#search/rdd+order+guarantees/14b9f1eaf0b8bd15 So again, one needs to understand every detail of a consumer to decide whether it solves their use case. Regards, Dibyendu On Wed, May 13, 2015 at 7:35 PM, Cody Koeninger c...@koeninger.org wrote: As far as I can tell, Dibyendu's cons boil down to: 1. Spark checkpoints can't be recovered if you upgrade code 2. Some Spark transformations involve a shuffle, which can repartition data It's not accurate to imply that either one of those things is inherently a con of the direct stream api. Regarding checkpoints, nothing about the direct stream requires you to use checkpoints. You can save offsets in a checkpoint, your own database, or not save offsets at all (as James wants). One might even say that the direct stream api is . . . flexible . . . in that regard. Regarding partitions, the direct stream api gives you the same ordering guarantee as Kafka, namely that within a given partition messages will be in increasing offset order. Clearly if you do a transformation that repartitions the stream, that no longer holds. Thing is, that doesn't matter if you're saving offsets and results for each rdd in the driver. The offset ranges for the original rdd don't change as a result of the transformation you executed; they're immutable. Sure, you can get into trouble if you're trying to save offsets / results per partition on the executors, after a shuffle of some kind.
You can avoid this pretty easily by just using normal Scala code to do your transformation on the iterator inside a foreachPartition. Again, this isn't a con of the direct stream api; this is just a need to understand how Spark works. On Tue, May 12, 2015 at 10:30 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: The low level consumer which Akhil mentioned has been running at Pearson for the last 4-5 months without any downtime. I think this is the most reliable receiver based Kafka consumer for Spark as of today, if you want to put it that way. Prior to Spark 1.3, other receiver based consumers used the Kafka high-level API, which has serious issues with re-balancing, weaker fault tolerance, and data loss. Cody's implementation is definitely a good approach using the direct stream, but both the direct stream based approach and the receiver based low level consumer approach have pros and cons. For example, the receiver based approach needs to use the WAL for recovery from driver failure, which is an overhead for a system like Kafka. For the direct stream, the offsets stored in the checkpoint directory are lost if the driver code is modified; you can manage offsets from your driver, but for derived streams generated from this direct stream there is no guarantee that batches are processed in order (and offsets committed in order), etc. So whoever uses whichever consumer needs to study the pros and cons of both approaches before taking a call. Regards, Dibyendu On Tue, May 12, 2015 at 8:10 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Hi Cody, I was just saying that I found more success and higher throughput with the low level Kafka API prior to KafkaRDDs, which is the future it seems. My apologies if you felt it that way. :) On 12 May 2015 19:47, Cody Koeninger c...@koeninger.org wrote: Akhil, I hope I'm misreading the tone of this. If you have personal issues at stake, please take them up outside of the public list.
If you have actual factual concerns about the kafka integration, please share them in a jira. Regarding reliability, here's a screenshot of a current production job with a 3 week uptime. It was up a month before that; I only took it down to change code. http://tinypic.com/r/2e4vkht/8 Regarding flexibility, both of the APIs available in Spark will do what James needs, as I described. On Tue, May 12, 2015 at 8:55 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Hi Cody, If you are so sure, can you share a benchmark (which you ran for days, maybe?) that you have done with the Kafka APIs provided by Spark? Thanks Best Regards On Tue, May 12, 2015 at 7:22 PM, Cody Koeninger c...@koeninger.org wrote: I don't think it's accurate for Akhil to claim that the linked library is much more flexible/reliable than what's available in Spark at this point. James, what you're describing is the default behavior for the createDirectStream api available as part of spark
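Cody's point about saving offsets and results per RDD on the driver can be sketched like this with the Spark 1.3+ direct stream API (`ssc`, `kafkaParams` and `topics` are assumed to be defined elsewhere; the offset store is left abstract):

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

// `ssc` is the StreamingContext; `kafkaParams: Map[String, String]` and
// `topics: Set[String]` are assumed defined.
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

stream.foreachRDD { rdd =>
  // Capture the immutable per-partition offset ranges on the driver,
  // BEFORE any shuffle/repartition, so commits stay in batch order.
  val offsets: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process rdd ...
  // then persist `offsets` to your own store, atomically with the results.
}
```

The cast to HasOffsetRanges is only valid on the original KafkaRDD returned by createDirectStream, not on any transformed stream, which is exactly the shuffle caveat discussed above.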
Re: force the kafka consumer process to different machines
Or you can use this Receiver as well: http://spark-packages.org/package/dibbhatt/kafka-spark-consumer where you can specify how many Receivers you need for your topic, and it will divide the partitions among the Receivers and return the joined stream for you. Say you specified 20 receivers; in that case each Receiver can handle 4 partitions and you get a consumer parallelism of 20 receivers. Dibyendu On Wed, May 13, 2015 at 9:28 PM, 李森栋 lisend...@163.com wrote: thank you very much [Sent from Meizu MX4 Pro] Original message From: Cody Koeninger c...@koeninger.org Date: Wed, May 13, 23:52 To: hotdog lisend...@163.com Cc: user@spark.apache.org Subject: Re: force the kafka consumer process to different machines I assume you're using the receiver based approach? Have you tried the createDirectStream api? https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html If you're sticking with the receiver based approach, I think your only option would be to create more consumer streams and union them. That doesn't give you control over where they're run, but it should increase the consumer parallelism. On Wed, May 13, 2015 at 10:33 AM, hotdog lisend...@163.com wrote: I'm using streaming integrated with streaming-kafka. My kafka topic has 80 partitions, while my machines have 40 cores. I found that when the job is running, the kafka consumer processes are only deployed to 2 machines, and the bandwidth of those 2 machines becomes very high. I wonder, is there any way to control the placement of the kafka consumers? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/force-the-kafka-consumer-process-to-different-machines-tp22872.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
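Cody's suggestion of creating more consumer streams and unioning them can be sketched as follows (the ZooKeeper quorum, group id, topic name and thread count are placeholders; receiver placement is still decided by Spark):

```scala
import org.apache.spark.streaming.kafka.KafkaUtils

// Sketch: spread an 80-partition topic over 20 receiver streams.
// Each createStream call starts one receiver; union joins the results.
// The Map value (4) is the number of consumer threads per receiver.
val numReceivers = 20
val streams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, "zk-host:2181", "my-group", Map("my-topic" -> 4))
}
val unioned = ssc.union(streams)
```

As Cody notes, this increases parallelism but does not pin receivers to particular machines; the scheduler still chooses where each receiver runs.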
Re: Reading Real Time Data only from Kafka
The low level consumer which Akhil mentioned has been running at Pearson for the last 4-5 months without any downtime. I think this is the most reliable receiver based Kafka consumer for Spark as of today, if you want to put it that way. Prior to Spark 1.3, other receiver based consumers used the Kafka high-level API, which has serious issues with re-balancing, weaker fault tolerance, and data loss. Cody's implementation is definitely a good approach using the direct stream, but both the direct stream based approach and the receiver based low level consumer approach have pros and cons. For example, the receiver based approach needs to use the WAL for recovery from driver failure, which is an overhead for a system like Kafka. For the direct stream, the offsets stored in the checkpoint directory are lost if the driver code is modified; you can manage offsets from your driver, but for derived streams generated from this direct stream there is no guarantee that batches are processed in order (and offsets committed in order), etc. So whoever uses whichever consumer needs to study the pros and cons of both approaches before taking a call. Regards, Dibyendu On Tue, May 12, 2015 at 8:10 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Hi Cody, I was just saying that I found more success and higher throughput with the low level Kafka API prior to KafkaRDDs, which is the future it seems. My apologies if you felt it that way. :) On 12 May 2015 19:47, Cody Koeninger c...@koeninger.org wrote: Akhil, I hope I'm misreading the tone of this. If you have personal issues at stake, please take them up outside of the public list. If you have actual factual concerns about the kafka integration, please share them in a jira. Regarding reliability, here's a screenshot of a current production job with a 3 week uptime. It was up a month before that; I only took it down to change code. http://tinypic.com/r/2e4vkht/8 Regarding flexibility, both of the APIs available in Spark will do what James needs, as I described.
On Tue, May 12, 2015 at 8:55 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Hi Cody, If you are so sure, can you share a bench-marking (which you ran for days maybe?) that you have done with Kafka APIs provided by Spark? Thanks Best Regards On Tue, May 12, 2015 at 7:22 PM, Cody Koeninger c...@koeninger.org wrote: I don't think it's accurate for Akhil to claim that the linked library is much more flexible/reliable than what's available in Spark at this point. James, what you're describing is the default behavior for the createDirectStream api available as part of spark since 1.3. The kafka parameter auto.offset.reset defaults to largest, ie start at the most recent available message. This is described at http://spark.apache.org/docs/latest/streaming-kafka-integration.html The createDirectStream api implementation is described in detail at https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md If for some reason you're stuck using an earlier version of spark, you can accomplish what you want simply by starting the job using a new consumer group (there will be no prior state in zookeeper, so it will start consuming according to auto.offset.reset) On Tue, May 12, 2015 at 7:26 AM, James King jakwebin...@gmail.com wrote: Very nice! will try and let you know, thanks. On Tue, May 12, 2015 at 2:25 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Yep, you can try this lowlevel Kafka receiver https://github.com/dibbhatt/kafka-spark-consumer. Its much more flexible/reliable than the one comes with Spark. Thanks Best Regards On Tue, May 12, 2015 at 5:15 PM, James King jakwebin...@gmail.com wrote: What I want is if the driver dies for some reason and it is restarted I want to read only messages that arrived into Kafka following the restart of the driver program and re-connection to Kafka. Has anyone done this? any links or resources that can help explain this? Regards jk
Re: Some questions on Multiple Streams
You can probably try the Low Level Consumer from spark-packages (http://spark-packages.org/package/dibbhatt/kafka-spark-consumer). How many partitions are there for your topics? Say you have 10 topics, each having 3 partitions; ideally you can create at most 30 parallel Receivers and 30 streams. What I understand from your requirement is that for any given topic you want to choose the number of Receivers, e.g. for topic A you may choose 1 Receiver, for topic B you choose 2, for topic C you choose 3, etc. If you can distribute the topics to Receivers like this, you can very well use the above consumer, which has this facility. Each Receiver task takes one executor core, so you can calculate accordingly. The implementation has a code example and a read-me file; if you wish to try it, you can always email me. Regards, Dibyendu On Fri, Apr 17, 2015 at 3:06 PM, Laeeq Ahmed laeeqsp...@yahoo.com.invalid wrote: Hi, I am working with multiple Kafka streams (23 streams) and currently I am processing them separately. I receive one stream from each topic. I have the following questions. 1. The Spark Streaming guide suggests unioning these streams. *Is it possible to get statistics of each stream even after they are unioned?* 2. My calculations are not complex. I use a 2 second batch interval, and if I use 2 streams they easily get processed under 2 seconds by a single core. There is some shuffling involved in my application. As I increase the number of streams and the number of executors accordingly, the application's scheduling delay increases and becomes unmanageable within 2 seconds. I believe this happens because with that many streams the number of tasks increases, so the shuffling magnifies, and also all streams use the same executors. *Is it possible to provide part of the executors to a particular stream while processing streams simultaneously?* E.g. 
if I have 15 cores on cluster and 5 streams, 5 cores will be taken by 5 receivers and of the rest 10, can I provide 2 cores each to one of the 5 streams. Just to add, increasing the batch interval does help but I don't want to increase the batch size due to application restrictions and delayed results (The blockInterval and defaultParallelism does help to a limited extent). *Please see attach file for CODE SNIPPET* Regards, Laeeq - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
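For Laeeq's first question, one common workaround (a sketch, not something proposed in this thread) is to tag each record with its topic before the union, so per-topic statistics survive it. `topics` is the list of topic names and `streams` the matching list of per-topic DStreams, both assumed defined elsewhere:

```scala
// Tag each stream's records with the topic they came from.
val tagged = topics.zip(streams).map { case (topic, stream) =>
  stream.map(record => (topic, record))
}
val unioned = ssc.union(tagged)

// Example per-stream statistic: record count per topic in each batch.
unioned.map { case (topic, _) => (topic, 1L) }
  .reduceByKey(_ + _)
  .print()
```

The second question (reserving executors per stream) has no direct knob in this Spark version; the union shares the executor pool across all streams.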
Latest enhancement in Low Level Receiver based Kafka Consumer
Hi, Just to let you know, I have made some enhancements to the Low Level Reliable Receiver based Kafka Consumer (http://spark-packages.org/package/dibbhatt/kafka-spark-consumer). The earlier version used as many Receiver tasks as there are partitions of your Kafka topic. Now you can configure the desired number of Receiver tasks, and every Receiver can handle a subset of topic partitions. There were some use cases where the consumer needed to handle gigantic topics (having 100+ partitions), and using my receiver created that many Receiver tasks, and hence that many CPU cores were needed just for the Receivers. It was an issue. In the latest code, I have changed that behavior. The max limit for the number of Receivers is still your number of partitions, but if you specify a smaller number of Receiver tasks, every receiver will handle a subset of partitions and consume using the Kafka Low Level consumer API. Every receiver manages its partition(s) offsets in ZK in the usual way. You can see the latest consumer here: http://spark-packages.org/package/dibbhatt/kafka-spark-consumer Regards, Dibyendu
Re: Question about Spark Streaming Receiver Failure
Which version of Spark are you running? You can try this Low Level Consumer: http://spark-packages.org/package/dibbhatt/kafka-spark-consumer This is designed to recover from various failures and has a very good fault recovery mechanism built in. It is being used by many users, and at present we at Pearson have been running this Receiver in production for almost 3 months without any issue. You can give it a try. Regards, Dibyendu On Mon, Mar 16, 2015 at 12:47 PM, Akhil Das ak...@sigmoidanalytics.com wrote: You need to figure out why the receivers failed in the first place. Look in your worker logs and see what really happened. When you run a streaming job continuously for a longer period, there will usually be a lot of logs (you can enable log rotation etc.), and if you are doing groupBy, join, etc. types of operations, then there will be a lot of shuffle data. So you need to check the worker logs and see what happened (whether the disk was full, etc.). We have streaming pipelines running for weeks without any issues. Thanks Best Regards On Mon, Mar 16, 2015 at 12:40 PM, Jun Yang yangjun...@gmail.com wrote: Guys, We have a project which builds upon Spark Streaming. We use Kafka as the input stream and create 5 receivers. When this application had run for around 90 hours, all 5 receivers failed for some unknown reason. In my understanding, it is not guaranteed that a Spark Streaming receiver will do fault recovery automatically. So I just want to figure out a way of doing fault recovery to deal with receiver failure. There is a JIRA post that mentions using StreamingListener for monitoring the status of receivers: https://issues.apache.org/jira/browse/SPARK-2381?focusedCommentId=14056836&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14056836 However, I haven't found any open doc about how to do this. Have any of you met the same issue and dealt with it? 
Our environment: Spark 1.3.0 Dual Master Configuration Kafka 0.8.2 Thanks -- yangjun...@gmail.com http://hi.baidu.com/yjpro
Re: Question about Spark Streaming Receiver Failure
Yes, auto restart is enabled in my low level consumer when some unhandled exception comes. Even if you look at KafkaConsumer.java, for some cases (like broker failure, Kafka leader changes etc.) it can even refresh the Consumer (the Coordinator which talks to a Leader), which will recover from those failures. Dib On Mon, Mar 16, 2015 at 1:40 PM, Jun Yang yangjun...@gmail.com wrote: I have checked Dibyendu's code, and it looks like his implementation has an auto-restart mechanism. src/main/java/consumer/kafka/client/KafkaReceiver.java: private void start() { // Start the thread that receives data over a connection KafkaConfig kafkaConfig = new KafkaConfig(_props); ZkState zkState = new ZkState(kafkaConfig); _kConsumer = new KafkaConsumer(kafkaConfig, zkState, this); _kConsumer.open(_partitionId); Thread.UncaughtExceptionHandler eh = new Thread.UncaughtExceptionHandler() { public void uncaughtException(Thread th, Throwable ex) { restart("Restarting Receiver for Partition " + _partitionId, ex, 5000); } }; _consumerThread = new Thread(_kConsumer); _consumerThread.setDaemon(true); _consumerThread.setUncaughtExceptionHandler(eh); _consumerThread.start(); } I also checked Spark's native Kafka Receiver implementation, and it looks like it does not have any auto-restart support. Any comments from Dibyendu? On Mon, Mar 16, 2015 at 3:39 PM, Akhil Das ak...@sigmoidanalytics.com wrote: As I've seen, once I kill my receiver on one machine, it will automatically spawn another receiver on another machine or on the same machine. Thanks Best Regards On Mon, Mar 16, 2015 at 1:08 PM, Jun Yang yangjun...@gmail.com wrote: Dibyendu, Thanks for the reply. I am reading your project homepage now. One quick question I care about is: If the receivers failed for some reason (for example, killed brutally by someone else), is there any mechanism for the receiver to fail over automatically? 
On Mon, Mar 16, 2015 at 3:25 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Which version of Spark are you running? You can try this Low Level Consumer : http://spark-packages.org/package/dibbhatt/kafka-spark-consumer It is designed to recover from various failures and has a very good fault recovery mechanism built in. It is being used by many users, and at present we at Pearson have been running this Receiver in production for almost 3 months without any issue. You can give it a try. Regards, Dibyendu On Mon, Mar 16, 2015 at 12:47 PM, Akhil Das ak...@sigmoidanalytics.com wrote: You need to figure out why the receivers failed in the first place. Look in your worker logs and see what really happened. When you run a streaming job continuously for a long period there will usually be a lot of logs (you can enable log rotation etc.), and if you are doing groupBy, join, etc. type operations, there will be a lot of shuffle data. So you need to check the worker logs and see what happened (whether the disk is full etc.). We have streaming pipelines running for weeks without any issues. Thanks Best Regards On Mon, Mar 16, 2015 at 12:40 PM, Jun Yang yangjun...@gmail.com wrote: Guys, We have a project which builds upon Spark Streaming. We use Kafka as the input stream and create 5 receivers. When this application had run for around 90 hours, all 5 receivers failed for some unknown reason. In my understanding, it is not guaranteed that a Spark Streaming receiver will do fault recovery automatically. So I just want to figure out a way of doing fault recovery to deal with receiver failure. There is a JIRA comment that mentions using StreamingListener for monitoring the status of receivers: https://issues.apache.org/jira/browse/SPARK-2381?focusedCommentId=14056836page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14056836 However I haven't found any open doc on how to do this. Has anyone met the same issue and dealt with it? 
Our environment: Spark 1.3.0 Dual Master Configuration Kafka 0.8.2 Thanks -- yangjun...@gmail.com http://hi.baidu.com/yjpro
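The auto-restart mechanism in the KafkaReceiver code quoted earlier in this thread comes down to attaching an UncaughtExceptionHandler to the consumer thread. Here is a minimal standalone sketch of that pattern, with a hypothetical onFailure callback standing in for Spark's restart(message, exception, delayMillis) call:

```java
// Standalone sketch of the uncaught-exception-handler restart pattern used in
// the KafkaReceiver code quoted above. The FailureHandler callback stands in
// for the Spark Receiver's restart(message, exception, delayMillis) call;
// these names are illustrative, not the library's API.
public class RestartOnFailure {
    public interface FailureHandler { void onFailure(Throwable ex); }

    public static void runWithRestartHook(Runnable body, FailureHandler handler) {
        Thread t = new Thread(body);
        t.setDaemon(true);
        // The handler fires on the failing thread itself, just before it dies,
        // so join() returning guarantees the handler has already run.
        t.setUncaughtExceptionHandler((thread, ex) -> handler.onFailure(ex));
        t.start();
        try {
            t.join();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In the real receiver the handler would schedule a restart after a delay instead of merely recording the failure; the point is that any unhandled exception in the consumer thread reaches the hook rather than silently killing the receiver.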
Re: Spark streaming app shutting down
Thanks Akhil for mentioning this Low Level Consumer ( https://github.com/dibbhatt/kafka-spark-consumer ). Yes, it has a better fault tolerance mechanism than any existing Kafka consumer available. It has no data loss on receiver failure and has the ability to replay or restart itself in case of failure. You can definitely give it a try. Dibyendu On Thu, Feb 5, 2015 at 1:04 AM, Akhil Das ak...@sigmoidanalytics.com wrote: AFAIK, from Spark 1.2.0 you can have WAL (Write Ahead Logs) for fault tolerance, which means it can handle receiver/driver failures. You can also look at the low level Kafka consumer https://github.com/dibbhatt/kafka-spark-consumer which has a better fault tolerance mechanism for receiver failures. This low level consumer pushes the offset of the message being read into ZooKeeper for fault tolerance. In your case I think most of the in-flight data would be lost if you aren't using any of the fault tolerance mechanisms. Thanks Best Regards On Wed, Feb 4, 2015 at 5:24 PM, Mukesh Jha me.mukesh@gmail.com wrote: Hello Sparkans, I'm running a Spark Streaming app which reads data from a Kafka topic, does some processing and then persists the results in HBase. I am using Spark 1.2.0 running on a YARN cluster with 3 executors (2 GB, 8 cores each). I've enabled checkpointing, and I am also rate limiting my kafkaReceivers so that the number of items read is not more than 10 records per sec. The kafkaReceiver I'm using is *not* ReliableKafkaReceiver. This app was running fine for ~3 days, then there was an increased load on the HBase server because of some other process querying HBase tables. This led to an increase in the batch processing time of the Spark batches (processing a 1 min batch took 10 min, whereas it previously finished in 20 sec), which in turn led to the shutdown of the Spark application. PFA the executor logs. 
From the logs I'm getting the below exceptions *[1]* *[2]*; it looks like there were some outstanding jobs that didn't get processed, or the job couldn't find the input data. From the logs it seems that the shutdown hook gets invoked but it cannot process the in-flight block. I have a couple of queries on this: 1) Does this mean that these jobs failed and the *in-flight data* is lost? 2) Does the Spark job *buffer kafka* input data while the job is under processing for 10 mins, and on shutdown is that too lost? (I do not see any OOM error in the logs). 3) Can we have *explicit commits* enabled in the kafkaReceiver so that the offsets get committed only when the RDD(s) get successfully processed? Also I'd like to know if there is a *graceful way to shutdown a spark app running on yarn*. Currently I'm killing the yarn app to stop it, which leads to loss of that job's history, whereas in this case the application stops and succeeds and thus preserves the logs history. *[1]* 15/02/02 19:30:11 ERROR client.TransportResponseHandler: Still have 1 requests outstanding when connection from hbase28.usdc2.cloud.com/10.193.150.221:43189 is closed *[2]* java.lang.Exception: Could not compute split, block input-2-1422901498800 not found *[3]* org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /tmp/spark/realtime-failover/msg_2378481654720966.avro (inode 879488): File does not exist. Holder DFSClient_NONMAPREDUCE_-148264920_63 does not have any open files. -- Thanks Regards, *Mukesh Jha me.mukesh@gmail.com* - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
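The "explicit commits" asked about in question 3 amount to committing an offset only after the data read at that offset has been fully processed, which yields at-least-once delivery: a crash before the commit causes a replay, never a loss. A minimal in-memory sketch, assuming a hypothetical OffsetStore in place of ZooKeeper (none of these names are a real consumer API):

```java
import java.util.List;
import java.util.function.Consumer;

// In-memory sketch of commit-after-process (at-least-once) semantics.
// OffsetStore stands in for ZooKeeper; all names here are illustrative.
public class AtLeastOnce {
    public static class OffsetStore {
        private long committed = 0;
        public void commit(long offset) { committed = offset; }
        public long lastCommitted() { return committed; }
    }

    public static void consume(List<String> messages, OffsetStore store,
                               Consumer<String> process) {
        long offset = store.lastCommitted();            // resume from the last commit
        while (offset < messages.size()) {
            process.accept(messages.get((int) offset)); // may throw; not committed yet
            store.commit(offset + 1);                   // commit only after success
            offset++;
        }
    }
}
```

If processing crashes mid-batch, the store still points at the last fully processed message, so a restarted consumer replays from there; duplicates are possible, data loss is not.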
Re: Error when Spark streaming consumes from Kafka
Or you can use this Low Level Kafka Consumer for Spark : https://github.com/dibbhatt/kafka-spark-consumer It is now part of http://spark-packages.org/ and has been running successfully for the past few months in the Pearson production environment. Being a Low Level consumer, it does not have the re-balancing issue which the High Level consumer has. Also, I know of a few who have shifted to this Low Level Consumer, which gives them a more robust, fault tolerant Kafka Receiver for Spark. Regards, Dibyendu On Tue, Feb 3, 2015 at 3:57 AM, Tathagata Das tathagata.das1...@gmail.com wrote: This is an issue that is hard to resolve without rearchitecting the whole Kafka Receiver. There are some workarounds worth looking into. http://mail-archives.apache.org/mod_mbox/kafka-users/201312.mbox/%3CCAFbh0Q38qQ0aAg_cj=jzk-kbi8xwf+1m6xlj+fzf6eetj9z...@mail.gmail.com%3E On Mon, Feb 2, 2015 at 1:07 PM, Greg Temchenko s...@dicefield.com wrote: Hi, This seems not fixed yet. I filed an issue in jira: https://issues.apache.org/jira/browse/SPARK-5505 Greg -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-Spark-streaming-consumes-from-Kafka-tp19570p21471.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Error when Spark streaming consumes from Kafka
Thanks Neelesh. Glad to know this Low Level Consumer is working for you. Dibyendu On Tue, Feb 3, 2015 at 8:06 AM, Neelesh neele...@gmail.com wrote: We're planning to use this as well (Dibyendu's https://github.com/dibbhatt/kafka-spark-consumer ). Dibyendu, thanks for the efforts. So far it's working nicely. I think there is merit in making it the default Kafka Receiver for Spark Streaming. -neelesh On Mon, Feb 2, 2015 at 5:25 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Or you can use this Low Level Kafka Consumer for Spark : https://github.com/dibbhatt/kafka-spark-consumer It is now part of http://spark-packages.org/ and has been running successfully for the past few months in the Pearson production environment. Being a Low Level consumer, it does not have the re-balancing issue which the High Level consumer has. Also, I know of a few who have shifted to this Low Level Consumer, which gives them a more robust, fault tolerant Kafka Receiver for Spark. Regards, Dibyendu On Tue, Feb 3, 2015 at 3:57 AM, Tathagata Das tathagata.das1...@gmail.com wrote: This is an issue that is hard to resolve without rearchitecting the whole Kafka Receiver. There are some workarounds worth looking into. http://mail-archives.apache.org/mod_mbox/kafka-users/201312.mbox/%3CCAFbh0Q38qQ0aAg_cj=jzk-kbi8xwf+1m6xlj+fzf6eetj9z...@mail.gmail.com%3E On Mon, Feb 2, 2015 at 1:07 PM, Greg Temchenko s...@dicefield.com wrote: Hi, This seems not fixed yet. I filed an issue in jira: https://issues.apache.org/jira/browse/SPARK-5505 Greg -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-Spark-streaming-consumes-from-Kafka-tp19570p21471.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streaming with Kafka
You can probably try the Low Level Consumer option with Spark 1.2: https://github.com/dibbhatt/kafka-spark-consumer This Consumer can recover from any underlying failure of the Spark platform or Kafka and either retry or restart the receiver. This has been working nicely for us. Regards, Dibyendu On Wed, Jan 21, 2015 at 7:46 AM, firemonk9 dhiraj.peech...@gmail.com wrote: Hi, I am having similar issues. Have you found any resolution? Thank you -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-with-Kafka-tp21222p21276.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Low Level Kafka Consumer for Spark
My code handles the Kafka Consumer part. But writing to Kafka should not be a big challenge; you can easily do it in your driver code. dibyendu On Sat, Jan 17, 2015 at 9:43 AM, Debasish Das debasish.da...@gmail.com wrote: Hi Dib, For our use case I want my spark job1 to read from hdfs/cache and write to kafka queues. Similarly spark job2 should read from kafka queues and write to kafka queues. Is writing to kafka queues from a spark job supported in your code? Thanks Deb On Jan 15, 2015 11:21 PM, Akhil Das ak...@sigmoidanalytics.com wrote: There was a simple example https://github.com/dibbhatt/kafka-spark-consumer/blob/master/examples/scala/LowLevelKafkaConsumer.scala#L45 which you can run after changing a few lines of configuration. Thanks Best Regards On Fri, Jan 16, 2015 at 12:23 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Hi Kidong, Just now I tested the Low Level Consumer with Spark 1.2 and I did not see any issue with the Receiver store method. It is able to fetch messages from Kafka. Can you cross check other configurations in your setup, like Kafka broker IP, topic name, zk host details, consumer id etc. Dib On Fri, Jan 16, 2015 at 11:50 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Hi Kidong, No, I have not tried with Spark 1.2 yet. I will try it out and let you know how it goes. By the way, is there any change in the Receiver store method in Spark 1.2? Regards, Dibyendu On Fri, Jan 16, 2015 at 11:25 AM, mykidong mykid...@gmail.com wrote: Hi Dibyendu, I am using kafka 0.8.1.1 and spark 1.2.0. After modifying these versions in your pom, I have rebuilt your code. But I have not got any messages from ssc.receiverStream(new KafkaReceiver(_props, i)). I have found that, in your code, all the messages are retrieved correctly, but _receiver.store(_dataBuffer.iterator()), which is the Spark Streaming abstract class's method, does not seem to work correctly. 
Have you tried running your spark streaming kafka consumer with kafka 0.8.1.1 and spark 1.2.0 ? - Kidong. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p21180.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Low Level Kafka Consumer for Spark
Hi Kidong, No, I have not tried with Spark 1.2 yet. I will try it out and let you know how it goes. By the way, is there any change in the Receiver store method in Spark 1.2? Regards, Dibyendu On Fri, Jan 16, 2015 at 11:25 AM, mykidong mykid...@gmail.com wrote: Hi Dibyendu, I am using kafka 0.8.1.1 and spark 1.2.0. After modifying these versions in your pom, I have rebuilt your code. But I have not got any messages from ssc.receiverStream(new KafkaReceiver(_props, i)). I have found that, in your code, all the messages are retrieved correctly, but _receiver.store(_dataBuffer.iterator()), which is the Spark Streaming abstract class's method, does not seem to work correctly. Have you tried running your spark streaming kafka consumer with kafka 0.8.1.1 and spark 1.2.0 ? - Kidong. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p21180.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Low Level Kafka Consumer for Spark
Hi Kidong, Just now I tested the Low Level Consumer with Spark 1.2 and I did not see any issue with the Receiver store method. It is able to fetch messages from Kafka. Can you cross check other configurations in your setup, like Kafka broker IP, topic name, zk host details, consumer id etc. Dib On Fri, Jan 16, 2015 at 11:50 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Hi Kidong, No, I have not tried with Spark 1.2 yet. I will try it out and let you know how it goes. By the way, is there any change in the Receiver store method in Spark 1.2? Regards, Dibyendu On Fri, Jan 16, 2015 at 11:25 AM, mykidong mykid...@gmail.com wrote: Hi Dibyendu, I am using kafka 0.8.1.1 and spark 1.2.0. After modifying these versions in your pom, I have rebuilt your code. But I have not got any messages from ssc.receiverStream(new KafkaReceiver(_props, i)). I have found that, in your code, all the messages are retrieved correctly, but _receiver.store(_dataBuffer.iterator()), which is the Spark Streaming abstract class's method, does not seem to work correctly. Have you tried running your spark streaming kafka consumer with kafka 0.8.1.1 and spark 1.2.0 ? - Kidong. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p21180.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Low Level Kafka Consumer for Spark
Hi, Yes, as Jerry mentioned, SPARK-3129 ( https://issues.apache.org/jira/browse/SPARK-3129) enabled the WAL feature, which solves the driver failure problem. The way 3129 is designed, it solves the driver failure problem agnostic of the source of the stream (like Kafka or Flume etc.). But with just 3129 you cannot achieve a complete solution for data loss. You also need a reliable receiver, which solves the data loss issue on receiver failure. The Low Level Consumer (https://github.com/dibbhatt/kafka-spark-consumer) for which this email thread was started has solved that problem with the Kafka Low Level API. And SPARK-4062, as Jerry mentioned, also recently solved the same problem using the Kafka High Level API. On the Kafka High Level Consumer API approach, I would like to mention that Kafka 0.8 has some issues, as mentioned in this wiki ( https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design), where consumer re-balance sometimes fails, and that is one of the key reasons Kafka is re-writing the consumer API in Kafka 0.9. I know a few folks have already faced these re-balancing issues while using the Kafka High Level API, and if you ask my opinion, we at Pearson are still using the Low Level Consumer as it seems to be more robust and performant, and we have been using it for a few months without any issue... and also I may be a little biased :) Regards, Dibyendu On Wed, Dec 3, 2014 at 7:04 AM, Shao, Saisai saisai.s...@intel.com wrote: Hi Rod, The purpose of introducing the WAL mechanism in Spark Streaming as a general solution is to make all the receivers benefit from this mechanism. Though as you said, external sources like Kafka have their own checkpoint mechanism; instead of storing data in the WAL, we can store only metadata in the WAL and recover from the last committed offsets. But this requires a sophisticated design of the Kafka receiver with the low-level API involved, and we also need to take care of rebalance and fault tolerance ourselves. 
So right now, instead of implementing a whole new receiver, we chose to implement a simple one; though the performance is not as good, it's much easier to understand and maintain. The design purpose and implementation of the reliable Kafka receiver can be found in (https://issues.apache.org/jira/browse/SPARK-4062). And in the future, improving the reliable Kafka receiver as you mentioned is on our schedule. Thanks Jerry -Original Message- From: RodrigoB [mailto:rodrigo.boav...@aspect.com] Sent: Wednesday, December 3, 2014 5:44 AM To: u...@spark.incubator.apache.org Subject: Re: Low Level Kafka Consumer for Spark Dibyendu, Just to make sure I will not be misunderstood - my concerns refer to the upcoming Spark solution, not yours. I wanted to gather the perspective of someone who implemented recovery with Kafka a different way. Tnks, Rod -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p20196.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Error when Spark streaming consumes from Kafka
I believe this has something to do with how the Kafka High Level API manages consumers within a consumer group and how it re-balances during failure. You can find some mention of it in this Kafka wiki: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design Due to various issues in the Kafka High Level API, Kafka is moving the High Level Consumer API to a completely new set of APIs in Kafka 0.9. Other than this co-ordination issue, the High Level consumer also has data loss issues. You can probably try this Spark-Kafka consumer, which uses the Low Level Simple Consumer API and is more performant with no data loss scenarios: https://github.com/dibbhatt/kafka-spark-consumer Regards, Dibyendu On Sun, Nov 23, 2014 at 2:13 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I am using Spark to consume from Kafka. However, after the job has run for several hours, I saw the following failure of an executor: kafka.common.ConsumerRebalanceFailedException: group-1416624735998_ip-172-31-5-242.ec2.internal-1416648124230-547d2c31 can't rebalance after 4 retries kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432) kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722) kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:212) kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:138) org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:114) org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121) org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106) org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264) 
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257) org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121) org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) Does anyone know the reason for this exception? Thanks! Bill
Re: Low Level Kafka Consumer for Spark
Hi Alon, No, this does not guarantee that the same set of messages will come in the same RDD. This fix just re-plays the messages from the last processed offset in the same order. Again, this is just an interim fix we needed to solve our use case. If you do not need this message replay feature, just do not perform the ack (acknowledgement) call in the driver code. Then the processed messages will not be written to ZK and hence replay will not happen. Regards, Dibyendu On Mon, Sep 15, 2014 at 4:48 PM, Alon Pe'er alo...@supersonicads.com wrote: Hi Dibyendu, Thanks for your great work! I'm new to Spark Streaming, so I just want to make sure I understand the Driver failure issue correctly. In my use case, I want to make sure that messages coming in from Kafka are always broken into the same set of RDDs, meaning that if a set of messages is assigned to one RDD, and the Driver dies before this RDD is processed, then once the Driver recovers, the same set of messages is assigned to a single RDD, instead of arbitrarily repartitioning the messages across different RDDs. Does your Receiver guarantee this behavior, until the problem is fixed in Spark 1.2? Regards, Alon -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p14233.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
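The point above, that replay preserves message order but not RDD boundaries, can be illustrated with a tiny batching sketch (a hypothetical helper, not the receiver's code): the same ordered stream regrouped with a different effective batch size yields different batches even though every message arrives in the same order.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: group an ordered message stream into batches of up to
// batchSize elements, standing in for how a receiver's blocks become RDDs.
public class Rebatching {
    public static <A> List<List<A>> batch(List<A> msgs, int batchSize) {
        List<List<A>> batches = new ArrayList<>();
        for (int i = 0; i < msgs.size(); i += batchSize) {
            // Each sublist models one RDD built from the stream
            batches.add(new ArrayList<>(msgs.subList(i, Math.min(i + batchSize, msgs.size()))));
        }
        return batches;
    }
}
```

Batching the same seven messages by 3 and then by 2 gives the same flattened sequence but different groupings, which is exactly why a replayed stream may not reproduce the original RDD membership.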
Re: Low Level Kafka Consumer for Spark
Hi Tim, I have not tried persisting the RDD. There is some discussion of rate limiting Spark Streaming in this thread: http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-rate-limiting-from-kafka-td8590.html There is a pull request https://github.com/apache/spark/pull/945/files to fix this rate limiting issue at the BlockGenerator level, but while testing with heavy load, that fix did not solve my problem. So I had to build rate limiting into the Kafka consumer; I will make it configurable soon. Without it, I can see blocks getting dropped, which leads to job failures. I have raised this in another thread ( https://mail.google.com/mail/u/1/?tab=wm#search/Serious/148650fd829cd239 ) but have not got any answer yet on whether this is a bug (blocks getting dropped and jobs failing). Dib On Mon, Sep 15, 2014 at 10:33 PM, Tim Smith secs...@gmail.com wrote: Hi Dibyendu, I am a little confused about the need for rate limiting input from Kafka. If the stream coming in from Kafka has a higher messages/second rate than what a Spark job can process, then it should simply build a backlog in Spark if the RDDs are cached on disk using persist(). Right? Thanks, Tim On Mon, Sep 15, 2014 at 4:33 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Hi Alon, No, this does not guarantee that the same set of messages will come in the same RDD. This fix just re-plays the messages from the last processed offset in the same order. Again, this is just an interim fix we needed to solve our use case. If you do not need this message replay feature, just do not perform the ack (acknowledgement) call in the driver code. Then the processed messages will not be written to ZK and hence replay will not happen. Regards, Dibyendu On Mon, Sep 15, 2014 at 4:48 PM, Alon Pe'er alo...@supersonicads.com wrote: Hi Dibyendu, Thanks for your great work! I'm new to Spark Streaming, so I just want to make sure I understand the Driver failure issue correctly. 
In my use case, I want to make sure that messages coming in from Kafka are always broken into the same set of RDDs, meaning that if a set of messages are assigned to one RDD, and the Driver dies before this RDD is processed, then once the Driver recovers, the same set of messages are assigned to a single RDD, instead of arbitrarily repartitioning the messages across different RDDs. Does your Receiver guarantee this behavior, until the problem is fixed in Spark 1.2? Regards, Alon -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p14233.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
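The consumer-side rate limiting discussed earlier in this thread can be sketched as a fixed per-second budget that sleeps once the budget is spent. This is an illustrative sketch only, not the consumer's actual implementation (nor the PID controller mentioned elsewhere in these threads):

```java
// Hedged sketch of a consumer-side throttle: at most maxPerSecond acquire()
// calls succeed per one-second window; beyond that the caller sleeps out the
// remainder of the window. Illustrative, not the library's code.
public class RateLimiter {
    private final int maxPerSecond;
    private long windowStart = System.nanoTime();
    private int count = 0;

    public RateLimiter(int maxPerSecond) { this.maxPerSecond = maxPerSecond; }

    public synchronized void acquire() throws InterruptedException {
        long now = System.nanoTime();
        if (now - windowStart >= 1_000_000_000L) {  // a new one-second window
            windowStart = now;
            count = 0;
        }
        if (count >= maxPerSecond) {                // budget spent: wait it out
            long remainingMs = 1000L - (now - windowStart) / 1_000_000L;
            if (remainingMs > 0) Thread.sleep(remainingMs);
            windowStart = System.nanoTime();
            count = 0;
        }
        count++;
    }
}
```

A fetch loop would call acquire() once per message (or per block) before handing data to the receiver, so a fast Kafka topic cannot outrun the store path and force blocks to be dropped.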
Re: Some Serious Issue with Spark Streaming ? Blocks Getting Removed and Jobs have Failed..
) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source) at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63) at org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:61) at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:141) at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:169) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) -- Nan Zhu On Thursday, September 11, 2014 at 10:42 AM, Nan Zhu wrote: Hi, Can you attach more logs to see if there is some entry from ContextCleaner? 
I met a very similar issue before… but haven't got it resolved. Best, -- Nan Zhu On Thursday, September 11, 2014 at 10:13 AM, Dibyendu Bhattacharya wrote: Dear All, Not sure if this is a false alarm. But wanted to raise this to understand what is happening. I am testing the Kafka Receiver which I have written ( https://github.com/dibbhatt/kafka-spark-consumer ), which is basically a low level Kafka Consumer with custom Receivers implemented for every Kafka topic partition, pulling data in parallel. Individual streams from all topic partitions are then merged to create a Union stream which is used for further processing. The custom Receiver works fine under normal load
Re: Some Serious Issue with Spark Streaming ? Blocks Getting Removed and Jobs have Failed..
I agree. Even the Low Level Kafka Consumer which I have written has tunable IO throttling, which helped me solve this issue... But the question remains: even if there is a large backlog, why does Spark drop the unprocessed memory blocks?

Dib

On Fri, Sep 12, 2014 at 5:47 PM, Jeoffrey Lim jeoffr...@gmail.com wrote:

Our issue could be related to the problem described in http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-in-1-hour-batch-duration-RDD-files-gets-lost-td14027.html in which the DStream is processed for every 1 hour batch duration. I have implemented IO throttling in the Receiver as well in our Kafka consumer, and our backlog is not that large.

INFO : org.apache.spark.storage.MemoryStore - 1 blocks selected for dropping
INFO : org.apache.spark.storage.BlockManager - Dropping block *input-0-1410443074600* from memory
INFO : org.apache.spark.storage.MemoryStore - Block input-0-1410443074600 of size 12651900 dropped from memory (free 21220667)
INFO : org.apache.spark.storage.BlockManagerInfo - Removed input-0-1410443074600 on ip-10-252-5-113.asskickery.us:53752 in memory (size: 12.1 MB, free: 100.6 MB)

The questions that I have now are: how to prevent the MemoryStore/BlockManager from dropping the block inputs? And should these events be logged at level WARN/ERROR?

Thanks.

On Fri, Sep 12, 2014 at 4:45 PM, Dibyendu Bhattacharya [via Apache Spark User List] wrote:

Dear all, I am sorry. This was a false alarm. There was some issue in the RDD processing logic which led to a large backlog. Once I fixed the issues in my processing logic, I can see all messages being pulled nicely without any Block Removed error. I need to tune certain configurations in my Kafka Consumer to modify the data rate and also the batch size. Sorry again.
Regards, Dibyendu

On Thu, Sep 11, 2014 at 8:13 PM, Nan Zhu wrote:

This is my case about broadcast variable:

14/07/21 19:49:13 INFO Executor: Running task ID 4
14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 2)
14/07/21 19:49:13 INFO TaskSetManager: Finished TID 2 in 95 ms on localhost (progress: 3/106)
14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
14/07/21 19:49:13 INFO Executor: Serialized size of result for 3 is 596
14/07/21 19:49:13 INFO Executor: Sending result for 3 directly to driver
14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
14/07/21 19:49:13 INFO Executor: Finished task ID 3
14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:5 as TID 5 on executor localhost: localhost (PROCESS_LOCAL)
14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:5 as 11885 bytes in 0 ms
14/07/21 19:49:13 INFO Executor: Running task ID 5
14/07/21 19:49:13 INFO BlockManager: Removing broadcast 0
14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 3)
*14/07/21 19:49:13 INFO ContextCleaner: Cleaned broadcast 0*
14/07/21 19:49:13 INFO TaskSetManager: Finished TID 3 in 97 ms on localhost (progress: 4/106)
14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
14/07/21 19:49:13 INFO BlockManager: Removing block broadcast_0
*14/07/21 19:49:13 INFO MemoryStore: Block broadcast_0 of size 202564 dropped from memory (free 886623436)*
14/07/21 19:49:13 INFO ContextCleaner: Cleaned shuffle 0
14/07/21 19:49:13 INFO ShuffleBlockManager: Deleted all files for shuffle 0
14/07/21 19:49:13 INFO HadoopRDD: Input split: hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:25+5
14/07/21 19:49:13 INFO HadoopRDD: Input split: hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:20+5
14/07/21
19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
14/07/21 19:49:13 INFO Executor: Serialized size of result for 4 is 596
14/07/21 19:49:13 INFO Executor: Sending result for 4 directly to driver
14/07/21 19:49:13 INFO Executor: Finished task ID 4
14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:6 as TID 6 on executor localhost: localhost (PROCESS_LOCAL)
14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:6 as 11885 bytes in 0 ms
14/07/21 19:49:13 INFO Executor: Running task ID 6
14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 4)
14/07/21 19:49:13 INFO TaskSetManager: Finished TID 4 in 80 ms on localhost (progress: 5/106)
14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
14/07/21 19:49:13 INFO Executor: Serialized size of result for 5 is 596
14/07/21 19:49:13 INFO Executor: Sending result for 5 directly to driver
14/07/21 19:49:13 INFO Executor: Finished task ID 5
14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:7 as TID 7
Re: How to scale more consumer to Kafka stream
I agree Gerard. Thanks for pointing this out.

Dib

On Thu, Sep 11, 2014 at 5:28 PM, Gerard Maas gerard.m...@gmail.com wrote:

This pattern works. One note, though: use 'union' only if you need to group the data from all RDDs into one RDD for processing (like a count distinct or a groupBy). If your process can be parallelized over every stream of incoming data, I suggest you just apply the required transformations on every dstream and avoid 'union' altogether.

-kr, Gerard.

On Wed, Sep 10, 2014 at 8:17 PM, Tim Smith secs...@gmail.com wrote:

How are you creating your kafka streams in Spark? If you have 10 partitions for a topic, you can call createStream ten times to create 10 parallel receivers/executors and then use union to combine all the dStreams.

On Wed, Sep 10, 2014 at 7:16 AM, richiesgr richie...@gmail.com wrote:

Hi (my previous post has been used by someone else). I'm building an application that reads events from a Kafka stream. In production we have 5 consumers that share 10 partitions, but in Spark Streaming Kafka only 1 worker acts as a consumer and then distributes the tasks to workers, so I can have only 1 machine acting as a consumer, but I need more because only 1 consumer means lag. Do you have any idea what I can do?
Another interesting point: the master is not loaded at all; I cannot get it above 10% CPU. I've tried to increase queued.max.message.chunks on the kafka client to read more records, thinking it would speed up the read, but I only get:

ERROR consumer.ConsumerFetcherThread: [ConsumerFetcherThread-SparkEC2_ip-10-138-59-194.ec2.internal-1410182950783-5c49c8e8-0-174167372], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 73; ClientId: SparkEC2-ConsumerFetcherThread-SparkEC2_ip-10-138-59-194.ec2.internal-1410182950783-5c49c8e8-0-174167372; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [IA2,7] - PartitionFetchInfo(929838589,1048576),[IA2,6] - PartitionFetchInfo(929515796,1048576),[IA2,9] - PartitionFetchInfo(929577946,1048576),[IA2,8] - PartitionFetchInfo(930751599,1048576),[IA2,2] - PartitionFetchInfo(926457704,1048576),[IA2,5] - PartitionFetchInfo(930774385,1048576),[IA2,0] - PartitionFetchInfo(929913213,1048576),[IA2,3] - PartitionFetchInfo(929268891,1048576),[IA2,4] - PartitionFetchInfo(929949877,1048576),[IA2,1] - PartitionFetchInfo(930063114,1048576)
java.lang.OutOfMemoryError: Java heap space

Does someone have ideas? Thanks

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-scale-more-consumer-to-Kafka-stream-tp13883.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
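[Editor's note] Tim's suggestion above (one createStream call per partition, then a union of the resulting DStreams) has a simple fan-out/merge shape that can be sketched without a Spark cluster. This is a minimal sketch assuming one receiver per partition; all names (receive, unionAll, the fake message payloads) are illustrative, not Spark's API:

```java
import java.util.*;
import java.util.stream.*;

// Sketch: one "receiver" per Kafka partition, then a union of their
// outputs, modelling createStream-per-partition + union in Spark Streaming.
public class ParallelReceiversSketch {
    // Each receiver owns exactly one partition and drains only that partition.
    static List<String> receive(int partition) {
        List<String> out = new ArrayList<>();
        for (int offset = 0; offset < 3; offset++) {
            out.add("partition-" + partition + ":offset-" + offset);
        }
        return out;
    }

    // Union: merge the per-partition streams into one logical stream,
    // as dstream1.union(dstream2)... would do in Spark Streaming.
    public static List<String> unionAll(int numPartitions) {
        return IntStream.range(0, numPartitions)
                .boxed()
                .flatMap(p -> receive(p).stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // 10 partitions -> 10 receivers; the union carries all records.
        System.out.println(unionAll(10).size()); // 30
    }
}
```

As Gerard notes above, the union step is only needed when downstream processing must see all partitions in one RDD; otherwise each per-partition stream can be transformed independently.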
Re: Low Level Kafka Consumer for Spark
Hi,

The latest changes with Kafka message re-play by manipulating the ZK offset seem to be working fine for us. This gives us some relief till the actual issue is fixed in Spark 1.2.

I have some questions on how Spark processes the received data. The logic I used is basically to pull messages from individual partitions using dedicated Receivers, and do a union of these streams. After that I process this union stream. Today I wanted to test this consumer with our internal Kafka cluster, which has around 50 million records. With this huge backlog I found Spark only running the Receiver task and not running the processing task (or rather doing it very slowly). Is this an issue with the Consumer, or is it an issue on the Spark side? Ideally, when Receivers durably write data to the store, the processing should start in parallel. Why does the processing task need to wait till the Receiver consumes all 50 million messages? ...Or maybe I am doing something wrong? I can share the driver log if you want. In the driver I can see only storage.BlockManagerInfo: Added input... type messages, but I hardly see scheduler.TaskSetManager: Starting task... messages. I see data getting written to the target system at a very, very slow pace.

Regards, Dibyendu

On Mon, Sep 8, 2014 at 12:08 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote:

Hi Tathagata,

I have managed to implement the logic in the Kafka-Spark consumer to recover from Driver failure. This is just an interim fix till the actual fix is done on the Spark side. The logic is something like this.

1. When the individual Receivers start for every topic partition, they write the Kafka messages along with certain meta data to the Block Store. This meta data contains the details of message offset, partition id, topic name and consumer id. You can see this logic in the PartitionManager.java next() method.

2. In the Driver code (Consumer.java), I am creating the union of all these individual D-Streams and processing the data using a forEachRDD call.
In the driver code, I am receiving the RDD which contains the Kafka messages along with the meta data details, and periodically I am committing the processed offset of the Kafka messages into ZK.

3. When the driver stops and restarts again, the Receiver starts again, and this time in PartitionManager.java I check what the actual committed offset for the partition is, and what the actual processed offset of the same partition is. This logic is in the PartitionManager constructor. If this is a Receiver restart and the processed offset is less than the committed offset, I start fetching again from the processed offset. This may lead to duplicate records, but our system can handle duplicates.

I have tested with multiple driver kills/stops and I found no data loss in the Kafka consumer. In the Driver code I have not done any checkpointing yet; will test that tomorrow.

One interesting thing I found: if I repartition the original stream, I can still see the issue of data loss with this logic. What I believe is that during re-partitioning Spark might be changing the order of the RDDs from the way they were generated from the Kafka stream. So in the re-partition case, even when I am committing the processed offset, as this is not in order I still see the issue. Not sure if this understanding is correct, but I am not able to find any other explanation. If I do not use repartition, this solution works fine.

I can make this configurable, so that when the actual fix is available, this feature in the consumer can be turned off, as it is an overhead for the consumer. Let me know what you think..

Regards, Dibyendu

On Fri, Sep 5, 2014 at 11:14 PM, Tathagata Das tathagata.das1...@gmail.com wrote:

Some thoughts on this thread to clarify the doubts.

1. Driver recovery: The current version (1.1 to be released) does not recover the raw data that has been received but not processed. This is because when the driver dies, the executors die, and so does the raw data that was stored in them.
Only for HDFS is the data not lost on driver recovery, as the data is already present reliably in HDFS. This is something we want to fix by Spark 1.2 (3 months from now).

Regarding recovery by replaying the data from Kafka, it is possible but tricky. Our goal is to provide a strong guarantee, exactly-once semantics, in all transformations. To guarantee this for all kinds of streaming computations, stateful and not-stateful, it requires that the data be replayed through Kafka in exactly the same order, and the underlying blocks of data in Spark be regenerated in the exact way they would have been if there was no driver failure. This is quite tricky to implement; it requires manipulation of zookeeper offsets, etc., which is hard to do with the high level consumer that KafkaUtil uses. Dibyendu's low level Kafka receiver may enable such approaches in the future. For now we definitely plan to solve the first problem very very soon.
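[Editor's note] The restart rule in Dibyendu's step 3 above (resume from the processed offset when it lags the committed offset, accepting duplicates for zero loss) boils down to a one-line decision. A minimal sketch, with illustrative method names rather than the actual PartitionManager API:

```java
// Sketch of the restart decision: the receiver commits the *consumed*
// offset, while the driver periodically records the *processed* offset.
// On restart, resume from the older of the two, so anything consumed but
// not yet processed is re-fetched (at-least-once semantics).
public class OffsetRecoverySketch {
    /** Offset to resume fetching from after a driver restart. */
    public static long resumeOffset(long committedOffset, long processedOffset) {
        // Duplicates are possible (offsets between the two are replayed),
        // but no record is lost.
        return Math.min(committedOffset, processedOffset);
    }

    public static void main(String[] args) {
        // Receiver had consumed up to 500, driver only processed up to 420:
        System.out.println(resumeOffset(500L, 420L)); // 420: replay 80 records
    }
}
```

Note this only works when block order is stable, which is exactly why the repartition case described above breaks the scheme: out-of-order processing makes the "processed offset" no longer a safe low-water mark.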
Re: How to scale more consumer to Kafka stream
Hi,

You can use this Kafka Spark Consumer: https://github.com/dibbhatt/kafka-spark-consumer It does exactly that: it creates parallel Receivers for every Kafka topic partition. You can look at Consumer.java under the consumer.kafka.client package for an example of how to use it. There is some discussion on this Consumer; you can find it here: https://mail.google.com/mail/u/1/?tab=wm#search/kafka+spark+consumer/14797b2cbbaa8689

Regards, Dib

On Wed, Sep 10, 2014 at 11:47 PM, Tim Smith secs...@gmail.com wrote:

How are you creating your kafka streams in Spark? If you have 10 partitions for a topic, you can call createStream ten times to create 10 parallel receivers/executors and then use union to combine all the dStreams.

On Wed, Sep 10, 2014 at 7:16 AM, richiesgr richie...@gmail.com wrote:

Hi (my previous post has been used by someone else). I'm building an application that reads events from a Kafka stream. In production we have 5 consumers that share 10 partitions, but in Spark Streaming Kafka only 1 worker acts as a consumer and then distributes the tasks to workers, so I can have only 1 machine acting as a consumer, but I need more because only 1 consumer means lag. Do you have any idea what I can do?
Another interesting point: the master is not loaded at all; I cannot get it above 10% CPU. I've tried to increase queued.max.message.chunks on the kafka client to read more records, thinking it would speed up the read, but I only get:

ERROR consumer.ConsumerFetcherThread: [ConsumerFetcherThread-SparkEC2_ip-10-138-59-194.ec2.internal-1410182950783-5c49c8e8-0-174167372], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 73; ClientId: SparkEC2-ConsumerFetcherThread-SparkEC2_ip-10-138-59-194.ec2.internal-1410182950783-5c49c8e8-0-174167372; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [IA2,7] - PartitionFetchInfo(929838589,1048576),[IA2,6] - PartitionFetchInfo(929515796,1048576),[IA2,9] - PartitionFetchInfo(929577946,1048576),[IA2,8] - PartitionFetchInfo(930751599,1048576),[IA2,2] - PartitionFetchInfo(926457704,1048576),[IA2,5] - PartitionFetchInfo(930774385,1048576),[IA2,0] - PartitionFetchInfo(929913213,1048576),[IA2,3] - PartitionFetchInfo(929268891,1048576),[IA2,4] - PartitionFetchInfo(929949877,1048576),[IA2,1] - PartitionFetchInfo(930063114,1048576)
java.lang.OutOfMemoryError: Java heap space

Does someone have ideas? Thanks

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-scale-more-consumer-to-Kafka-stream-tp13883.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Low Level Kafka Consumer for Spark
Hi Tathagata,

I have managed to implement the logic in the Kafka-Spark consumer to recover from Driver failure. This is just an interim fix till the actual fix is done on the Spark side. The logic is something like this.

1. When the individual Receivers start for every topic partition, they write the Kafka messages along with certain meta data to the Block Store. This meta data contains the details of message offset, partition id, topic name and consumer id. You can see this logic in the PartitionManager.java next() method.

2. In the Driver code (Consumer.java), I am creating the union of all these individual D-Streams and processing the data using a forEachRDD call. In the driver code, I am receiving the RDD which contains the Kafka messages along with the meta data details, and periodically I am committing the processed offset of the Kafka messages into ZK.

3. When the driver stops and restarts again, the Receiver starts again, and this time in PartitionManager.java I check what the actual committed offset for the partition is, and what the actual processed offset of the same partition is. This logic is in the PartitionManager constructor. If this is a Receiver restart and the processed offset is less than the committed offset, I start fetching again from the processed offset. This may lead to duplicate records, but our system can handle duplicates.

I have tested with multiple driver kills/stops and I found no data loss in the Kafka consumer. In the Driver code I have not done any checkpointing yet; will test that tomorrow.

One interesting thing I found: if I repartition the original stream, I can still see the issue of data loss with this logic. What I believe is that during re-partitioning Spark might be changing the order of the RDDs from the way they were generated from the Kafka stream. So in the re-partition case, even when I am committing the processed offset, as this is not in order I still see the issue. Not sure if this understanding is correct, but I am not able to find any other explanation.
But if I do not use repartition, this solution works fine. I can make this configurable, so that when the actual fix is available, this feature in the consumer can be turned off, as it is an overhead for the consumer. Let me know what you think..

Regards, Dibyendu

On Fri, Sep 5, 2014 at 11:14 PM, Tathagata Das tathagata.das1...@gmail.com wrote:

Some thoughts on this thread to clarify the doubts.

1. Driver recovery: The current version (1.1 to be released) does not recover the raw data that has been received but not processed. This is because when the driver dies, the executors die, and so does the raw data that was stored in them. Only for HDFS is the data not lost on driver recovery, as the data is already present reliably in HDFS. This is something we want to fix by Spark 1.2 (3 months from now). Regarding recovery by replaying the data from Kafka, it is possible but tricky. Our goal is to provide a strong guarantee, exactly-once semantics, in all transformations. To guarantee this for all kinds of streaming computations, stateful and not-stateful, it requires that the data be replayed through Kafka in exactly the same order, and the underlying blocks of data in Spark be regenerated in the exact way they would have been if there was no driver failure. This is quite tricky to implement; it requires manipulation of zookeeper offsets, etc., which is hard to do with the high level consumer that KafkaUtil uses. Dibyendu's low level Kafka receiver may enable such approaches in the future. For now we definitely plan to solve the first problem very very soon.

3. Repartitioning: I am trying to understand the repartition issue. One common mistake I have seen is that developers repartition a stream but do not use the repartitioned stream.

WRONG:
inputDstream.repartition(100)
inputDstream.map(...).count().print()

RIGHT:
val repartitionedDStream = inputDStream.repartition(100)
repartitionedDStream.map(...).count().print()

Not sure if this helps solve the problem that you all are facing.
I am going to add this to the streaming programming guide to make sure this common mistake is avoided.

TD

On Wed, Sep 3, 2014 at 10:38 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote:

Hi,

Sorry for the little delay. As discussed in this thread, I have modified the Kafka-Spark-Consumer (https://github.com/dibbhatt/kafka-spark-consumer) code to have a dedicated Receiver for every topic partition. You can see an example of how to create a union of these receivers in consumer.kafka.client.Consumer.java. Thanks to Chris for suggesting this change.

Regards, Dibyendu

On Mon, Sep 1, 2014 at 2:55 AM, RodrigoB rodrigo.boav...@aspect.com wrote:

Just a comment on the recovery part. Is it correct to say that currently Spark Streaming recovery design does not consider re-computations (upon metadata lineage recovery) that depend on blocks of data of the received stream? https://issues.apache.org/jira/browse/SPARK
Re: Low Level Kafka Consumer for Spark
Hi,

Sorry for the little delay. As discussed in this thread, I have modified the Kafka-Spark-Consumer (https://github.com/dibbhatt/kafka-spark-consumer) code to have a dedicated Receiver for every topic partition. You can see an example of how to create a union of these receivers in consumer.kafka.client.Consumer.java. Thanks to Chris for suggesting this change.

Regards, Dibyendu

On Mon, Sep 1, 2014 at 2:55 AM, RodrigoB rodrigo.boav...@aspect.com wrote:

Just a comment on the recovery part. Is it correct to say that currently Spark Streaming recovery design does not consider re-computations (upon metadata lineage recovery) that depend on blocks of data of the received stream? https://issues.apache.org/jira/browse/SPARK-1647

Just to illustrate a real use case (mine): we have object states which have a Duration field per state which is incremented on every batch interval. This object state is also reset to 0 upon incoming state-changing events. Let's suppose there is at least one event since the last data checkpoint. This will lead to inconsistency upon driver recovery: the Duration field will get incremented from the data checkpoint version until the recovery moment, but the state change event will never be re-processed... so in the end we have the old state with the wrong Duration value. To make things worse, let's imagine we're dumping the Duration increases somewhere... which means we're spreading the problem across our system.

Re-computation awareness is something I've commented on in another thread and would rather treat separately: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-td12568.html#a13205

Re-computations do occur, but the only RDDs that are recovered are the ones from the data checkpoint. This is what we've seen. It is not enough by itself to ensure recovery of computed data, and this partial recovery leads to inconsistency in some cases.
Roger - I share the same question with you - I'm just not sure if the replicated data really gets persisted on every batch. The execution lineage is checkpointed, but if we have big chunks of data being consumed at the Receiver node on, let's say, a per-second basis, then having it persisted to HDFS every second could be a big challenge for keeping JVM performance - maybe that could be the reason why it's not really implemented... assuming it isn't.

Dibyendu made a great effort with the offset controlling code, but the general state-consistent recovery feels to me like another big issue to address. I plan on diving into the Streaming code and trying to at least contribute some ideas. Some more insight from anyone on the dev team will be very appreciated.

tnks, Rod

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13208.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Low Level Kafka Consumer for Spark
I agree. This issue should be fixed in Spark rather than relying on replay of Kafka messages.

Dib

On Aug 28, 2014 6:45 AM, RodrigoB rodrigo.boav...@aspect.com wrote:

Dibyendu,

Tnks for getting back.

I believe you are absolutely right. We were under the assumption that the raw data was being computed again, and that's not happening after further tests. This applies to Kafka as well. The issue is of major priority fortunately.

Regarding your suggestion, I would maybe prefer to have the problem resolved within Spark's internals, since once the data is replicated we should be able to access it once more and not have to pull it back again from Kafka or any other stream that is being affected by this issue. If for example there is a big amount of batches to be recomputed, I would rather have them done distributed than overloading the batch interval with a huge amount of Kafka messages.

I do not yet have enough know-how on where the issue is or about the internal Spark code, so I can't really say how difficult the implementation will be.

tnks, Rod

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12966.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Low Level Kafka Consumer for Spark
Hi,

As I understand, your problem is similar to this JIRA: https://issues.apache.org/jira/browse/SPARK-1647

The issue in this case is that Kafka cannot replay the messages, as the offsets are already committed. Also, I think the existing KafkaUtils (the default high-level Kafka Consumer) has this issue. There is a similar discussion in this thread: http://apache-spark-user-list.1001560.n3.nabble.com/Data-loss-Spark-streaming-and-network-receiver-td12337.html

As I am thinking, it is possible to tackle this in the consumer code I have written. If we can store the topic partition_id and consumed offset in ZK after every checkpoint, then after Spark recovers from the fail-over, the present PartitionManager code can start reading from the last checkpointed offset (instead of the last committed offset as it does now). In that case it can replay the data since the last checkpoint. I will think it over.

Regards, Dibyendu

On Mon, Aug 25, 2014 at 11:23 PM, RodrigoB rodrigo.boav...@aspect.com wrote:

Hi Dibyendu,

My colleague has taken a look at the spark kafka consumer github you have provided and started experimenting. We found that somehow, when Spark has a failure after a data checkpoint, the expected re-computations corresponding to the metadata checkpoints are not recovered, so we lose Kafka messages and RDD computations in Spark. The impression is that this code is replacing quite a bit of Spark Kafka Streaming code, where maybe (not sure) metadata checkpoints are done every batch interval.

Was it on purpose to solely depend on the Kafka commit to recover data and recomputations between data checkpoints? If so, how do we make this work?

tnks Rod

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12757.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
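[Editor's note] The idea proposed above, persist (topic, partition) -> consumed offset after every Spark checkpoint and recover from the last checkpointed offset rather than the last committed one, can be modelled with a plain map standing in for the ZK client. The class name, method names and path layout below are illustrative, not the project's actual code:

```java
import java.util.*;

// Sketch: record the consumed offset at every checkpoint so that
// fail-over recovery replays everything since the last checkpoint.
public class CheckpointedOffsetStore {
    // Stand-in for a ZooKeeper client; a real implementation would write
    // to a path such as /consumers/<id>/<topic>/<partition> (illustrative).
    private final Map<String, Long> zk = new HashMap<>();

    /** Called after each successful Spark checkpoint. */
    public void onCheckpoint(String topic, int partition, long offset) {
        zk.put(topic + "/" + partition, offset);
    }

    /** Offset to replay from after fail-over; 0 if nothing checkpointed. */
    public long recoverOffset(String topic, int partition) {
        return zk.getOrDefault(topic + "/" + partition, 0L);
    }

    public static void main(String[] args) {
        CheckpointedOffsetStore store = new CheckpointedOffsetStore();
        store.onCheckpoint("events", 3, 1200L);
        store.onCheckpoint("events", 3, 1850L); // later checkpoint wins
        System.out.println(store.recoverOffset("events", 3)); // 1850
    }
}
```

The key property is that the recovery offset trails the processing pipeline by at most one checkpoint interval, so the worst case is replaying (and possibly duplicating) one checkpoint's worth of messages, never losing them.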
Re: Low Level Kafka Consumer for Spark
Hi Bharat,

Thanks for your email. If the Kafka Reader worker process dies, it will be replaced by a different machine, and it will start consuming from the offset where it left off (for each partition). The same would happen even with an individual Receiver for every partition.

Regards, Dibyendu

On Tue, Aug 26, 2014 at 5:43 AM, bharatvenkat bvenkat.sp...@gmail.com wrote:

I like this consumer for what it promises - better control over offset and recovery from failures. If I understand this right, it still uses a single worker process to read from Kafka (one thread per partition) - is there a way to specify multiple worker processes (on different machines) to read from Kafka? Maybe one worker process for each partition? If there is no such option, what happens when the single machine hosting the Kafka Reader worker process dies and is replaced by a different machine (like in the cloud)?

Thanks, Bharat

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12788.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Data loss - Spark streaming and network receiver
Dear All,

Recently I have written a Spark Kafka Consumer to solve this problem. We have also seen issues with KafkaUtils, which uses the high-level Kafka Consumer, where the consumer code has no handle on offset management.

The code below solves this problem; it is being tested in our Spark Cluster and is working fine as of now: https://github.com/dibbhatt/kafka-spark-consumer

This is a low-level Kafka Consumer using the Kafka Simple Consumer API. Please have a look at it and let me know your opinion. It has been written to eliminate data loss by committing the offset after the data is written to the BM. Also, the existing high-level KafkaUtils does not have any feature to control data flow, and it gives Out Of Memory errors if there is too much backlog in Kafka. This consumer solves that problem as well. The code has been adapted from the earlier Storm Kafka consumer code, and it has a lot of other features like recovery from Kafka node failures, ZK failures, recovery from offset errors, etc.

Regards, Dibyendu

On Tue, Aug 19, 2014 at 9:49 AM, Shao, Saisai saisai.s...@intel.com wrote:

I think currently Spark Streaming lacks a data acknowledging mechanism when data is stored and replicated in the BlockManager, so potentially data can be lost even after being pulled from Kafka: say, if data is stored just in the BlockGenerator and not the BM, while in the meantime Kafka itself has committed the consumer offset, and at this point the node fails. From Kafka's point of view this part of the data has been fed into Spark Streaming, but actually it is not yet processed, so potentially this part of the data will never be processed again, unless you read the whole partition again.

To solve this potential data loss problem, Spark Streaming needs to offer a data acknowledging mechanism, so a custom Receiver can use this acknowledgement to do checkpoint or recovery, like Storm. Besides, driver failure is another story that needs to be carefully considered.
So currently it is hard to make sure there is no data loss in Spark Streaming; we still need to improve at some points :)

Thanks, Jerry

*From:* Tobias Pfeiffer [mailto:t...@preferred.jp]
*Sent:* Tuesday, August 19, 2014 10:47 AM
*To:* Wei Liu
*Cc:* user
*Subject:* Re: Data loss - Spark streaming and network receiver

Hi Wei,

On Tue, Aug 19, 2014 at 10:18 AM, Wei Liu wei@stellarloyalty.com wrote:

Since our application cannot tolerate losing customer data, I am wondering what is the best way for us to address this issue. 1) We are thinking of writing application-specific logic to address the data loss. To us, the problem seems to be caused by the Kinesis receivers advancing their checkpoint before we know for sure the data is replicated. For example, we can do another checkpoint ourselves to remember the Kinesis sequence number for data that has been processed by Spark Streaming. When the Kinesis receiver is restarted due to worker failures, we restart it from the checkpoint we tracked.

This sounds pretty much to me like the way Kafka does it. So, I am not saying that the stock KafkaReceiver does what you want (it may or may not), but it should be possible to update the offset (which corresponds to the sequence number) in Zookeeper only after data has been replicated successfully. I guess replacing Kinesis with Kafka is not an option for you, but you may consider pulling Kinesis data into Kafka before processing with Spark?

Tobias
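[Editor's note] Jerry's point about a missing acknowledging mechanism amounts to an ordering rule: advance the consumer offset only after the block store (BlockManager) acknowledges a durable, replicated write. A minimal sketch of that rule; the interface and names are invented for illustration, not Spark's or the consumer's actual API:

```java
// Sketch: commit the offset only after a successful, acknowledged store.
// A crash between fetch and store leaves the offset unchanged, so the
// data is replayed on restart instead of being lost.
public class AckBeforeCommitSketch {
    /** Stand-in for the BlockManager; returns true once the block is stored. */
    interface BlockStore {
        boolean store(String block);
    }

    private long committedOffset = 0;

    /** Stores a block, then commits its offset only on acknowledged success. */
    public long consume(String block, long nextOffset, BlockStore store) {
        if (store.store(block)) {
            committedOffset = nextOffset; // ack received: safe to advance
        }
        return committedOffset;           // unchanged on failure -> replay
    }

    public static void main(String[] args) {
        AckBeforeCommitSketch consumer = new AckBeforeCommitSketch();
        System.out.println(consumer.consume("block-1", 10L, b -> true));  // 10
        System.out.println(consumer.consume("block-2", 20L, b -> false)); // still 10
    }
}
```

This is the same ordering Tobias describes for Kinesis/Kafka above: update the sequence number or offset in ZooKeeper only after replication has succeeded.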
Re: Spark stream data from kafka topics and output as parquet file on HDFS
You can try this Kafka Spark Consumer which I recently wrote. It uses the Low Level Kafka Consumer: https://github.com/dibbhatt/kafka-spark-consumer

Dibyendu

On Tue, Aug 5, 2014 at 12:52 PM, rafeeq s rafeeq.ec...@gmail.com wrote:

Hi,

I am new to Apache Spark and trying to develop a Spark Streaming program to *stream data from kafka topics and output as parquet file on HDFS*. Please share a *sample reference* program that streams data from kafka topics and outputs it as a parquet file on HDFS.

Thanks in advance.

Regards, Rafeeq S

*("What you do is what matters, not what you think or say or plan.")*
Re: Low Level Kafka Consumer for Spark
Thanks Jonathan,

Yes, till non-ZK based offset management is available in Kafka, I need to maintain the offsets in ZK. And yes, in both cases an explicit commit is necessary.

I modified the Low Level Kafka Spark Consumer a little so that the Receiver spawns a thread for every partition of the topic and performs the 'store' operation from multiple threads. It would be good if the receiver's store methods were made thread safe, which is not the case presently.

Waiting for TD's comment on this Kafka Spark low-level consumer.

Regards,
Dibyendu

On Tue, Aug 5, 2014 at 5:32 AM, Jonathan Hodges hodg...@gmail.com wrote:

Hi Yan,

That is a good suggestion. I believe non-Zookeeper offset management will be a feature in the upcoming Kafka 0.8.2 release, tentatively scheduled for September.

https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management

That should make this fairly easy to implement, but it will still require explicit offset commits to avoid data loss, which is different from the current KafkaUtils implementation.

Jonathan

On Mon, Aug 4, 2014 at 4:51 PM, Yan Fang yanfang...@gmail.com wrote:

Another suggestion that may help: you can consider using Kafka to store the latest offset instead of Zookeeper. There are at least two benefits: 1) it lowers the workload on ZK, and 2) it supports replay from a certain offset. This is how Samza (http://samza.incubator.apache.org/) deals with Kafka offsets; the doc is at http://samza.incubator.apache.org/learn/documentation/0.7.0/container/checkpointing.html.

Thank you.

Cheers,
Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108

On Sun, Aug 3, 2014 at 8:59 PM, Patrick Wendell pwend...@gmail.com wrote:

I'll let TD chime in on this one, but I'm guessing this would be a welcome addition.
It's great to see community effort on adding new streams/receivers; adding a Java API for receivers was something we did specifically to allow this :)

- Patrick

On Sat, Aug 2, 2014 at 10:09 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote:

Hi,

I have implemented a Low Level Kafka Consumer for Spark Streaming using the Kafka Simple Consumer API. This API gives better control over Kafka offset management and recovery from failures. As the present Spark KafkaUtils uses the high-level Kafka consumer API, I wanted to have better control over the offset management, which is not possible with the Kafka high-level consumer.

This project is available in the repo below:

https://github.com/dibbhatt/kafka-spark-consumer

I have implemented a custom receiver, consumer.kafka.client.KafkaReceiver. The KafkaReceiver uses the low-level Kafka consumer API (implemented in the consumer.kafka packages) to fetch messages from Kafka and 'store' them in Spark.

The logic detects the number of partitions for a topic and spawns that many threads (individual instances of consumers). The Kafka consumer uses Zookeeper to store the latest offset for individual partitions, which helps recovery in case of failure. The Kafka consumer logic is tolerant to ZK failures, Kafka partition-leader changes, Kafka broker failures, recovery from offset errors and other fail-over aspects.

consumer.kafka.client.Consumer is a sample consumer which uses these Kafka receivers to generate DStreams from Kafka and apply an output operation to every message of the RDD.

We are planning to use this Kafka Spark Consumer to perform near-real-time indexing of Kafka messages into a target search cluster and also near-real-time aggregation into a target NoSQL store.

Kindly let me know your views. Also, if this looks good, can I contribute it to the Spark Streaming project?

Regards,
Dibyendu
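The thread-per-partition fetch pattern described in this thread (one fetcher thread per Kafka partition, all handing messages to a single shared store) can be illustrated with plain Scala threads. This is a stand-alone sketch, not the consumer's actual code: a `ConcurrentLinkedQueue` stands in for the receiver's `store()` sink, and the partition count and messages are made up.

```scala
// Illustrative sketch of thread-per-partition fetching into one
// thread-safe sink. The queue stands in for Receiver.store(); in the
// real receiver each thread would run a Kafka SimpleConsumer fetch loop.
import java.util.concurrent.ConcurrentLinkedQueue

object ThreadPerPartitionDemo {
  // Spawns one fetcher thread per partition; returns how many
  // messages reached the shared sink once all fetchers finish.
  def run(numPartitions: Int, msgsPerPartition: Int): Int = {
    val sink = new ConcurrentLinkedQueue[String]() // thread-safe stand-in for store()

    val fetchers = (0 until numPartitions).map { p =>
      new Thread(s"fetcher-$p") {
        override def run(): Unit =
          (0 until msgsPerPartition).foreach(i => sink.add(s"partition-$p-msg-$i"))
      }
    }
    fetchers.foreach(_.start())
    fetchers.foreach(_.join())
    sink.size
  }

  def main(args: Array[String]): Unit =
    println(run(numPartitions = 4, msgsPerPartition = 10)) // 40
}
```

This also shows why the thread-safety of the store path matters, as raised earlier in the thread: with four fetchers writing concurrently, a non-thread-safe sink could drop or corrupt messages, whereas the concurrent queue delivers all of them.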