Re: Spark Streaming Checkpointing Restarts with 0 Event Batches

2015-08-25 Thread Cody Koeninger
Sounds like something's not set up right... can you post a minimal code example that reproduces the issue? On Tue, Aug 25, 2015 at 1:40 PM, Susan Zhang suchenz...@gmail.com wrote: Yeah. All messages are lost while the streaming job was down. On Tue, Aug 25, 2015 at 11:37 AM, Cody Koeninger c

Re: Spark Direct Streaming With ZK Updates

2015-08-24 Thread Cody Koeninger
It doesn't matter if shuffling occurs. Just update ZK from the driver, inside the foreachRDD, after all your dynamodb updates are done. Since you're just doing it for monitoring purposes, that should be fine. On Mon, Aug 24, 2015 at 12:11 PM, suchenzang suchenz...@gmail.com wrote: Forgot to
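
A minimal sketch of that driver-side pattern (assuming `stream` is a direct stream created with KafkaUtils.createDirectStream, and `updateZk` is a hypothetical helper wrapping whatever ZooKeeper client you use):

    import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

    stream.foreachRDD { rdd =>
      // The cast only works on the RDD handed to foreachRDD directly,
      // before any transformation changes the partitioning.
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // ... run the DynamoDB updates (executor-side work) first ...

      // Then, still on the driver, record the latest offsets for monitoring.
      offsetRanges.foreach { or =>
        updateZk(or.topic, or.partition, or.untilOffset)   // hypothetical ZK helper
      }
    }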

Re: Spark Direct Streaming With ZK Updates

2015-08-24 Thread Cody Koeninger
I'd start off by trying to simplify that closure - you don't need the transform step, or currOffsetRanges to be scoped outside of it. Just do everything in foreachRDD. Likewise, it looks like zkClient is also scoped outside of the closure passed to foreachRDD, i.e. you have zkClient = new

Re: How to parse multiple event types using Kafka

2015-08-23 Thread Cody Koeninger
Each spark partition will contain messages only from a single kafka topicpartition. Use HasOffsetRanges to tell which kafka topicpartition it's from. See the docs http://spark.apache.org/docs/latest/streaming-kafka-integration.html On Sun, Aug 23, 2015 at 10:56 AM, Spark Enthusiast
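
A sketch of that pattern, assuming `stream` is a direct stream of (key, value) pairs and `handleEvent` is a hypothetical dispatcher. Spark partition i corresponds to offsetRanges(i), so the topic can be attached per partition:

    import org.apache.spark.streaming.kafka.HasOffsetRanges

    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.mapPartitionsWithIndex { (i, iter) =>
        // Spark partition i corresponds to offsetRanges(i)
        val topic = offsetRanges(i).topic
        iter.map { case (key, value) => (topic, value) }
      }.foreach { case (topic, value) =>
        handleEvent(topic, value)   // hypothetical per-event-type dispatch
      }
    }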

Re: spark streaming 1.3 kafka error

2015-08-22 Thread Cody Koeninger
happen that you are spending too much time on disk io etc. On Aug 21, 2015 7:32 AM, Cody Koeninger c...@koeninger.org wrote: Sounds like that's happening consistently, not an occasional network problem? Look at the Kafka broker logs Make sure you've configured the correct kafka broker

Re: spark streaming 1.3 kafka error

2015-08-21 Thread Cody Koeninger
Sounds like that's happening consistently, not an occasional network problem? Look at the Kafka broker logs Make sure you've configured the correct kafka broker hosts / ports (note that direct stream does not use zookeeper host / port). Make sure that host / port is reachable from your driver

Re: Kafka Spark Partition Mapping

2015-08-20 Thread Cody Koeninger
In general you cannot guarantee which node an RDD will be processed on. The preferred location for a kafkardd is the kafka leader for that partition, if they're deployed on the same machine. If you want to try to override that behavior, the method is getPreferredLocations But even in that case,

Re: spark kafka partitioning

2015-08-20 Thread Cody Koeninger
I'm not clear on your question, can you rephrase it? Also, are you talking about createStream or createDirectStream? On Thu, Aug 20, 2015 at 9:48 PM, Gaurav Agarwal gaurav130...@gmail.com wrote: Hello Regarding Spark Streaming and Kafka Partitioning When i send message on kafka topic with

Re: spark streaming 1.3 doubts(force it to not consume anything)

2015-08-19 Thread Cody Koeninger
, Shushant Arora shushantaror...@gmail.com wrote: But KafkaRDD[K, V, U, T, R] is not a subclass of RDD[R]; as per java, generic inheritance is not supported, so a derived class cannot return a different generic typed subclass from an overridden method. On Wed, Aug 19, 2015 at 12:02 AM, Cody Koeninger c

Re: spark streaming 1.3 doubts(force it to not consume anything)

2015-08-18 Thread Cody Koeninger
the same error did not come while extending DirectKafkaInputDStream from InputDStream? Since the new return type Option[KafkaRDD[K, V, U, T, R]] is not a subclass of Option[RDD[T]], so it should have failed? On Tue, Aug 18, 2015 at 7:28 PM, Cody Koeninger c...@koeninger.org wrote

Re: spark streaming 1.3 doubts(force it to not consume anything)

2015-08-18 Thread Cody Koeninger
[][] but it's expecting scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from the derived class. Is there something wrong with the code? On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger c...@koeninger.org wrote: Look at the definitions of the java-specific KafkaUtils.createDirectStream methods (the ones

Re: Transform KafkaRDD to KafkaRDD, not plain RDD, or how to keep OffsetRanges after transformation

2015-08-18 Thread Cody Koeninger
looking forward to it :) On Wed, Aug 19, 2015 at 12:02 AM, Cody Koeninger c...@koeninger.org wrote: The solution you found is also in the docs: http://spark.apache.org/docs/latest/streaming-kafka-integration.html Java uses an atomic reference because Java doesn't allow you to close over non

Re: Transform KafkaRDD to KafkaRDD, not plain RDD, or how to keep OffsetRanges after transformation

2015-08-18 Thread Cody Koeninger
The solution you found is also in the docs: http://spark.apache.org/docs/latest/streaming-kafka-integration.html Java uses an atomic reference because Java doesn't allow you to close over non-final references. I'm not clear on your other question. On Tue, Aug 18, 2015 at 3:43 AM, Petr Novak
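
The Scala shape of that docs pattern, roughly (a sketch; `directStream` is assumed to come from KafkaUtils.createDirectStream). Scala can close over a plain var, which is why no AtomicReference is needed:

    import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

    var offsetRanges = Array.empty[OffsetRange]

    directStream.transform { rdd =>
      // Capture the ranges while the RDD is still the KafkaRDD, before any transformation
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.map(_._2).foreachRDD { rdd =>
      // Downstream RDDs are no longer HasOffsetRanges, but the ranges captured
      // above are still visible here on the driver.
      offsetRanges.foreach { or =>
        println(s"${or.topic} ${or.partition} ${or.fromOffset} ${or.untilOffset}")
      }
    }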

Re: broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-18 Thread Cody Koeninger
I wouldn't expect a kafka producer to be serializable at all... among other things, it has a background thread On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan wansheng...@gmail.com wrote: Hi, Did anyone see java.util.ConcurrentModificationException when using broadcast variables? I

Re: spark streaming 1.3 doubts(force it to not consume anything)

2015-08-17 Thread Cody Koeninger
instead of Function1 ? On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger c...@koeninger.org wrote: I'm not aware of an existing api per se, but you could create your own subclass of the DStream that returns None for compute() under certain conditions. On Wed, Aug 12, 2015 at 1:03 PM

Re: Spark Streaming: Change Kafka topics on runtime

2015-08-14 Thread Cody Koeninger
stopped it cannot be restarted, but I also haven't found a way to stop the app programmatically. The batch duration will probably be around 1-10 seconds. I think this is small enough to not make it a batch job? Thanks again On Thu, Aug 13, 2015 at 10:15 PM, Cody Koeninger c...@koeninger.org

Re: Maintaining Kafka Direct API Offsets

2015-08-14 Thread Cody Koeninger
Use your email client to send a message to the mailing list from the email address you used to subscribe? The message you just sent reached the list On Fri, Aug 14, 2015 at 9:36 AM, dutrow dan.dut...@gmail.com wrote: How do I get beyond the This post has NOT been accepted by the mailing list

Re: Maintaining Kafka Direct API Offsets

2015-08-14 Thread Cody Koeninger
I don't entirely agree with that assessment. Not paying for extra cores to run receivers was about as important as delivery semantics, as far as motivations for the api. As I said in the jira tickets on the topic, if you want to use the direct api and save offsets to ZK, you can. The right way

Re: Checkpointing doesn't appear to be working for direct streaming from Kafka

2015-08-14 Thread Cody Koeninger
You'll resume and re-process the rdd that didn't finish On Fri, Aug 14, 2015 at 1:31 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Our additional question on checkpointing is basically the logistics of it -- At which point does the data get written into checkpointing? Is it written

Re: Spark Streaming: Change Kafka topics on runtime

2015-08-13 Thread Cody Koeninger
The current kafka stream implementation assumes the set of topics doesn't change during operation. You could either take a crack at writing a subclass that does what you need; stop/start; or if your batch duration isn't too small, you could run it as a series of RDDs (using the existing

Re: spark.streaming.maxRatePerPartition parameter: what are the benefits?

2015-08-13 Thread Cody Koeninger
It's just to limit the maximum number of records a given executor needs to deal with in a given batch. Typical usage would be if you're starting a stream from the beginning of a kafka log, or after a long downtime, and don't want ALL of the messages in the first batch. On Thu, Aug 13, 2015 at
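
For example (a sketch; the numbers are illustrative, not a recommendation):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("kafka-direct-example")
      // At most 10,000 records per Kafka partition per second of batch time.
      // With a 2-second batch and 6 partitions, a batch is capped at
      // 10,000 * 2 * 6 = 120,000 records, even when starting from the
      // beginning of the log or after a long downtime.
      .set("spark.streaming.kafka.maxRatePerPartition", "10000")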

Re: Retrieving offsets from previous spark streaming checkpoint

2015-08-13 Thread Cody Koeninger
Access the offsets using HasOffsetRanges, save them in your datastore, provide them as the fromOffsets argument when starting the stream. See https://github.com/koeninger/kafka-exactly-once On Thu, Aug 13, 2015 at 3:53 PM, Stephen Durfey sjdur...@gmail.com wrote: When deploying a spark
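
A sketch of that startup path (assuming `ssc` and `kafkaParams` already exist, and `loadOffsets` is a hypothetical helper that reads the last saved offsets back out of your datastore):

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Last processed offsets, read back from your own datastore.
    val fromOffsets: Map[TopicAndPartition, Long] = loadOffsets()   // hypothetical helper

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, fromOffsets,
      (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))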

Re: avoid duplicate due to executor failure in spark stream

2015-08-12 Thread Cody Koeninger
will just ignore already processed events by accessing counter of failed task. Is it directly possible to access accumulator per task basis without writing to hdfs or hbase. On Tue, Aug 11, 2015 at 3:15 AM, Cody Koeninger c...@koeninger.org wrote: http://spark.apache.org/docs/latest

Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing

2015-08-10 Thread Cody Koeninger
That looks like it's during recovery from a checkpoint, so it'd be driver memory not executor memory. How big is the checkpoint directory that you're trying to restore from? On Mon, Aug 10, 2015 at 10:57 AM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: We're getting the below error.

Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing

2015-08-10 Thread Cody Koeninger
the original checkpointing directory :( Thanks for the clarification on spark.driver.memory, I'll keep testing (at 2g things seem OK for now). On Mon, Aug 10, 2015 at 12:10 PM, Cody Koeninger c...@koeninger.org wrote: That looks like it's during recovery from a checkpoint, so it'd be driver

Re: Kafka direct approach: blockInterval and topic partitions

2015-08-10 Thread Cody Koeninger
There's no long-running receiver pushing blocks of messages, so blockInterval isn't relevant. Batch interval is what matters. On Mon, Aug 10, 2015 at 12:52 PM, allonsy luke1...@gmail.com wrote: Hi everyone, I recently started using the new Kafka direct approach. Now, as far as I

Re: avoid duplicate due to executor failure in spark stream

2015-08-10 Thread Cody Koeninger
http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers http://spark.apache.org/docs/latest/streaming-programming-guide.html#semantics-of-output-operations https://www.youtube.com/watch?v=fXnNEq1v3VA On Mon, Aug 10, 2015 at 4:32 PM, Shushant

Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing

2015-08-10 Thread Cody Koeninger
tolerance. Why does Spark persist whole RDD's of data? Shouldn't it be sufficient to just persist the offsets, to know where to resume from? Thanks. On Mon, Aug 10, 2015 at 1:07 PM, Cody Koeninger c...@koeninger.org wrote: You need to keep a certain number of rdds around for checkpointing

Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing

2015-08-10 Thread Cody Koeninger
of these costs? On Mon, Aug 10, 2015 at 4:27 PM, Cody Koeninger c...@koeninger.org wrote: The rdd is indeed defined by mostly just the offsets / topic partitions. On Mon, Aug 10, 2015 at 3:24 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: You need to keep a certain number of rdds

Re: spark-kafka directAPI vs receivers based API

2015-08-10 Thread Cody Koeninger
For direct stream questions: https://github.com/koeninger/kafka-exactly-once Yes, it is used in production. For general spark streaming question: http://spark.apache.org/docs/latest/streaming-programming-guide.html On Mon, Aug 10, 2015 at 7:51 AM, Mohit Durgapal durgapalmo...@gmail.com

Re: spark streaming max receiver rate doubts

2015-08-04 Thread Cody Koeninger
Those jobs will still be created for each valid time, they just may not have many messages in them On Mon, Aug 3, 2015 at 11:11 PM, Shushant Arora shushantaror...@gmail.com wrote: 1.In spark 1.3(Non receiver) - If my batch interval is 1 sec and I don't set

Re: No Twitter Input from Kafka to Spark Streaming

2015-08-04 Thread Cody Koeninger
Have you tried using the console consumer to see if anything is actually getting published to that topic? On Tue, Aug 4, 2015 at 11:45 AM, narendra narencs...@gmail.com wrote: My application takes Twitter4j tweets and publishes those to a topic in Kafka. Spark Streaming subscribes to that

Re: spark streaming program failed on Spark 1.4.1

2015-08-03 Thread Cody Koeninger
Just to be clear, did you rebuild your job against spark 1.4.1 as well as upgrading the cluster? On Mon, Aug 3, 2015 at 8:36 AM, Netwaver wanglong_...@163.com wrote: Hi All, I have a spark streaming + kafka program written by Scala, it works well on Spark 1.3.1, but after I migrate

Re: Checkpointing doesn't appear to be working for direct streaming from Kafka

2015-07-31 Thread Cody Koeninger
Show us the relevant code On Fri, Jul 31, 2015 at 12:16 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: I've instrumented checkpointing per the programming guide and I can tell that Spark Streaming is creating the checkpoint directories but I'm not seeing any content being created in

Re: Problems with JobScheduler

2015-07-30 Thread Cody Koeninger
Just so I'm clear, the difference in timing you're talking about is this: 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at MetricsSpark.scala:67, took 60.391761 s 15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at MetricsSpark.scala:67, took 0.531323 s Are

Re: Upgrade of Spark-Streaming application

2015-07-30 Thread Cody Koeninger
You can't use checkpoints across code upgrades. That may or may not change in the future, but for now that's a limitation of spark checkpoints (regardless of whether you're using Kafka). Some options: - Start up the new job on a different cluster, then kill the old job once it's caught up to

Re: Problems with JobScheduler

2015-07-30 Thread Cody Koeninger
. On Thu, Jul 30, 2015 at 9:29 AM, Guillermo Ortiz konstt2...@gmail.com wrote: I have three topics with one partition each topic. So each jobs run about one topics. 2015-07-30 16:20 GMT+02:00 Cody Koeninger c...@koeninger.org: Just so I'm clear, the difference in timing you're talking about

Re: Spark Streaming Kafka could not find leader offset for Set()

2015-07-30 Thread Cody Koeninger
is valid and it has data I can see data in it using another kafka consumer. On Jul 30, 2015 7:31 AM, Cody Koeninger c...@koeninger.org wrote: The last time someone brought this up on the mailing list, the issue actually was that the topic(s) didn't exist in Kafka at the time the spark job

Re: Spark Streaming Kafka could not find leader offset for Set()

2015-07-29 Thread Cody Koeninger
The last time someone brought this up on the mailing list, the issue actually was that the topic(s) didn't exist in Kafka at the time the spark job was running. On Wed, Jul 29, 2015 at 6:17 PM, Tathagata Das t...@databricks.com wrote: There is a known issue that Kafka cannot return leader

Re: Has anybody ever tried running Spark Streaming on 500 text streams?

2015-07-29 Thread Cody Koeninger
@Ashwin you don't need to append the topic to your data if you're using the direct stream. You can get the topic from the offset range, see http://spark.apache.org/docs/latest/streaming-kafka-integration.html (search for offsetRange) If you're using the receiver based stream, you'll need to

Re: spark streaming get kafka individual message's offset and partition no

2015-07-28 Thread Cody Koeninger
You don't have to use some other package in order to get access to the offsets. Shushant, have you read the available documentation at http://spark.apache.org/docs/latest/streaming-kafka-integration.html https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md or watched

Re: Checkpoints in SparkStreaming

2015-07-28 Thread Cody Koeninger
Yes, you need to follow the documentation. Configure your stream, including the transformations made to it, inside the getOrCreate function. On Tue, Jul 28, 2015 at 3:14 AM, Guillermo Ortiz konstt2...@gmail.com wrote: I'm using SparkStreaming and I want to configure checkpoint to manage

Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

2015-07-28 Thread Cody Koeninger
24, 2015 at 3:32 PM, Cody Koeninger c...@koeninger.org wrote: It's really a question of whether you need access to the MessageAndMetadata, or just the key / value from the message. If you just need the key/value, dstream map is fine. In your case, since you need to be able to control

Re: Best practice for transforming and storing from Spark to Mongo/HDFS

2015-07-25 Thread Cody Koeninger
Use foreachPartition and batch the writes On Sat, Jul 25, 2015 at 9:14 AM, nib...@free.fr wrote: Hello, I am new user of Spark, and need to know what could be the best practice to do the following scenario : - Spark Streaming receives XML messages from Kafka - Spark transforms each message
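
One way that could look, as a sketch (assuming `dstream` holds the transformed records; `openMongoClient` and `bulkInsert` are hypothetical helpers standing in for whatever Mongo driver you use):

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        // One connection per partition, not one per record.
        val client = openMongoClient()               // hypothetical connection helper
        partition.grouped(500).foreach { batch =>    // write in chunks of 500 documents
          bulkInsert(client, batch)                  // hypothetical bulk-write helper
        }
        client.close()
      }
    }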

Re: New consumer - offset one gets in poll is not offset one is supposed to commit

2015-07-24 Thread Cody Koeninger
Well... there are only 2 hard problems in computer science: naming things, cache invalidation, and off-by-one errors. The direct stream implementation isn't asking you to commit anything. It's asking you to provide a starting point for the stream on startup. Because offset ranges are inclusive

Re: New consumer - offset one gets in poll is not offset one is supposed to commit

2015-07-24 Thread Cody Koeninger
/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L805 After processing read ConsumerRecord, commit expects me to submit not offset 0 but offset 1... Kind regards, Stevo Slavic On Fri, Jul 24, 2015 at 6:31 PM, Cody Koeninger c...@koeninger.org wrote: Well

Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

2015-07-24 Thread Cody Koeninger
method (convert each message within this handler) or dstream.foreachRDD.map (map rdd) or dstream.map.foreachRDD (map dstream) ? Thank you for your help Cody. Regards, Nicolas PHUNG On Tue, Jul 21, 2015 at 4:53 PM, Cody Koeninger c...@koeninger.org wrote: Yeah, I'm referring to that api

Re: user threads in executors

2015-07-22 Thread Cody Koeninger
Yes, look at KafkaUtils.createRDD On Wed, Jul 22, 2015 at 11:17 AM, Shushant Arora shushantaror...@gmail.com wrote: Thanks ! I am using spark streaming 1.3 , And if some post fails because of any reason, I will store the offset of that message in another kafka topic. I want to read these
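
A batch-mode sketch of that (topic name and offsets are purely illustrative; `sc` and `kafkaParams` are assumed to exist):

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

    // Re-read an exact slice of the retry topic as a plain batch RDD,
    // e.g. the offsets you recorded for messages whose post failed.
    val ranges = Array(OffsetRange("failed-posts", 0, 100L, 200L))

    val retryRdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, ranges)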

Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

2015-07-21 Thread Cody Koeninger
, Jul 20, 2015 at 7:08 PM, Cody Koeninger c...@koeninger.org wrote: Yeah, in the function you supply for the messageHandler parameter to createDirectStream, catch the exception and do whatever makes sense for your application. On Mon, Jul 20, 2015 at 11:58 AM, Nicolas Phung nicolas.ph

Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

2015-07-20 Thread Cody Koeninger
, Nicolas PHUNG On Thu, Jul 16, 2015 at 7:08 PM, Cody Koeninger c...@koeninger.org wrote: Not exactly the same issue, but possibly related: https://issues.apache.org/jira/browse/KAFKA-1196 On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger c...@koeninger.org wrote: Well, working backwards

Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

2015-07-20 Thread Cody Koeninger
or KafkaUtils.createDirectStream ? Regards, Nicolas PHUNG On Mon, Jul 20, 2015 at 3:46 PM, Cody Koeninger c...@koeninger.org wrote: I'd try logging the offsets for each message, see where problems start, then try using the console consumer starting at those offsets and see if you can reproduce the problem

Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

2015-07-16 Thread Cody Koeninger
Well, working backwards down the stack trace... at java.nio.Buffer.limit(Buffer.java:275) That exception gets thrown if the limit is negative or greater than the buffer's capacity at kafka.message.Message.sliceDelimited(Message.scala:236) If size had been negative, it would have just returned

Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

2015-07-16 Thread Cody Koeninger
Not exactly the same issue, but possibly related: https://issues.apache.org/jira/browse/KAFKA-1196 On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger c...@koeninger.org wrote: Well, working backwards down the stack trace... at java.nio.Buffer.limit(Buffer.java:275) That exception gets thrown

Re: Sessionization using updateStateByKey

2015-07-15 Thread Cody Koeninger
, but I seem to remember they were around 10-30 minutes long. Even with peaks in volume, Spark managed to keep up very well. Thanks, Silvio From: Cody Koeninger Date: Wednesday, July 15, 2015 at 5:38 PM To: algermissen1971 Cc: Tathagata Das, swetha, user Subject: Re: Sessionization using

Re: Sessionization using updateStateByKey

2015-07-15 Thread Cody Koeninger
I personally would try to avoid updateStateByKey for sessionization when you have long sessions / a lot of keys, because it's linear on the number of keys. On Tue, Jul 14, 2015 at 6:25 PM, Tathagata Das t...@databricks.com wrote: [Apologies for repost, for those who have seen this response

Re: Sessionization using updateStateByKey

2015-07-15 Thread Cody Koeninger
the same thing with better scaling characteristics? Jan On 15 Jul 2015, at 15:33, Cody Koeninger c...@koeninger.org wrote: I personally would try to avoid updateStateByKey for sessionization when you have long sessions / a lot of keys, because it's linear on the number of keys. On Tue

Re: spark streaming with kafka reset offset

2015-07-14 Thread Cody Koeninger
? Essentially, all I need to do is monitor the progress of data consumption of the Kafka topic. On Tue, Jun 30, 2015 at 9:39 AM, Cody Koeninger c...@koeninger.org wrote: You can't use different versions of spark in your application vs your cluster. For the direct stream, it's not 60 partitions per

Re: createDirectStream and Stats

2015-07-13 Thread Cody Koeninger
the framework does is really desirable. With repartition, there is shuffle involved, but at least the computation load spreads across all 100 executors instead of just 30. On Fri, Jun 19, 2015 at 7:14 PM, Cody Koeninger c...@koeninger.org wrote: If that's the case, you're still only using as many

Re: Duplicated UnusedStubClass in assembly

2015-07-13 Thread Cody Koeninger
Yeah, I had brought that up a while back, but didn't get agreement on removing the stub. Seems to be an intermittent problem. You can just add an exclude: mergeStrategy in assembly := { case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first case x =>
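
Spelled out in full, the exclude might look like this in build.sbt (a sketch using the sbt-assembly syntax of that era; the exact keys depend on your plugin version):

    // build.sbt -- assembly keys are in scope via the sbt-assembly plugin
    mergeStrategy in assembly <<= (mergeStrategy in assembly) { old =>
      {
        case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") =>
          MergeStrategy.first
        case x => old(x)
      }
    }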

Re: spark streaming doubt

2015-07-13 Thread Cody Koeninger
Regarding your first question, having more partitions than you do executors usually means you'll have better utilization, because the workload will be distributed more evenly. There's some degree of per-task overhead, but as long as you don't have a huge imbalance between number of tasks and

Re: Spark off heap memory leak on Yarn with Kafka direct stream

2015-07-13 Thread Cody Koeninger
Does the issue only happen when you have no traffic on the topic? Have you profiled to see what's using heap space? On Mon, Jul 13, 2015 at 1:05 PM, Apoorva Sareen apoorva.sar...@gmail.com wrote: Hi, I am running spark streaming 1.4.0 on Yarn (Apache distribution 2.6.0) with java 1.8.0_45

Re: spark streaming kafka compatibility

2015-07-09 Thread Cody Koeninger
consumer kafka consumer 0.8.0 with kafka cluster 0.8.2 and that works. On Thu, Jul 9, 2015 at 9:58 PM, Cody Koeninger c...@koeninger.org wrote: It's the consumer version. Should work with 0.8.2 clusters. On Thu, Jul 9, 2015 at 11:10 AM, Shushant Arora shushantaror...@gmail.com wrote: Does

Re: spark streaming kafka compatibility

2015-07-09 Thread Cody Koeninger
It's the consumer version. Should work with 0.8.2 clusters. On Thu, Jul 9, 2015 at 11:10 AM, Shushant Arora shushantaror...@gmail.com wrote: Does spark streaming 1.3 requires kafka version 0.8.1.1 and is not compatible with kafka 0.8.2 ? As per maven dependency of spark streaming 1.3 with

Re: writing to kafka using spark streaming

2015-07-06 Thread Cody Koeninger
Use foreachPartition, and allocate whatever the costly resource is once per partition. On Mon, Jul 6, 2015 at 6:11 AM, Shushant Arora shushantaror...@gmail.com wrote: I have a requirement to write in kafka queue from a spark streaming application. I am using spark 1.2 streaming. Since
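
A sketch of that, assuming the stream elements are already Strings and using the new Kafka producer API (the broker address and output topic are placeholders):

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        // The producer is the costly resource: build it once per partition, not per record.
        val props = new Properties()
        props.put("bootstrap.servers", "broker1:9092")   // placeholder broker list
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String, String](props)

        partition.foreach { msg =>
          producer.send(new ProducerRecord[String, String]("output-topic", msg))
        }
        producer.close()
      }
    }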

Re: Restarting Spark Streaming Application with new code

2015-07-06 Thread Cody Koeninger
You shouldn't rely on being able to restart from a checkpoint after changing code, regardless of whether the change was explicitly related to serialization. If you are relying on checkpoints to hold state, specifically which offsets have been processed, that state will be lost if you can't

Re: spark streaming with kafka reset offset

2015-06-30 Thread Cody Koeninger
:46 PM, Cody Koeninger c...@koeninger.org wrote: Read the spark streaming guide ad the kafka integration guide for a better understanding of how the receiver based stream works. Capacity planning is specific to your environment and what the job is actually doing, youll need to determine

Re: spark streaming with kafka reset offset

2015-06-29 Thread Cody Koeninger
=d5UJonrruHk&list=PL-x35fyliRwgfhffEpywn4q23ykotgQJ6&index=4 ] from last Spark Summit 2015). But that approach can give duplicate records. The direct approach gives exactly-once guarantees, so you should try it out. TD On Fri, Jun 26, 2015 at 5:46 PM, Cody Koeninger c...@koeninger.org wrote

Re: How to recover in case user errors in streaming

2015-06-26 Thread Cody Koeninger
the job created for the specific batch, but the subsequent batches still proceed, isn’t it right ? And question still remains, how to keep track of those failed batches ? From: amit assudani aassud...@impetus.com Date: Friday, June 26, 2015 at 11:21 AM To: Cody Koeninger c...@koeninger.org Cc

Re: How to recover in case user errors in streaming

2015-06-26 Thread Cody Koeninger
If you're consistently throwing exceptions and thus failing tasks, once you reach max failures the whole stream will stop. It's up to you to either catch those exceptions, or restart your stream appropriately once it stops. Keep in mind that if you're relying on checkpoints, and fixing the error

Re: How to recover in case user errors in streaming

2015-06-26 Thread Cody Koeninger
the connectivity issues to persistent store which gets resolved in a while, but how do I know which all messages failed and need rework ? Regards, Amit From: Cody Koeninger c...@koeninger.org Date: Friday, June 26, 2015 at 11:16 AM To: amit assudani aassud...@impetus.com Cc: user

Re: spark streaming with kafka reset offset

2015-06-26 Thread Cody Koeninger
The receiver-based kafka createStream in spark 1.2 uses zookeeper to store offsets. If you want finer-grained control over offsets, you can update the values in zookeeper yourself before starting the job. createDirectStream in spark 1.3 is still marked as experimental, and subject to change.

Re: spark streaming job fails to restart after checkpointing due to DStream initialization errors

2015-06-26 Thread Cody Koeninger
Make sure you're following the docs regarding setting up a streaming checkpoint. Post your code if you can't get it figured out. On Fri, Jun 26, 2015 at 3:45 PM, Ashish Nigam ashnigamt...@gmail.com wrote: I bring up spark streaming job that uses Kafka as input source. No data to process and

Re: spark streaming with kafka reset offset

2015-06-26 Thread Cody Koeninger
per second. Each event is ~500 bytes. Is having 5 receivers with 60 partitions each sufficient for spark streaming to consume? On Fri, Jun 26, 2015 at 8:40 PM, Cody Koeninger c...@koeninger.org wrote: The receiver-based kafka

Re: Kafka createDirectStream ​issue

2015-06-23 Thread Cody Koeninger
The exception $line49 is referring to a line of the spark shell. Have you tried it from an actual assembled job with spark-submit ? On Tue, Jun 23, 2015 at 3:48 PM, syepes sye...@gmail.com wrote: Hello, I ​am trying ​use the new Kafka ​consumer ​​KafkaUtils.createDirectStream​ but I am

Re: createDirectStream and Stats

2015-06-19 Thread Cody Koeninger
createStream, the app does deteriorate but over a much longer period, hours vs days. On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das t...@databricks.com wrote: Yes, please tell us what operation are you using. TD On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.org wrote

Re: createDirectStream and Stats

2015-06-19 Thread Cody Koeninger
. Are there enhancements to this specific API between 1.3 and 1.4 that can substantially change its behaviour? On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger c...@koeninger.org wrote: when you say your old version was k = createStream . were you manually creating multiple receivers? Because otherwise

Re: createDirectStream and Stats

2015-06-19 Thread Cody Koeninger
, Cody Koeninger c...@koeninger.org wrote: If that's the case, you're still only using as many read executors as there are kafka partitions. I'd remove the repartition. If you weren't doing any shuffles in the old job, and are doing a shuffle in the new job, it's not really comparable. On Fri

Re: RE: Spark or Storm

2015-06-19 Thread Cody Koeninger
...@163.com *From:* Haopu Wang hw...@qilinsoft.com *Date:* 2015-06-19 18:47 *To:* Enno Shioji eshi...@gmail.com; Tathagata Das t...@databricks.com *CC:* prajod.vettiyat...@wipro.com; Cody Koeninger c...@koeninger.org; bit1...@163.com; Jordan Pilat jrpi...@gmail.com; Will Briggs wrbri

Re: RE: Spark or Storm

2015-06-19 Thread Cody Koeninger
:* Haopu Wang hw...@qilinsoft.com *Date:* 2015-06-19 18:47 *To:* Enno Shioji eshi...@gmail.com; Tathagata Das t...@databricks.com *CC:* prajod.vettiyat...@wipro.com; Cody Koeninger c...@koeninger.org; bit1...@163.com; Jordan Pilat jrpi...@gmail.com; Will Briggs wrbri...@gmail.com; Ashish Soni

Re: createDirectStream and Stats

2015-06-19 Thread Cody Koeninger
Is there any more info you can provide / relevant code? On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com wrote: Update on performance of the new API: the new code using the createDirectStream API ran overnight and when I checked the app state in the morning, there were massive

Re: RE: Spark or Storm

2015-06-18 Thread Cody Koeninger
That general description is accurate, but not really a specific issue of the direct stream. It applies to anything consuming from kafka (or, as Matei already said, any streaming system really). You can't have exactly-once semantics unless you know something more about how you're storing results.

Re: Spark Streaming, updateStateByKey and mapPartitions() - and lazy DatabaseConnection

2015-06-12 Thread Cody Koeninger
there ... but as long as you're making sure the setup gets called at most once per executor, before the work that needs it ... should be ok. On Fri, Jun 12, 2015 at 4:11 PM, algermissen1971 algermissen1...@icloud.com wrote: On 12 Jun 2015, at 22:59, Cody Koeninger c...@koeninger.org wrote: Close

Re: Spark Streaming, updateStateByKey and mapPartitions() - and lazy DatabaseConnection

2015-06-12 Thread Cody Koeninger
Close. The mapPartitions call doesn't need to do anything at all to the iter. mapPartitions { iter => SomeDb.conn.init; iter } On Fri, Jun 12, 2015 at 3:55 PM, algermissen1971 algermissen1...@icloud.com wrote: Cody, On 12 Jun 2015, at 17:26, Cody Koeninger c...@koeninger.org wrote

Re: How to use Window Operations with kafka Direct-API?

2015-06-12 Thread Cody Koeninger
of whether you're using the createStream or createDirectStream api. On Fri, Jun 12, 2015 at 9:14 AM, Cody Koeninger c...@koeninger.org wrote: Casting to HasOffsetRanges would be meaningless anyway if done after an operation that changes partitioning. You can still use the messageHandler argument

Re: Spark Streaming, updateStateByKey and mapPartitions() - and lazy DatabaseConnection

2015-06-12 Thread Cody Koeninger
There are several database apis that use a thread local or singleton reference to a connection pool (we use ScalikeJDBC currently, but there are others). You can use mapPartitions earlier in the chain to make sure the connection pool is set up on that executor, then use it inside updateStateByKey

Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?

2015-06-12 Thread Cody Koeninger
The scala api has 2 ways of calling createDirectStream. One of them allows you to pass a message handler that gets full access to the kafka MessageAndMetadata, including offset. I don't know why the python api was developed with only one way to call createDirectStream, but the first thing I'd
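
In the Scala API that handler is just the last argument to the fromOffsets variant of createDirectStream; a sketch of a handler that keeps the offset alongside the payload:

    import kafka.message.MessageAndMetadata

    // Each record in the resulting stream then carries its own topic, partition and offset.
    val messageHandler = (mmd: MessageAndMetadata[String, String]) =>
      (mmd.topic, mmd.partition, mmd.offset, mmd.message)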

Re: Spark Streaming Stuck After 10mins Issue...

2015-06-07 Thread Cody Koeninger
What is the code used to set up the kafka stream? On Sat, Jun 6, 2015 at 3:23 PM, EH eas...@gmail.com wrote: And here is the Thread Dump, where seems every worker is waiting for Executor #6 Thread 95: sparkExecutor-akka.actor.default-dispatcher-22 (RUNNABLE) to be complete: Thread 41:

Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-04 Thread Cody Koeninger
direct stream isn't a receiver, it isn't required to cache data anywhere unless you want it to. If you want it, just call cache. On Thu, Jun 4, 2015 at 8:20 AM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: set the storage policy for the DStream RDDs to MEMORY AND DISK - it appears the

Re: Behavior of the spark.streaming.kafka.maxRatePerPartition config param?

2015-06-03 Thread Cody Koeninger
The default of 0 means no limit. Each batch will grab as much as is available, i.e. a range of offsets spanning from the end of the previous batch to the highest available offsets on the leader. If you set spark.streaming.kafka.maxRatePerPartition > 0, the number you set is the maximum number of

Re: updateStateByKey and kafka direct approach without shuffle

2015-06-02 Thread Cody Koeninger
[HasOffsetRanges].offsetRanges) new ProxyRDDWithPartitioner(rdd, part) } ... But how can I create the same partitioner during the updateStateByKey call? I have no idea how to access the rdd when calling updateStateByKey. Tue, 2 Jun 2015 at 19:15, Cody Koeninger c...@koeninger.org: I think

Re: How to monitor Spark Streaming from Kafka?

2015-06-01 Thread Cody Koeninger
KafkaCluster.scala in the spark/external/kafka project has a bunch of api code, including code for updating Kafka-managed ZK offsets. Look at setConsumerOffsets. Unfortunately all of that code is private, but you can either write your own, copy it, or do what I do (sed out private[spark] and

Re: spark-streaming-kafka_2.11 not available yet?

2015-05-26 Thread Cody Koeninger
It's being added in 1.4 https://repository.apache.org/content/repositories/orgapachespark-1104/org/apache/spark/spark-streaming-kafka_2.11/1.4.0-rc2/ On Tue, May 26, 2015 at 3:14 AM, Petr Novak oss.mli...@gmail.com wrote: Hello, I would like to switch from Scala 2.10 to 2.11 for Spark app

Re: Trying to connect to many topics with several DirectConnect

2015-05-22 Thread Cody Koeninger
I just verified that the following code works on 1.3.0 : val stream1 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic1) val stream2 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic2)

Re: Reading Real Time Data only from Kafka

2015-05-19 Thread Cody Koeninger
the message in case of failures? Thanks Best Regards On Wed, May 13, 2015 at 8:32 PM, Cody Koeninger c...@koeninger.org wrote: You linked to a google mail tab, not a public archive, so I don't know exactly which conversation you're referring to. As far as I know, streaming only runs a single

Re: Spark Streaming + Kafka failure recovery

2015-05-19 Thread Cody Koeninger
PM, Bill Jay bill.jaypeter...@gmail.com wrote: If a Spark streaming job stops at 12:01 and I resume the job at 12:02. Will it still start to consume the data that were produced to Kafka at 12:01? Or it will just start consuming from the current time? On Tue, May 19, 2015 at 10:58 AM, Cody

Re: Spark Streaming + Kafka failure recovery

2015-05-19 Thread Cody Koeninger
Have you read https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md ? 1. There's nothing preventing that. 2. Checkpointing will give you at-least-once semantics, provided you have sufficient kafka retention. Be aware that checkpoints aren't recoverable if you upgrade code.

Re: Kafka Direct Approach + Zookeeper

2015-05-13 Thread Cody Koeninger
Either one will work, there is no semantic difference. The reason I designed the direct api to accept both of those keys is because they were used to define lists of brokers in pre-existing Kafka project apis. I don't know why the Kafka project chose to use 2 different configuration keys. On

Re: Reading Real Time Data only from Kafka

2015-05-13 Thread Cody Koeninger
, 2015 at 8:10 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Hi Cody, I was just saying that i found more success and high throughput with the low level kafka api prior to KafkfaRDDs which is the future it seems. My apologies if you felt it that way. :) On 12 May 2015 19:47, Cody Koeninger c

Re: Kafka Direct Approach + Zookeeper

2015-05-13 Thread Cody Koeninger
at 3:44 PM, Cody Koeninger c...@koeninger.org wrote: Either one will work, there is no semantic difference. The reason I designed the direct api to accept both of those keys is because they were used to define lists of brokers in pre-existing Kafka project apis. I don't know why the Kafka

Re: kafka + Spark Streaming with checkPointing fails to restart

2015-05-13 Thread Cody Koeninger
http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing shows setting up your stream and calling .checkpoint(checkpointDir) inside the functionToCreateContext. It looks to me like you're setting up your stream and calling checkpoint outside, after getOrCreate. I'm not
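
The shape the guide describes, roughly (a sketch; `conf`, `kafkaParams`, `topics` and `checkpointDir` are assumed to be defined elsewhere):

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    def createContext(): StreamingContext = {
      val ssc = new StreamingContext(conf, Seconds(5))
      ssc.checkpoint(checkpointDir)   // checkpointing is configured *inside* the create function
      val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topics)
      stream.foreachRDD { rdd =>
        // processing goes here
      }
      ssc
    }

    // Clean start: createContext() runs. Restart: the context, stream and all,
    // is rebuilt from the checkpoint directory and createContext() is not called.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()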
