Sounds like something's not set up right... can you post a minimal code
example that reproduces the issue?
On Tue, Aug 25, 2015 at 1:40 PM, Susan Zhang suchenz...@gmail.com wrote:
Yeah. All messages are lost while the streaming job was down.
On Tue, Aug 25, 2015 at 11:37 AM, Cody Koeninger c
It doesn't matter if shuffling occurs. Just update ZK from the driver,
inside the foreachRDD, after all your dynamodb updates are done. Since
you're just doing it for monitoring purposes, that should be fine.
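Roughly, something like this (untested sketch; assumes a direct stream named stream, the I0Itec ZkClient that Kafka itself uses, and a made-up ZK path layout):

import org.I0Itec.zkclient.ZkClient
import org.apache.spark.streaming.kafka.HasOffsetRanges

stream.foreachRDD { rdd =>
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... do all the dynamodb updates for this batch first ...
  // then, still on the driver, write the ending offsets to ZK for monitoring
  val zkClient = new ZkClient("zkhost:2181", 30000, 30000, kafka.utils.ZKStringSerializer)
  ranges.foreach { o =>
    // hypothetical path layout; use whatever your monitoring tool reads
    val path = s"/consumers/mygroup/offsets/${o.topic}/${o.partition}"
    kafka.utils.ZkUtils.updatePersistentPath(zkClient, path, o.untilOffset.toString)
  }
  zkClient.close()
}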
On Mon, Aug 24, 2015 at 12:11 PM, suchenzang suchenz...@gmail.com wrote:
Forgot to
I'd start off by trying to simplify that closure - you don't need the
transform step, or currOffsetRanges to be scoped outside of it. Just do
everything in foreachRDD. Likewise, it looks like zkClient is also scoped
outside of the closure passed to foreachRDD
i.e. you have
zkClient = new
Each spark partition will contain messages only from a single kafka
topic/partition. Use HasOffsetRanges to tell which kafka topic/partition
it's from. See the docs
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
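For example, a rough sketch (assuming a direct stream named stream; the index of each spark partition matches the index into offsetRanges):

import org.apache.spark.streaming.kafka.HasOffsetRanges

stream.foreachRDD { rdd =>
  // one OffsetRange per kafka topic/partition, in the same order as the spark partitions
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val tagged = rdd.mapPartitionsWithIndex { (i, iter) =>
    val o = offsetRanges(i)  // the kafka topic/partition this spark partition came from
    iter.map(msg => (o.topic, o.partition, msg))
  }
  tagged.take(5).foreach(println)  // placeholder; do the real per-partition work here
}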
On Sun, Aug 23, 2015 at 10:56 AM, Spark Enthusiast
happen that you are spending
too much time on disk io etc.
On Aug 21, 2015 7:32 AM, Cody Koeninger c...@koeninger.org wrote:
Sounds like that's happening consistently, not an occasional network
problem?
Look at the Kafka broker logs
Make sure you've configured the correct kafka broker
Sounds like that's happening consistently, not an occasional network
problem?
Look at the Kafka broker logs
Make sure you've configured the correct kafka broker hosts / ports (note
that direct stream does not use zookeeper host / port).
Make sure that host / port is reachable from your driver
In general you cannot guarantee which node an RDD will be processed on.
The preferred location for a KafkaRDD is the kafka leader for that
partition, if they're deployed on the same machine. If you want to try to
override that behavior, the method is getPreferredLocations
But even in that case,
I'm not clear on your question, can you rephrase it? Also, are you talking
about createStream or createDirectStream?
On Thu, Aug 20, 2015 at 9:48 PM, Gaurav Agarwal gaurav130...@gmail.com
wrote:
Hello
Regarding Spark Streaming and Kafka Partitioning
When i send message on kafka topic with
, Shushant Arora
shushantaror...@gmail.com wrote:
But KafkaRDD[K, V, U, T, R] is not a subclass of RDD[R]; as per Java,
generic inheritance is not supported, so a derived class cannot return a
differently typed generic subclass from an overridden method.
On Wed, Aug 19, 2015 at 12:02 AM, Cody Koeninger c
the same error did not come up while extending
DirectKafkaInputDStream from InputDStream? Since the new return type
Option[KafkaRDD[K, V, U, T, R]] is not a subclass of Option[RDD[T]], it
should have failed?
On Tue, Aug 18, 2015 at 7:28 PM, Cody Koeninger c...@koeninger.org
wrote
[][] but it's expecting
scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from the derived class. Is
there something wrong with the code?
On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger c...@koeninger.org
wrote:
Look at the definitions of the java-specific
KafkaUtils.createDirectStream methods (the ones
looking forward to it :)
On Wed, Aug 19, 2015 at 12:02 AM, Cody Koeninger c...@koeninger.org
wrote:
The solution you found is also in the docs:
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
Java uses an atomic reference because Java doesn't allow you to close
over non
The solution you found is also in the docs:
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
Java uses an atomic reference because Java doesn't allow you to close over
non-final references.
I'm not clear on your other question.
On Tue, Aug 18, 2015 at 3:43 AM, Petr Novak
I wouldn't expect a kafka producer to be serializable at all... among other
things, it has a background thread
On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan wansheng...@gmail.com
wrote:
Hi,
Did anyone see java.util.ConcurrentModificationException when using
broadcast variables?
I
instead of Function1 ?
On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger c...@koeninger.org
wrote:
I'm not aware of an existing api per se, but you could create your own
subclass of the DStream that returns None for compute() under certain
conditions.
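As a very rough sketch of that idea (untested; GatedDStream and the paused predicate are made up, and calling the parent's compute directly may need more care in real code):

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.DStream

// Wraps another DStream and skips batches while paused() returns true
class GatedDStream[T: ClassTag](parent: DStream[T], paused: () => Boolean)
  extends DStream[T](parent.context) {

  override def dependencies: List[DStream[_]] = List(parent)
  override def slideDuration = parent.slideDuration

  // Returning None means no RDD (and no job) is generated for this batch
  override def compute(validTime: Time): Option[RDD[T]] =
    if (paused()) None else parent.compute(validTime)
}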
On Wed, Aug 12, 2015 at 1:03 PM
stopped it cannot
be restarted, but I also haven't found a way to stop the app
programmatically.
The batch duration will probably be around 1-10 seconds. I think this is
small enough to not make it a batch job?
Thanks again
On Thu, Aug 13, 2015 at 10:15 PM, Cody Koeninger c...@koeninger.org
Use your email client to send a message to the mailing list from the email
address you used to subscribe?
The message you just sent reached the list
On Fri, Aug 14, 2015 at 9:36 AM, dutrow dan.dut...@gmail.com wrote:
How do I get beyond the This post has NOT been accepted by the mailing
list
I don't entirely agree with that assessment. Not paying for extra cores to
run receivers was about as important as delivery semantics, as far as
motivations for the api.
As I said in the jira tickets on the topic, if you want to use the direct
api and save offsets to ZK, you can. The right way
You'll resume and re-process the rdd that didnt finish
On Fri, Aug 14, 2015 at 1:31 PM, Dmitry Goldenberg dgoldenberg...@gmail.com
wrote:
Our additional question on checkpointing is basically the logistics of it
--
At which point does the data get written into checkpointing? Is it
written
The current kafka stream implementation assumes the set of topics doesn't
change during operation.
You could either take a crack at writing a subclass that does what you
need; stop/start; or if your batch duration isn't too small, you could run
it as a series of RDDs (using the existing
It's just to limit the maximum number of records a given executor needs to
deal with in a given batch.
Typical usage would be if you're starting a stream from the beginning of a
kafka log, or after a long downtime, and don't want ALL of the messages in
the first batch.
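For example (the cap of 10000 records per partition per batch is just an illustrative number):

import org.apache.spark.SparkConf

// limit each kafka partition to at most 10000 records per batch, so the first
// batch after a long downtime doesn't try to grab the entire log
val conf = new SparkConf()
  .set("spark.streaming.kafka.maxRatePerPartition", "10000")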
On Thu, Aug 13, 2015 at
Access the offsets using HasOffsetRanges, save them in your datastore,
provide them as the fromOffsets argument when starting the stream.
See https://github.com/koeninger/kafka-exactly-once
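Sketched out, that flow looks roughly like this (loadOffsets / saveOffsets are hypothetical stand-ins for whatever your datastore does; the message handler here just keeps key and value):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

// starting offsets recovered from your own datastore (hypothetical helper)
val fromOffsets: Map[TopicAndPartition, Long] = loadOffsets()

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder,
  StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))

stream.foreachRDD { rdd =>
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process rdd and write results ...
  // then persist the ending offsets so the next run resumes from them (hypothetical helper)
  saveOffsets(ranges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap)
}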
On Thu, Aug 13, 2015 at 3:53 PM, Stephen Durfey sjdur...@gmail.com wrote:
When deploying a spark
will just ignore already
processed events by accessing the counter of the failed task. Is it
possible to directly access the accumulator on a per-task basis without
writing to hdfs or hbase?
On Tue, Aug 11, 2015 at 3:15 AM, Cody Koeninger c...@koeninger.org
wrote:
http://spark.apache.org/docs/latest
That looks like it's during recovery from a checkpoint, so it'd be driver
memory not executor memory.
How big is the checkpoint directory that you're trying to restore from?
On Mon, Aug 10, 2015 at 10:57 AM, Dmitry Goldenberg
dgoldenberg...@gmail.com wrote:
We're getting the below error.
the original checkpointing directory :( Thanks for the clarification
on spark.driver.memory, I'll keep testing (at 2g things seem OK for now).
On Mon, Aug 10, 2015 at 12:10 PM, Cody Koeninger c...@koeninger.org
wrote:
That looks like it's during recovery from a checkpoint, so it'd be
driver
There's no long-running receiver pushing blocks of messages, so
blockInterval isn't relevant.
Batch interval is what matters.
On Mon, Aug 10, 2015 at 12:52 PM, allonsy luke1...@gmail.com wrote:
Hi everyone,
I recently started using the new Kafka direct approach.
Now, as far as I
http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
http://spark.apache.org/docs/latest/streaming-programming-guide.html#semantics-of-output-operations
https://www.youtube.com/watch?v=fXnNEq1v3VA
On Mon, Aug 10, 2015 at 4:32 PM, Shushant
tolerance. Why does Spark persist whole RDD's of data? Shouldn't it be
sufficient to just persist the offsets, to know where to resume from?
Thanks.
On Mon, Aug 10, 2015 at 1:07 PM, Cody Koeninger c...@koeninger.org
wrote:
You need to keep a certain number of rdds around for checkpointing
of these costs?
On Mon, Aug 10, 2015 at 4:27 PM, Cody Koeninger c...@koeninger.org
wrote:
The rdd is indeed defined by mostly just the offsets / topic partitions.
On Mon, Aug 10, 2015 at 3:24 PM, Dmitry Goldenberg
dgoldenberg...@gmail.com wrote:
You need to keep a certain number of rdds
For direct stream questions:
https://github.com/koeninger/kafka-exactly-once
Yes, it is used in production.
For general spark streaming question:
http://spark.apache.org/docs/latest/streaming-programming-guide.html
On Mon, Aug 10, 2015 at 7:51 AM, Mohit Durgapal durgapalmo...@gmail.com
Those jobs will still be created for each valid time, they just may not
have many messages in them
On Mon, Aug 3, 2015 at 11:11 PM, Shushant Arora shushantaror...@gmail.com
wrote:
1.In spark 1.3(Non receiver) - If my batch interval is 1 sec and I don't
set
Have you tried using the console consumer to see if anything is actually
getting published to that topic?
On Tue, Aug 4, 2015 at 11:45 AM, narendra narencs...@gmail.com wrote:
My application takes Twitter4j tweets and publishes those to a topic in
Kafka. Spark Streaming subscribes to that
Just to be clear, did you rebuild your job against spark 1.4.1 as well as
upgrading the cluster?
On Mon, Aug 3, 2015 at 8:36 AM, Netwaver wanglong_...@163.com wrote:
Hi All,
I have a spark streaming + kafka program written in Scala; it
works well on Spark 1.3.1, but after I migrate
Show us the relevant code
On Fri, Jul 31, 2015 at 12:16 PM, Dmitry Goldenberg
dgoldenberg...@gmail.com wrote:
I've instrumented checkpointing per the programming guide and I can tell
that Spark Streaming is creating the checkpoint directories but I'm not
seeing any content being created in
Just so I'm clear, the difference in timing you're talking about is this:
15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at
MetricsSpark.scala:67, took 60.391761 s
15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at
MetricsSpark.scala:67, took 0.531323 s
Are
You can't use checkpoints across code upgrades. That may or may not change
in the future, but for now that's a limitation of spark checkpoints
(regardless of whether you're using Kafka).
Some options:
- Start up the new job on a different cluster, then kill the old job once
it's caught up to
.
On Thu, Jul 30, 2015 at 9:29 AM, Guillermo Ortiz konstt2...@gmail.com
wrote:
I have three topics with one partition each. So each job runs over one
topic.
2015-07-30 16:20 GMT+02:00 Cody Koeninger c...@koeninger.org:
Just so I'm clear, the difference in timing you're talking about
is valid and it has data I can see data in it using another kafka
consumer.
On Jul 30, 2015 7:31 AM, Cody Koeninger c...@koeninger.org wrote:
The last time someone brought this up on the mailing list, the issue
actually was that the topic(s) didn't exist in Kafka at the time the spark
job
The last time someone brought this up on the mailing list, the issue
actually was that the topic(s) didn't exist in Kafka at the time the spark
job was running.
On Wed, Jul 29, 2015 at 6:17 PM, Tathagata Das t...@databricks.com wrote:
There is a known issue that Kafka cannot return leader
@Ashwin you don't need to append the topic to your data if you're using the
direct stream. You can get the topic from the offset range, see
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
(search for offsetRange)
If you're using the receiver based stream, you'll need to
You don't have to use some other package in order to get access to the
offsets.
Shushant, have you read the available documentation at
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md
or watched
Yes, you need to follow the documentation. Configure your stream,
including the transformations made to it, inside the getOrCreate function.
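A bare-bones sketch of that structure (the directory, batch interval and the stream setup are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///tmp/checkpoints"  // hypothetical path

def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(new SparkConf(), Seconds(5))
  // set up the kafka stream AND all transformations / output operations here,
  // e.g. val stream = KafkaUtils.createDirectStream(...); stream.foreachRDD { ... }
  ssc.checkpoint(checkpointDir)
  ssc
}

val ssc = StreamingContext.getOrCreate(checkpointDir, functionToCreateContext _)
ssc.start()
ssc.awaitTermination()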
On Tue, Jul 28, 2015 at 3:14 AM, Guillermo Ortiz konstt2...@gmail.com
wrote:
I'm using SparkStreaming and I want to configure checkpoint to manage
24, 2015 at 3:32 PM, Cody Koeninger c...@koeninger.org
wrote:
It's really a question of whether you need access to the
MessageAndMetadata, or just the key / value from the message.
If you just need the key/value, dstream map is fine.
In your case, since you need to be able to control
Use foreachPartition and batch the writes
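In outline, something like this (the store client and the batch size of 500 are placeholders):

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val store = MyStore.connect()   // hypothetical client, opened once per partition
    partition.grouped(500).foreach { batch =>
      store.bulkWrite(batch.toSeq)  // one round trip per 500 records, not one per record
    }
    store.close()
  }
}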
On Sat, Jul 25, 2015 at 9:14 AM, nib...@free.fr wrote:
Hello,
I am new user of Spark, and need to know what could be the best practice
to do the following scenario :
- Spark Streaming receives XML messages from Kafka
- Spark transforms each message
Well... there are only 2 hard problems in computer science: naming things,
cache invalidation, and off-by-one errors.
The direct stream implementation isn't asking you to commit anything.
It's asking you to provide a starting point for the stream on startup.
Because offset ranges are inclusive
/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L805
After processing read ConsumerRecord, commit expects me to submit not
offset 0 but offset 1...
Kind regards,
Stevo Slavic
On Fri, Jul 24, 2015 at 6:31 PM, Cody Koeninger c...@koeninger.org
wrote:
Well
method (convert each message within this handler) or dstream.foreachRDD.map
(map rdd) or dstream.map.foreachRDD (map dstream) ?
Thank you for your help Cody.
Regards,
Nicolas PHUNG
On Tue, Jul 21, 2015 at 4:53 PM, Cody Koeninger c...@koeninger.org
wrote:
Yeah, I'm referring to that api
Yes, look at KafkaUtils.createRDD
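For instance, a small sketch reading back an exact range (the topic name and offsets are made up; the offsets you stored in the other topic would go here):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

// re-read exactly the messages whose offsets you stored earlier
val offsetRanges = Array(
  OffsetRange("failed-posts", 0, 110L, 220L)  // topic, partition, fromOffset, untilOffset
)
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, offsetRanges)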
On Wed, Jul 22, 2015 at 11:17 AM, Shushant Arora shushantaror...@gmail.com
wrote:
Thanks !
I am using spark streaming 1.3, and if some post fails for any
reason, I will store the offset of that message in another kafka topic. I
want to read these
, Jul 20, 2015 at 7:08 PM, Cody Koeninger c...@koeninger.org
wrote:
Yeah, in the function you supply for the messageHandler parameter to
createDirectStream, catch the exception and do whatever makes sense for
your application.
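As a sketch (parse is a stand-in for whatever conversion was throwing):

import kafka.message.MessageAndMetadata

// runs per message on the executors; bad records become None instead of failing the task
val messageHandler = (mmd: MessageAndMetadata[String, String]) =>
  try Some(parse(mmd.message())) catch { case e: Exception => None }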
On Mon, Jul 20, 2015 at 11:58 AM, Nicolas Phung nicolas.ph
,
Nicolas PHUNG
On Thu, Jul 16, 2015 at 7:08 PM, Cody Koeninger c...@koeninger.org
wrote:
Not exactly the same issue, but possibly related:
https://issues.apache.org/jira/browse/KAFKA-1196
On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger c...@koeninger.org
wrote:
Well, working backwards
or KafkaUtils.createDirectStream ?
Regards,
Nicolas PHUNG
On Mon, Jul 20, 2015 at 3:46 PM, Cody Koeninger c...@koeninger.org
wrote:
I'd try logging the offsets for each message, see where problems start,
then try using the console consumer starting at those offsets and see if
you can reproduce the problem
Well, working backwards down the stack trace...
at java.nio.Buffer.limit(Buffer.java:275)
That exception gets thrown if the limit is negative or greater than
the buffer's capacity
at kafka.message.Message.sliceDelimited(Message.scala:236)
If size had been negative, it would have just returned
Not exactly the same issue, but possibly related:
https://issues.apache.org/jira/browse/KAFKA-1196
On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger c...@koeninger.org wrote:
Well, working backwards down the stack trace...
at java.nio.Buffer.limit(Buffer.java:275)
That exception gets thrown
, but I seem to remember they were around 10-30
minutes long. Even with peaks in volume, Spark managed to keep up very well.
Thanks,
Silvio
From: Cody Koeninger
Date: Wednesday, July 15, 2015 at 5:38 PM
To: algermissen1971
Cc: Tathagata Das, swetha, user
Subject: Re: Sessionization using
I personally would try to avoid updateStateByKey for sessionization when
you have long sessions / a lot of keys, because it's linear on the number
of keys.
On Tue, Jul 14, 2015 at 6:25 PM, Tathagata Das t...@databricks.com wrote:
[Apologies for repost, for those who have seen this response
the same thing with better
scaling characteristics?
Jan
On 15 Jul 2015, at 15:33, Cody Koeninger c...@koeninger.org wrote:
I personally would try to avoid updateStateByKey for sessionization when
you have long sessions / a lot of keys, because it's linear on the number
of keys.
On Tue
?
Essentially, all I need to do is monitor the progress of data consumption
of the Kafka topic.
On Tue, Jun 30, 2015 at 9:39 AM, Cody Koeninger c...@koeninger.org
wrote:
You can't use different versions of spark in your application vs your
cluster.
For the direct stream, it's not 60 partitions per
the framework does is really desirable.
With repartition, there is shuffle involved, but at least the
computation load spreads across all 100 executors instead of just 30.
On Fri, Jun 19, 2015 at 7:14 PM, Cody Koeninger c...@koeninger.org
wrote:
If that's the case, you're still only using as many
Yeah, I had brought that up a while back, but didn't get agreement on
removing the stub. Seems to be an intermittent problem. You can just add
an exclude:
mergeStrategy in assembly := {
  case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class")
    => MergeStrategy.first
  case x =>
Regarding your first question, having more partitions than you do executors
usually means you'll have better utilization, because the workload will be
distributed more evenly. There's some degree of per-task overhead, but as
long as you don't have a huge imbalance between number of tasks and
Does the issue only happen when you have no traffic on the topic?
Have you profiled to see what's using heap space?
On Mon, Jul 13, 2015 at 1:05 PM, Apoorva Sareen apoorva.sar...@gmail.com
wrote:
Hi,
I am running spark streaming 1.4.0 on Yarn (Apache distribution 2.6.0)
with java 1.8.0_45
consumer kafka consumer 0.8.0 with kafka cluster
0.8.2 and that works.
On Thu, Jul 9, 2015 at 9:58 PM, Cody Koeninger c...@koeninger.org wrote:
It's the consumer version. Should work with 0.8.2 clusters.
On Thu, Jul 9, 2015 at 11:10 AM, Shushant Arora
shushantaror...@gmail.com wrote:
Does
It's the consumer version. Should work with 0.8.2 clusters.
On Thu, Jul 9, 2015 at 11:10 AM, Shushant Arora shushantaror...@gmail.com
wrote:
Does spark streaming 1.3 requires kafka version 0.8.1.1 and is not
compatible with kafka 0.8.2 ?
As per maven dependency of spark streaming 1.3 with
Use foreachPartition, and allocate whatever the costly resource is once per
partition.
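A rough sketch with the old (0.8) producer api; the broker list and output topic are placeholders:

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // build the producer on the executor, once per partition (it isn't serializable)
    val props = new Properties()
    props.put("metadata.broker.list", "broker1:9092,broker2:9092")
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    val producer = new Producer[String, String](new ProducerConfig(props))
    partition.foreach { msg =>
      producer.send(new KeyedMessage[String, String]("output-topic", msg))
    }
    producer.close()
  }
}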
On Mon, Jul 6, 2015 at 6:11 AM, Shushant Arora shushantaror...@gmail.com
wrote:
I have a requirement to write in kafka queue from a spark streaming
application.
I am using spark 1.2 streaming. Since
You shouldn't rely on being able to restart from a checkpoint after
changing code, regardless of whether the change was explicitly related to
serialization.
If you are relying on checkpoints to hold state, specifically which offsets
have been processed, that state will be lost if you can't
:46 PM, Cody Koeninger c...@koeninger.org
wrote:
Read the spark streaming guide and the kafka integration guide for a
better understanding of how the receiver based stream works.
Capacity planning is specific to your environment and what the job
is actually doing, you'll need to determine
=d5UJonrruHk&list=PL-x35fyliRwgfhffEpywn4q23ykotgQJ6&index=4
] from last Spark Summit 2015). But that approach can give duplicate
records. The direct approach gives exactly-once guarantees, so you should
try it out.
TD
On Fri, Jun 26, 2015 at 5:46 PM, Cody Koeninger c...@koeninger.org
wrote
the job created for the specific batch, but the subsequent batches
still proceed, isn’t it right ? And question still remains, how to keep
track of those failed batches ?
From: amit assudani aassud...@impetus.com
Date: Friday, June 26, 2015 at 11:21 AM
To: Cody Koeninger c...@koeninger.org
Cc
If you're consistently throwing exceptions and thus failing tasks, once you
reach max failures the whole stream will stop.
It's up to you to either catch those exceptions, or restart your stream
appropriately once it stops.
Keep in mind that if you're relying on checkpoints, and fixing the error
the connectivity issues to persistent store
which gets resolved in a while, but how do I know which all messages failed
and need rework ?
Regards,
Amit
From: Cody Koeninger c...@koeninger.org
Date: Friday, June 26, 2015 at 11:16 AM
To: amit assudani aassud...@impetus.com
Cc: user
The receiver-based kafka createStream in spark 1.2 uses zookeeper to store
offsets. If you want finer-grained control over offsets, you can update
the values in zookeeper yourself before starting the job.
createDirectStream in spark 1.3 is still marked as experimental, and
subject to change.
Make sure you're following the docs regarding setting up a streaming
checkpoint.
Post your code if you can't get it figured out.
On Fri, Jun 26, 2015 at 3:45 PM, Ashish Nigam ashnigamt...@gmail.com
wrote:
I bring up spark streaming job that uses Kafka as input source.
No data to process and
per second. Each event is ~500 bytes. Is having 5 receivers with 60
partitions per receiver sufficient for spark streaming to consume?
On Fri, Jun 26, 2015 at 8:40 PM, Cody Koeninger c...@koeninger.org wrote:
The receiver-based kafka
The exception $line49 is referring to a line of the spark shell.
Have you tried it from an actual assembled job with spark-submit ?
On Tue, Jun 23, 2015 at 3:48 PM, syepes sye...@gmail.com wrote:
Hello,
I am trying to use the new Kafka consumer
KafkaUtils.createDirectStream
but I am
createStream,
the app does deteriorate but over a much longer period, hours vs days.
On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das t...@databricks.com
wrote:
Yes, please tell us what operation are you using.
TD
On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.org
wrote
. Are there enhancements to this
specific API between 1.3 and 1.4 that can substantially change its
behaviour?
On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger c...@koeninger.org
wrote:
when you say your old version was
k = createStream .
were you manually creating multiple receivers? Because otherwise
, Cody Koeninger c...@koeninger.org
wrote:
If that's the case, you're still only using as many read executors as
there are kafka partitions.
I'd remove the repartition. If you weren't doing any shuffles in the old
job, and are doing a shuffle in the new job, it's not really comparable.
On Fri
...@163.com
*From:* Haopu Wang hw...@qilinsoft.com
*Date:* 2015-06-19 18:47
*To:* Enno Shioji eshi...@gmail.com; Tathagata Das
t...@databricks.com
*CC:* prajod.vettiyat...@wipro.com; Cody Koeninger c...@koeninger.org;
bit1...@163.com; Jordan Pilat jrpi...@gmail.com; Will Briggs
wrbri
:* Haopu Wang hw...@qilinsoft.com
*Date:* 2015-06-19 18:47
*To:* Enno Shioji eshi...@gmail.com; Tathagata Das t...@databricks.com
*CC:* prajod.vettiyat...@wipro.com; Cody Koeninger c...@koeninger.org;
bit1...@163.com; Jordan Pilat jrpi...@gmail.com; Will Briggs
wrbri...@gmail.com; Ashish Soni
Is there any more info you can provide / relevant code?
On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com wrote:
Update on performance of the new API: the new code using the
createDirectStream API ran overnight and when I checked the app state in
the morning, there were massive
That general description is accurate, but not really a specific issue of
the direct stream. It applies to anything consuming from kafka (or, as
Matei already said, any streaming system really). You can't have exactly
once semantics, unless you know something more about how you're storing
results.
there ... but as long as you're making
sure the setup gets called at most once per executor, before the work that
needs it ... should be ok.
On Fri, Jun 12, 2015 at 4:11 PM, algermissen1971 algermissen1...@icloud.com
wrote:
On 12 Jun 2015, at 22:59, Cody Koeninger c...@koeninger.org wrote:
Close
Close. the mapPartitions call doesn't need to do anything at all to the
iter.
mapPartitions { iter =>
  SomeDb.conn.init
  iter
}
On Fri, Jun 12, 2015 at 3:55 PM, algermissen1971 algermissen1...@icloud.com
wrote:
Cody,
On 12 Jun 2015, at 17:26, Cody Koeninger c...@koeninger.org wrote
of whether you're using the
createStream or createDirectStream api.
On Fri, Jun 12, 2015 at 9:14 AM, Cody Koeninger c...@koeninger.org wrote:
Casting to HasOffsetRanges would be meaningless anyway if done after an
operation that changes partitioning.
You can still use the messageHandler argument
There are several database apis that use a thread local or singleton
reference to a connection pool (we use ScalikeJDBC currently, but there are
others).
You can use mapPartitions earlier in the chain to make sure the connection
pool is set up on that executor, then use it inside updateStateByKey
The scala api has 2 ways of calling createDirectStream. One of them allows
you to pass a message handler that gets full access to the kafka
MessageAndMetadata, including offset.
I don't know why the python api was developed with only one way to call
createDirectStream, but the first thing I'd
What is the code used to set up the kafka stream?
On Sat, Jun 6, 2015 at 3:23 PM, EH eas...@gmail.com wrote:
And here is the Thread Dump, where it seems every worker is waiting for
Executor
#6 Thread 95: sparkExecutor-akka.actor.default-dispatcher-22 (RUNNABLE) to
be complete:
Thread 41:
direct stream isn't a receiver, it isn't required to cache data anywhere
unless you want it to.
If you want it, just call cache.
On Thu, Jun 4, 2015 at 8:20 AM, Dmitry Goldenberg dgoldenberg...@gmail.com
wrote:
set the storage policy for the DStream RDDs to MEMORY AND DISK - it
appears the
The default of 0 means no limit. Each batch will grab as much as is
available, ie a range of offsets spanning from the end of the previous
batch to the highest available offsets on the leader.
If you set spark.streaming.kafka.maxRatePerPartition > 0, the number you
set is the maximum number of
[HasOffsetRanges].offsetRanges)
new ProxyRDDWithPartitioner(rdd, part)
}
...
But how can I create same partitioner during updateStateByKey call? I
have no idea how to access rdd when calling updateStateByKey.
Tue, 2 June 2015 at 19:15, Cody Koeninger c...@koeninger.org:
I think
KafkaCluster.scala in the spark/external/kafka project has a bunch of api
code, including code for updating Kafka-managed ZK offsets. Look at
setConsumerOffsets.
Unfortunately all of that code is private, but you can either write your
own, copy it, or do what I do (sed out private[spark] and
It's being added in 1.4
https://repository.apache.org/content/repositories/orgapachespark-1104/org/apache/spark/spark-streaming-kafka_2.11/1.4.0-rc2/
On Tue, May 26, 2015 at 3:14 AM, Petr Novak oss.mli...@gmail.com wrote:
Hello,
I would like to switch from Scala 2.10 to 2.11 for Spark app
I just verified that the following code works on 1.3.0 :
val stream1 = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams, topic1)
val stream2 = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams, topic2)
the message in case of failures?
Thanks
Best Regards
On Wed, May 13, 2015 at 8:32 PM, Cody Koeninger c...@koeninger.org
wrote:
You linked to a google mail tab, not a public archive, so I don't know
exactly which conversation you're referring to.
As far as I know, streaming only runs a single
PM, Bill Jay bill.jaypeter...@gmail.com
wrote:
If a Spark streaming job stops at 12:01 and I resume the job at 12:02.
Will it still start to consume the data that were produced to Kafka at
12:01? Or it will just start consuming from the current time?
On Tue, May 19, 2015 at 10:58 AM, Cody
Have you read
https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md ?
1. There's nothing preventing that.
2. Checkpointing will give you at-least-once semantics, provided you have
sufficient kafka retention. Be aware that checkpoints aren't recoverable
if you upgrade code.
Either one will work, there is no semantic difference.
The reason I designed the direct api to accept both of those keys is
because they were used to define lists of brokers in pre-existing Kafka
project apis. I don't know why the Kafka project chose to use 2 different
configuration keys.
On
, 2015 at 8:10 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:
Hi Cody,
I was just saying that I found more success and higher throughput with the
low-level kafka api prior to KafkaRDDs, which is the future it seems. My
apologies if you felt it that way. :)
On 12 May 2015 19:47, Cody Koeninger c
at 3:44 PM, Cody Koeninger c...@koeninger.org
wrote:
Either one will work, there is no semantic difference.
The reason I designed the direct api to accept both of those keys is
because they were used to define lists of brokers in pre-existing Kafka
project apis. I don't know why the Kafka
http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
shows setting up your stream and calling .checkpoint(checkpointDir) inside
the functionToCreateContext. It looks to me like you're setting up your
stream and calling checkpoint outside, after getOrCreate.
I'm not